Project author: factset

Project description:
Lazypipe with labels.
Language: JavaScript
Repository: git://github.com/factset/labeledpipe.git
Created: 2016-03-23T16:45:49Z
Project community: https://github.com/factset/labeledpipe

License: Apache License 2.0

Archived

This repository is not being actively maintained, and has been archived.

labeledpipe

Lazypipe with optional labels.

Like lazypipe, labeledpipe creates an immutable lazily initialized pipeline.
Unlike lazypipe it allows pipeline stages to be labeled. You can then use those
labels to control where pipeline stages are added, and even remove previously
added stages.

Installation

To install labeledpipe for use in your project please run the following command:

    yarn add labeledpipe

If you are using the npm package manager:

    npm install labeledpipe

Usage

Basic

In this example we use labeledpipe exactly as we would use lazypipe.

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    // A simple transform stream that reports the stage name to the console.
    function reportStage (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            done(null, obj);
        });
    }

    // create a pipeline
    var pipeline = labeledpipe()
        .pipe(reportStage, 'A')
        .pipe(reportStage, 'B')
    ;

    // create a stream from the pipeline
    var stream = pipeline();

    // We could also have piped to the stream created from our pipeline
    // var stream = through.obj()
    //     .pipe(pipeline())
    // ;

    // write some data to the stream and close it.
    stream.write('Some data');
    stream.end();

Output:

    A: Some data
    B: Some data
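
The introduction describes a labeledpipe as immutable and lazily initialized. As a rough mental model, and not labeledpipe's actual implementation, those two properties can be sketched with plain functions and no streams. The names lazyBuilder and makeStage are hypothetical and exist only for this illustration:

```javascript
// Conceptual sketch only: NOT labeledpipe's source code.
// `lazyBuilder` is a hypothetical name used for illustration.
function lazyBuilder (stages) {
    stages = stages || [];

    // Calling the builder runs every stored factory, in order.
    function build () {
        return stages.map(function (stage) {
            return stage.factory.apply(null, stage.args);
        });
    }

    // .pipe never mutates `stages`; it returns a new builder with a longer copy.
    build.pipe = function (factory) {
        var args = Array.prototype.slice.call(arguments, 1);
        return lazyBuilder(stages.concat([{ factory: factory, args: args }]));
    };

    return build;
}

// A stand-in for a stream factory that records when it is invoked.
var calls = [];
function makeStage (name) {
    calls.push(name);
    return name;
}

var base = lazyBuilder().pipe(makeStage, 'A');
var extended = base.pipe(makeStage, 'B');

// Nothing has been constructed yet: factories run only when a builder is called.
console.log(calls.length);   // 0
console.log(extended());     // [ 'A', 'B' ]
console.log(base());         // [ 'A' ]  (base was not changed by extended)
```

Because every .pipe call in this sketch returns a fresh builder, a shared base can be extended in several directions without one consumer affecting another.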

Labeling stages

To label a stage of the pipeline, prefix the arguments to .pipe with the
stage's label:

    var pipeline = labeledpipe()
        .pipe('stage-label', reportStage, 'A')
    ;

We can now use the stage labels to change the point at which .pipe inserts the
next pipeline stage. labeledpipe refers to this point as the cursor.

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    // A simple transform stream that reports the stage name to the console.
    function reportStage (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            done(null, obj);
        });
    }

    // create a pipeline
    var pipeline = labeledpipe()
        .pipe('stage-A', reportStage, 'A')
        .pipe('stage-B', reportStage, 'B')

        // position the cursor before the stage labeled 'stage-B'
        .before('stage-B')
        .pipe('stage-C', reportStage, 'C')
    ;

    // create a stream from the pipeline
    var stream = pipeline();
    stream.write('Some data');
    stream.end();

Output:

    A: Some data
    C: Some data
    B: Some data
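
One way to picture the cursor is as an index into an ordered list of stages: .pipe inserts at the index and advances it, while .before(label) moves the index to the labeled stage. The following is a minimal sketch of that idea under those assumptions, not labeledpipe's real internals. The helpers insertAtCursor and cursorBefore are hypothetical:

```javascript
// Conceptual sketch only: NOT labeledpipe's source code.
// `insertAtCursor` and `cursorBefore` are hypothetical helpers for illustration.

// Return a new state with `stage` inserted at the cursor and the cursor advanced.
function insertAtCursor (state, stage) {
    var stages = state.stages.slice();
    stages.splice(state.cursor, 0, stage);
    return { stages: stages, cursor: state.cursor + 1 };
}

// Move the cursor so the next insert lands just before the labeled stage.
function cursorBefore (state, label) {
    var index = state.stages.findIndex(function (stage) {
        return stage.label === label;
    });
    if (index === -1) {
        throw new Error('Unknown label: ' + label);
    }
    return { stages: state.stages, cursor: index };
}

var state = { stages: [], cursor: 0 };
state = insertAtCursor(state, { label: 'stage-A', name: 'A' });
state = insertAtCursor(state, { label: 'stage-B', name: 'B' });
state = cursorBefore(state, 'stage-B');
state = insertAtCursor(state, { label: 'stage-C', name: 'C' });

var order = state.stages.map(function (stage) { return stage.name; });
console.log(order.join(' -> ')); // A -> C -> B
```

The resulting order matches the output of the example above: the stage added after .before('stage-B') runs between A and B.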

Cursor Movement

In addition to .before, labeledpipe allows several other cursor positioning
commands. Here is a complete list:

  • .before(label)
  • .after(label)
  • .first()
  • .last()
  • .beginningOf(label)
  • .endOf(label)

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    // A simple transform stream that reports the stage name to the console.
    function reportStage (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            done(null, obj);
        });
    }

    // create a pipeline
    var pipeline = labeledpipe()
        .pipe('stage-A', reportStage, 'A')
        .pipe('stage-B', reportStage, 'B')
        .pipe('stage-C', reportStage, 'C')

        // insert before stage B
        .before('stage-B')
        .pipe(reportStage, 'A/B')

        // insert after stage B
        .after('stage-B')
        .pipe(reportStage, 'B/C')

        // insert at the beginning of the pipeline
        .first()
        .pipe(reportStage, 'Start')

        // insert at the end of the pipeline
        .last()
        .pipe(reportStage, 'Finish')
    ;

    // create a stream from the pipeline
    var stream = pipeline();
    stream.write('Some data');
    stream.end();

Output:

    Start: Some data
    A: Some data
    A/B: Some data
    B: Some data
    B/C: Some data
    C: Some data
    Finish: Some data

Removing Stages

Labeledpipe also lets you remove labeled stages that were previously added to
the pipeline. This allows you to reuse most of a pipeline created by another
project while dropping the stages you do not need:

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    // A simple transform stream that reports the stage name to the console.
    function reportStage (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            done(null, obj);
        });
    }

    // create a pipeline
    var pipeline = labeledpipe()
        .pipe('stage-A', reportStage, 'A')
        .pipe('stage-B', reportStage, 'B')
        .pipe('stage-C', reportStage, 'C')

        // remove stage-B
        .remove('stage-B')

        // continue working with pipeline
        .pipe(reportStage, 'D')
    ;

    // create a stream from the pipeline
    var stream = pipeline();
    stream.write('Some data');
    stream.end();

Output:

    A: Some data
    C: Some data
    D: Some data

Nested Pipelines

Like lazypipe, labeledpipe also lets you nest pipelines. This allows a common
pipeline to be written once and reused in multiple pipelines.

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    // A simple transform stream that reports the stage name to the console.
    function reportStage (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            done(null, obj);
        });
    }

    var common = labeledpipe()
        .pipe(reportStage, 'A')
        .pipe(reportStage, 'B')
        .pipe(reportStage, 'C')
    ;

    // create a pipeline
    var pipeline = labeledpipe()
        .pipe(common)

        // continue working with pipeline
        .pipe(reportStage, 'D')
    ;

    // create a stream from the pipeline
    var stream = pipeline();
    stream.write('Some data');
    stream.end();

Output:

    A: Some data
    B: Some data
    C: Some data
    D: Some data

Unlike lazypipe, however, you can also label the common pipelines and use the
cursor positioning commands to position relative to both the nested pipeline
itself and the stages inside the nested pipeline:

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    // A simple transform stream that reports the stage name to the console.
    function reportStage (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            done(null, obj);
        });
    }

    var common = labeledpipe()
        .pipe('stage-A', reportStage, 'A')
        .pipe('stage-B', reportStage, 'B')
        .pipe('stage-C', reportStage, 'C')
    ;

    // create a pipeline
    var pipeline = labeledpipe()
        .pipe('common-stage', common)

        // insert before common
        .before('common-stage')
        .pipe(reportStage, 'before-common')

        // insert at beginning of common
        .beginningOf('common-stage')
        .pipe(reportStage, 'beginning-of-common')

        // insert at end of common
        .endOf('common-stage')
        .pipe(reportStage, 'end-of-common')

        // insert after common
        .after('common-stage')
        .pipe(reportStage, 'after-common')

        // insert into common
        .after('stage-B')
        .pipe(reportStage, 'inside-common')
    ;

    // create a stream from the pipeline
    var stream = pipeline();
    stream.write('Some data');
    stream.end();

Output:

    before-common: Some data
    beginning-of-common: Some data
    A: Some data
    B: Some data
    inside-common: Some data
    C: Some data
    end-of-common: Some data
    after-common: Some data

Pseudo Stages

Labeledpipe lets you add labeled stages that do not contain a transform
stream. These are useful if you need to provide well-known extension points.
These pseudo stages act like nested pipelines. As such, you can use the
nested pipeline movement operators:

  • .beginningOf(label)
  • .endOf(label)

These operators move the cursor to just after the beginning or just before the
end of the pseudo stage.

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    // A simple transform stream that reports the stage name to the console.
    function reportStage (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            done(null, obj);
        });
    }

    // create a pipeline
    var pipeline = labeledpipe()
        .pipe('stage-A', reportStage, 'A')
        .pipe('extend-here')
        .pipe('stage-B', reportStage, 'B')

        // Add something right before the end of the extend-here marker
        .endOf('extend-here')
        .pipe('stage-Y', reportStage, 'Y')

        // Add something right after the beginning of the extend-here marker
        .beginningOf('extend-here')
        .pipe('stage-X', reportStage, 'X')

        // Add something right before the end of the extend-here marker
        .endOf('extend-here')
        .pipe('stage-Z', reportStage, 'Z')
    ;

    // create a stream from the pipeline
    var stream = pipeline();
    stream.write('Some data');
    stream.end();

Output:

    A: Some data
    X: Some data
    Y: Some data
    Z: Some data
    B: Some data
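
To see why Y, X, and Z end up in the order X, Y, Z, it can help to picture a pseudo stage as a pair of begin and end markers in the stage list. This is only an assumed mental model, not a description of labeledpipe's internals, and the marker objects and helper functions below are hypothetical. For brevity the sketch mutates one array, which the real immutable pipeline would not do:

```javascript
// Conceptual sketch only: NOT labeledpipe's source code.
// The marker objects and helper names below are hypothetical.
var entries = [
    { name: 'A' },
    { begin: 'extend-here' },
    { end: 'extend-here' },
    { name: 'B' }
];

// Index just after the begin marker of `label`.
function beginningOf (label) {
    return entries.findIndex(function (e) { return e.begin === label; }) + 1;
}

// Index of the end marker of `label`, so inserts land just before it.
function endOf (label) {
    return entries.findIndex(function (e) { return e.end === label; });
}

function insert (index, name) {
    entries.splice(index, 0, { name: name });
}

insert(endOf('extend-here'), 'Y');        // A [ Y ] B
insert(beginningOf('extend-here'), 'X');  // A [ X Y ] B
insert(endOf('extend-here'), 'Z');        // A [ X Y Z ] B

// Markers carry no stream, so only named entries produce output.
var names = entries
    .filter(function (e) { return e.name; })
    .map(function (e) { return e.name; });
console.log(names.join(', ')); // A, X, Y, Z, B
```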

Mixing with lazypipe

Labeledpipe is designed to work seamlessly with lazypipe. Lazypipes can be used
as stages in a labeledpipe:

    var labeledpipe = require('labeledpipe');
    var lazypipe = require('lazypipe');
    var through = require('through2');

    // A simple transform stream that reports the stage name to the console.
    function reportStage (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            done(null, obj);
        });
    }

    var lazyPipeline = lazypipe()
        .pipe(reportStage, 'A')
        .pipe(reportStage, 'B')
        .pipe(reportStage, 'C')
    ;

    // create a pipeline
    var pipeline = labeledpipe()
        .pipe('lazy', lazyPipeline)

        // insert before lazy
        .before('lazy')
        .pipe(reportStage, 'before-lazy')

        // insert after lazy
        .after('lazy')
        .pipe(reportStage, 'after-lazy')
    ;

    // create a stream from the pipeline
    var stream = pipeline();
    stream.write('Some data');
    stream.end();

Output:

    before-lazy: Some data
    A: Some data
    B: Some data
    C: Some data
    after-lazy: Some data

Similarly, labeledpipes can be used as stages in a lazypipe:

    var labeledpipe = require('labeledpipe');
    var lazypipe = require('lazypipe');
    var through = require('through2');

    // A simple transform stream that reports the stage name to the console.
    function reportStage (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            done(null, obj);
        });
    }

    var labeledPipeline = labeledpipe()
        .pipe('stage-A', reportStage, 'A')
        .pipe('stage-B', reportStage, 'B')
        .pipe('stage-C', reportStage, 'C')
    ;

    // create a pipeline
    var pipeline = lazypipe()
        .pipe(reportStage, 'before-labeled')
        .pipe(labeledPipeline)
        .pipe(reportStage, 'after-labeled')
    ;

    // create a stream from the pipeline
    var stream = pipeline();
    stream.write('Some data');
    stream.end();

Output:

    before-labeled: Some data
    A: Some data
    B: Some data
    C: Some data
    after-labeled: Some data

Events

The labeledpipe object exports all of the chainable event emitter methods:

  • addListener
  • on
  • once
  • removeListener
  • removeAllListeners
  • setMaxListeners

This allows event handlers to be added to a pipeline as if the pipeline were
being constructed immediately.

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    function emitEvent (name) {
        return through.obj(function (obj, enc, done) {
            console.log(name + ':', obj);
            this.emit(name, name);
            done(null, obj);
        });
    }

    var pipeline = labeledpipe()
        .pipe(emitEvent, 'A')
        .on('A', console.log.bind(console, 'Event:'))
        .pipe(emitEvent, 'B')
        .on('B', console.log.bind(console, 'Event:'))
    ;

    var stream = pipeline();
    stream.write('Some data');
    stream.end();

Output:

    A: Some data
    Event: A
    B: Some data
    Event: B

Errors

Error events are already a special case in Node. This treatment continues in
labeledpipe. By default, error events on a pipeline’s streams “bubble up” and are
re-emitted on the pipeline stream. For example:

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    function returnError (name) {
        return through.obj(function (obj, enc, done) {
            done(new Error(name));
        });
    }

    var stream = labeledpipe()
        .pipe(returnError, 'A')
        ()
    ;

    stream.on('error', function (error) {
        console.log('Stream Handler:', error.message);
    });

    stream.write('my data');
    stream.end();

Output:

    Stream Handler: A

However, if you add an error handler to a pipeline stage, error events from that stage will no longer bubble up to the
stream.

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    function returnError (name) {
        return through.obj(function (obj, enc, done) {
            done(new Error(name));
        });
    }

    var stream = labeledpipe()
        .pipe(returnError, 'A')
        .on('error', function (error) {
            console.log('Pipeline Handler:', error.message);
        })
        ()
    ;

    stream.on('error', function (error) {
        console.log('Stream Handler:', error.message);
    });

    stream.write('my data');
    stream.end();

Output:

    Pipeline Handler: A

The same rules apply to sub-pipelines:

    var labeledpipe = require('labeledpipe');
    var through = require('through2');

    function returnError (name) {
        return through.obj(function (obj, enc, done) {
            done(new Error(name));
        });
    }

    var stream = labeledpipe()
        .pipe(labeledpipe()
            .pipe(returnError, 'A')
            .on('error', function (error) {
                console.log('Sub-pipeline Handler:', error.message);
                this.push('More data');
            })
            .pipe(returnError, 'B')
        )
        .on('error', function (error) {
            console.log('Pipeline Handler:', error.message);
            this.push('More data');
        })
        .pipe(returnError, 'C')
        ()
    ;

    stream.on('error', function (error) {
        console.log('Stream Handler:', error.message);
    });

    stream.write('my data');
    stream.end();

Output:

    Sub-pipeline Handler: A
    Pipeline Handler: B
    Stream Handler: C
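
The bubbling rule described in this section can be approximated with plain Node event emitters: forward a stage's error events to the outer stream only when the stage has no error handler of its own. This is a simplified sketch of the rule, not labeledpipe's implementation. The helper bubbleErrors is hypothetical, and bare EventEmitter instances stand in for real streams:

```javascript
var EventEmitter = require('events');

// Conceptual sketch only: NOT labeledpipe's source code.
// `bubbleErrors` is a hypothetical helper for illustration.
// Forward 'error' from `stage` to `outer` only when the stage has no
// error handler of its own at the time this function is called.
function bubbleErrors (stage, outer) {
    var hasOwnHandler = stage.listenerCount('error') > 0;
    if (!hasOwnHandler) {
        stage.on('error', function (error) {
            outer.emit('error', error);
        });
    }
}

var log = [];
var outer = new EventEmitter();
outer.on('error', function (error) {
    log.push('Stream handler: ' + error.message);
});

// Stage A has its own handler, so its errors stay local.
var stageA = new EventEmitter();
stageA.on('error', function (error) {
    log.push('Stage handler: ' + error.message);
});
bubbleErrors(stageA, outer);

// Stage B has no handler, so its errors bubble up to the outer emitter.
var stageB = new EventEmitter();
bubbleErrors(stageB, outer);

stageA.emit('error', new Error('A'));
stageB.emit('error', new Error('B'));

console.log(log.join('\n'));
// Stage handler: A
// Stream handler: B
```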

Debugging

To help you debug the behavior of this module, labeledpipe uses the debug utility package to print information to the console. To enable these messages, set the DEBUG environment variable, which is the variable read by the debug package, to the value configured by the package whose messages you want printed.

To print debug messages on a Unix system, set the DEBUG environment variable to the name of this package before running a tool that invokes this module:

    DEBUG=labeledpipe [CONSUMING TOOL]

On the Windows command line you can run:

    set DEBUG=labeledpipe
    [CONSUMING TOOL]

Node Support Policy

We only support Long-Term Support versions of Node.

We specifically limit our support to LTS versions of Node, not because this package won’t work on other versions, but because we have a limited amount of time, and supporting LTS offers the greatest return on that investment.

It’s possible this package will work correctly on newer versions of Node. It may even be possible to use this package on older versions of Node, though that’s less likely, as we’ll make every effort to take advantage of features available in the oldest LTS version we support.

As each Node LTS version reaches its end-of-life we will remove that version from the node engines property of our package’s package.json file. Removing a Node version is considered a breaking change and will entail the publishing of a new major version of this package. We will not accept any requests to support an end-of-life version of Node. Any merge requests or issues supporting an end-of-life version of Node will be closed.

We will accept code that allows this package to run on newer, non-LTS versions of Node. Furthermore, we will attempt to ensure our own changes work on the latest version of Node. To help with that commitment, our continuous integration setup runs against all LTS versions of Node in addition to the most recent Node release, called current.

JavaScript package managers should allow you to install this package with any version of Node, with, at most, a warning if your version of Node does not fall within the range specified by our node engines property. If you encounter issues installing this package, please report the issue to your package manager.

Contributing

Please read our contributing guide to see how you may contribute to this project.

Copyright 2018 FactSet Research Systems Inc

Licensed under the Apache License, Version 2.0 (the “License”);
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an “AS IS” BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.