Project author: GeorgeSapkin

Project description:
Pub/Sub Store
Language: JavaScript
Repository: git://github.com/GeorgeSapkin/pubsub-store.git
Created: 2017-04-30T21:39:45Z
Project community: https://github.com/GeorgeSapkin/pubsub-store

License: MIT License

Pub/Sub Store pubsub-store

A pub/sub store and provider that use a Mongoose-like schema to separate
clients from data stores through a consistent protocol, while keeping the
benefits of the underlying pub/sub bus by allowing other listeners to subscribe
to CRUD events.

Multiple stores, possibly with different underlying databases, can service
requests as long as they expose the same protocol.

Providers can be used to create and update entities while other providers
subscribe to notifications.

Providers support duplex streaming of entities.

Integrates nicely with graphql-schema-builder. See Examples for a GraphQL
client example.

NB: Currently assuming providers and underlying DB backends use the same query
language.

Requirements

The library requires Node.js 8.x or later with native async/await support.

Installation

    yarn add pubsub-store

or

    npm install --save pubsub-store

API

Provider

Exposes the underlying store in a convenient format.

Implements a Duplex stream to create and receive created entities. See
Streaming for more details.

Methods

constructor({ schema, transport, getSubjects, options: { batchSize, highWaterMark, noAckStream, timeout }})

  • schema

    A schema object. See Schema for details.

  • transport

    A connected transport instance. Must have request, subscribe and
    unsubscribe methods with the following signatures:

        const transport = {
            request(subject, msg, options, cb) {
                // ...
            },
            subscribe(subject, cb) {
                // ...
                return subscriptionId;
            },
            unsubscribe(subscriptionId) {
                // ...
            }
        };

  • getSubjects

    Optional function that returns protocol subjects. Default implementation in
    subjects.js.

  • options (optional)

    • batchSize

      Maximum result batch size. If there are more query results than batchSize,
      results will be loaded in batches of that size.

    • highWaterMark

      When set, the stream will push messages in chunks of that size.

    • noAckStream

      When true, allows piping to provider without acknowledgement, i.e. fire
      and forget.

    • timeout

      Query timeout in milliseconds (default: 1000).
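
To make the transport contract concrete, here is a minimal in-memory sketch exposing the three methods listed above. It is not part of pubsub-store: createMemoryTransport, the reply callback handed to subscribers, the synchronous delivery and the count.asset subject are all assumptions made for illustration. A real transport (for example a NATS client) delivers messages asynchronously and may pass different callback arguments.

```javascript
// Hypothetical in-memory transport; not part of pubsub-store.
function createMemoryTransport() {
    const subscriptions = new Map(); // subscriptionId -> { subject, cb }
    let nextId = 1;

    return {
        // Delivers msg to every subscriber of subject. Assumption: each
        // subscriber receives (msg, reply) and only the first reply is used.
        request(subject, msg, options, cb) {
            let replied = false;
            const reply = response => {
                if (replied) return;
                replied = true;
                cb(response);
            };
            for (const sub of subscriptions.values()) {
                if (sub.subject === subject) sub.cb(msg, reply);
            }
        },
        subscribe(subject, cb) {
            const subscriptionId = nextId++;
            subscriptions.set(subscriptionId, { subject, cb });
            return subscriptionId;
        },
        unsubscribe(subscriptionId) {
            subscriptions.delete(subscriptionId);
        }
    };
}

const transport = createMemoryTransport();
const sid = transport.subscribe('count.asset', (msg, reply) => {
    reply({ result: 42 });
});

let response;
transport.request('count.asset', { conditions: {} }, {}, res => {
    response = res;
});
console.log(response); // { result: 42 }

transport.unsubscribe(sid);
```

A stand-in like this can be handy in unit tests, where running a real message bus would be overkill.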

count(conditions)

Returns the number of entities matching conditions.

  • conditions

    Conditions to count entities based on.

countAll()

Returns the number of all entities in the store (excluding those marked as deleted).

create(object, projection)

Creates an entity based on object and returns projected fields of the new
entity.

  • object

    Object with the fields to set.

  • projection

    Projection of the fields from created entity to be returned.

delete(conditions, projection)

Deletes entities based on conditions and returns projected fields of deleted
entities.

  • conditions

    Conditions to delete entities based on.

  • projection

    Projection of the fields from deleted entities to be returned.

deleteById(id, projection)

Deletes an entity based on id and returns projected fields of deleted entity.

  • id

    ID to delete an entity based on.

  • projection

    Projection of the fields from deleted entity to be returned.

find(conditions, projection, options)

Finds entities based on conditions and returns projected fields of found
entities.

  • conditions

    Conditions to find entities based on.

  • projection

    Projection of the fields from found entities to be returned.

  • options (optional)

    Query options (e.g. limit).

findAll(projection, options)

Finds all entities and returns projected fields of found entities.

  • projection

    Projection of the fields from found entities to be returned.

  • options (optional)

    Query options (e.g. limit).

findById(id, projection)

Finds an entity based on id and returns projected fields of the found entity.

  • id

    ID to find an entity based on.

  • projection

    Projection of the fields from found entity to be returned.

updateById(id, object, projection)

Updates an entity based on id using object and returns projected fields of
the updated entity.

  • id

    ID to update an entity based on.

  • object

    Object that is used to update the matching entity.

  • projection

    Projection of the fields from updated entity to be returned.

Events

create

Emitted when an entity create event is received from the underlying message bus.

update

Emitted when an entity update event is received from the underlying message bus.

create and update event listeners have the following signature:

stream-error

Emitted from either the Readable or the Writable side of the Duplex stream
instead of an error event. For the Writable side this prevents any upstream
sources from unpiping.

    function listener(err, query) { /* ... */ }
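
As a rough illustration of handling this event, the sketch below attaches a listener with the (err, query) signature to a plain EventEmitter standing in for a provider. Treating that signature as the stream-error one, the emitted error and the query payload are assumptions; a real provider would emit the event itself.

```javascript
const { EventEmitter } = require('events');

// Stand-in for a provider instance (assumption); a real Provider would
// emit 'stream-error' on its own.
const provider = new EventEmitter();

const failures = [];
provider.on('stream-error', (err, query) => {
    // Record the failure so the rest of the pipeline can keep running.
    failures.push({ message: err.message, query });
});

// Simulate a failed request with a hypothetical query payload.
provider.emit('stream-error', new Error('Request timed out'), {
    object: { name: 'Asset 1' }
});

console.log(failures.length); // 1
```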

Streaming

Since Provider implements the Duplex stream class, entities can be piped to
and from a provider instance.

    const provider = new SomeProvider({ /* */ });
    // Entities received from the message bus will be piped to someWritableStream
    provider.pipe(someWritableStream);
    // Entities from someReadableStream will be piped to the message bus
    someReadableStream.pipe(provider);

See client-nats-streaming example for more
details.
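
The piping pattern can be tried without a message bus. In the sketch below an object-mode PassThrough stands in for a provider, which is an assumption made purely for illustration: it just forwards what is written to it, whereas a real provider would publish written entities to the bus and push entities received from it.

```javascript
const { PassThrough, Readable, Writable } = require('stream');

// Stand-in for a provider (assumption): a Duplex that forwards objects.
const provider = new PassThrough({ objectMode: true });

// Source of entities to send through the stand-in provider.
const source = new Readable({ objectMode: true, read() {} });
source.push({ name: 'Asset 1' });
source.push({ name: 'Asset 2' });
source.push(null); // signals the end of the stream

// Sink that collects whatever comes out of the stand-in provider.
const received = [];
const sink = new Writable({
    objectMode: true,
    write(entity, encoding, callback) {
        received.push(entity);
        callback();
    }
});

sink.on('finish', () => {
    console.log(received.map(entity => entity.name)); // [ 'Asset 1', 'Asset 2' ]
});

source.pipe(provider).pipe(sink);
```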

Store

Exposes count, create, find and update methods over the pub/sub bus to be
consumed by providers.

Methods

constructor({ buildModel, schema, transport, getSubjects })

  • buildModel

    A function that builds a model based on a schema. A model must have count,
    create, find and update methods that accept protocol arguments.

    create must handle object being either a single object or an array.

    See server-nats-mongo example for more details.

        function buildModel(schema) {
            return {
                count(conditions) { /* */ },
                create(object, projection) { /* */ },
                find(conditions, projection, options) { /* */ },
                update(conditions, object, options) { /* */ }
            };
        }

  • schema

    A schema object. See Schema for details.

  • transport

    A connected transport instance. Must have subscribe and unsubscribe
    methods with the following signatures:

        const transport = {
            subscribe(subject, cb) {
                // ...
                return subscriptionId;
            },
            unsubscribe(subscriptionId) {
                // ...
            }
        };

  • getSubjects

    Optional function that returns protocol subjects. Default implementation in
    subjects.js.
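
For experimenting without a database, a buildModel function could return an in-memory model along the lines of the sketch below. This is an illustration rather than the library's reference implementation: the equality-only condition matching, the inclusion-only projection, the $set-only update, the _id counter and the promise-returning (async) methods are all assumptions. A real store would more likely wrap a database model, as in the server-nats-mongo example.

```javascript
// Hypothetical in-memory model; the schema argument is unused here.
function buildModel(schema) {
    const entities = [];
    let nextId = 1;

    // Assumption: conditions are equality checks on top-level fields.
    const matches = (entity, conditions = {}) =>
        Object.keys(conditions).every(key => entity[key] === conditions[key]);

    // Assumption: a projection only lists included fields, e.g. { name: 1 }.
    const project = (entity, projection) => {
        if (!projection || Object.keys(projection).length === 0) {
            return Object.assign({}, entity);
        }
        const out = {};
        for (const key of Object.keys(projection)) {
            if (projection[key] && key in entity) out[key] = entity[key];
        }
        return out;
    };

    return {
        async count(conditions) {
            return entities.filter(e => matches(e, conditions)).length;
        },
        // Accepts either a single object or an array of objects.
        async create(object, projection) {
            const insert = obj => {
                const entity = Object.assign({ _id: nextId++ }, obj);
                entities.push(entity);
                return project(entity, projection);
            };
            return Array.isArray(object)
                ? object.map(obj => insert(obj))
                : insert(object);
        },
        async find(conditions, projection, options = {}) {
            const found = entities.filter(e => matches(e, conditions));
            const limited = options.limit ? found.slice(0, options.limit) : found;
            return limited.map(e => project(e, projection));
        },
        // Only the $set operator is handled in this sketch.
        async update(conditions, object, options = {}) {
            const targets = entities.filter(e => matches(e, conditions));
            const selected = options.multi ? targets : targets.slice(0, 1);
            for (const entity of selected) Object.assign(entity, object.$set);
            return selected.map(e => Object.assign({}, e));
        }
    };
}

const model = buildModel({ name: 'Asset' });
model.create([{ name: 'A' }, { name: 'B' }], { name: 1 })
    .then(created => {
        console.log(created); // [ { name: 'A' }, { name: 'B' } ]
        return model.count({ name: 'A' });
    })
    .then(count => console.log(count)); // 1
```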

open()

Subscribes to all subjects, effectively starting the store.

close()

Unsubscribes from all subjects, effectively stopping the store.
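
The open/close lifecycle boils down to keeping the subscription IDs returned by transport.subscribe and handing them back to transport.unsubscribe. The sketch below shows that bookkeeping with a hypothetical SubscriptionSet class and a fake transport. It is not the library's Store implementation, and the subject names are made up.

```javascript
// Illustrative only; not the library's Store implementation.
class SubscriptionSet {
    constructor(transport, subjects, handler) {
        this.transport = transport;
        this.subjects = subjects;
        this.handler = handler;
        this.ids = [];
    }

    // Subscribe to every subject and remember the returned IDs.
    open() {
        this.ids = this.subjects.map(subject =>
            this.transport.subscribe(subject, this.handler));
    }

    // Unsubscribe using the remembered IDs.
    close() {
        this.ids.forEach(id => this.transport.unsubscribe(id));
        this.ids = [];
    }
}

// Fake transport that only tracks active subscriptions.
const active = new Map();
let nextId = 1;
const transport = {
    subscribe(subject, cb) {
        const id = nextId++;
        active.set(id, subject);
        return id;
    },
    unsubscribe(id) {
        active.delete(id);
    }
};

const subs = new SubscriptionSet(
    transport, ['count.asset', 'find.asset'], () => {});
subs.open();
console.log(active.size); // 2
subs.close();
console.log(active.size); // 0
```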

Events

Events are emitted on corresponding request errors.

  • create-error

  • find-error

  • update-error

getSubjects

getSubjects(name, { prefixes, suffix })

Function that can be passed to both Provider and Store
constructors and returns protocol subjects based on schema name.

  • name

    Schema name.

  • prefixes

    Object with subject prefixes. Defaults to:

        const Prefixes = {
            count: 'count',
            create: 'create',
            find: 'find',
            update: 'update'
        };

  • suffix (optional)

    Subject suffix (default: '', an empty string).
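
A custom getSubjects function might look roughly like the sketch below. The return shape of the default implementation in subjects.js is not documented here, so the object keyed by operation and the way a non-empty suffix is appended after a dot are assumptions. Only the prefix.schema-name pattern comes from the Protocol section.

```javascript
const Prefixes = {
    count: 'count',
    create: 'create',
    find: 'find',
    update: 'update'
};

// Hypothetical custom getSubjects. Assumptions: the result is an object
// keyed by operation, and a non-empty suffix is appended after a dot.
function getSubjects(name, { prefixes = Prefixes, suffix = '' } = {}) {
    const subjects = {};
    for (const op of Object.keys(prefixes)) {
        const base = `${prefixes[op]}.${name}`;
        subjects[op] = suffix ? `${base}.${suffix}` : base;
    }
    return subjects;
}

console.log(getSubjects('asset').find); // find.asset
console.log(getSubjects('asset', { suffix: 'v2' }).count); // count.asset.v2
```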

Protocol

Protocol is implemented by Provider and Store and is presented here for
reference.

NB: Currently assuming providers and underlying DB backends use the same query
language.

NB: Projections cannot have both included and excluded fields.
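
The projection rule above can be checked on the client before a request is sent. The helper below is a hypothetical sketch (isConsistentProjection is not a library function). It treats any truthy value as an inclusion and any falsy value as an exclusion, and it does not special-case _id. Whether the library allows an _id exception, as MongoDB does, is not stated.

```javascript
// Returns true when a projection uses only inclusions or only exclusions.
function isConsistentProjection(projection = {}) {
    const values = Object.keys(projection).map(key => Boolean(projection[key]));
    const hasIncluded = values.includes(true);
    const hasExcluded = values.includes(false);
    return !(hasIncluded && hasExcluded);
}

console.log(isConsistentProjection({ field1: 1, field2: 1 })); // true
console.log(isConsistentProjection({ field1: 0, field2: 0 })); // true
console.log(isConsistentProjection({ field1: 1, field2: 0 })); // false
```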

Result

    {
        result: resultObject // or an array, or a value
    }

Error

    {
        error: {
            message: "Error details"
        }
    }
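
A consumer talking to a store directly, without a Provider, could turn these two response shapes into a return value or a thrown error. The helper below is a sketch under that assumption. unwrapResponse is a hypothetical name, and the response is assumed to be an already-parsed object.

```javascript
// Converts a protocol response into a return value or a thrown Error.
function unwrapResponse(response) {
    if (response.error) {
        throw new Error(response.error.message);
    }
    return response.result;
}

console.log(unwrapResponse({ result: [{ field1: 'value 1' }] }));
// [ { field1: 'value 1' } ]

try {
    unwrapResponse({ error: { message: 'Error details' } });
} catch (err) {
    console.log(err.message); // Error details
}
```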

Count Method

A count request is published to the count.schema-name subject by default.
Returns the number of entities matching conditions.

    {
        conditions: {
            field1: 'value 2',
            // etc.
        }
    }

Create Method

A create request is published to the create.schema-name subject by default.
Returns a newly created entity or a list of entities with projection applied.

    {
        object: {
            field1: 'value 1',
            field2: 2
            // etc.
        },
        projection: {
            field1: 1,
            field2: 1
            // etc.
        }
    }

Find Method

A find request is published to the find.schema-name subject by default.
Returns a list of entities matching conditions with projection applied, or an
empty list.

    {
        conditions: {
            field1: 'value 2',
            // etc.
        },
        projection: {
            field1: 1,
            field2: 1
            // etc.
        },
        options: {
            limit: 1
            // etc.
        }
    }

Update Method

An update request is published to the update.schema-name subject by default.
Returns an updated entity with projection applied or an empty list.

    {
        conditions: {
            field1: 'value 2',
            // etc.
        },
        object: {
            $set: {
                field2: 3
            }
            // etc.
        },
        projection: {
            field1: 1,
            field2: 1
            // etc.
        },
        options: {
            multi: true
            // etc.
        }
    }

Schema

fields can be either an object or a function accepting { Mixed, ObjectId }.
See the Mongoose Guide for more details about schema definition.

Schema format is shared with graphql-schema-builder.

    const schemas = {
        Asset: {
            name: 'Asset',
            description: 'An asset.',
            fields: ({ Mixed, ObjectId }) => ({
                customer: {
                    description: 'Customer that this asset belongs to.',
                    type: ObjectId,
                    ref: 'Customer',
                    required: true
                },
                parent: {
                    type: ObjectId,
                    ref: 'Asset',
                    required: false
                },
                name: {
                    type: String,
                    required: true
                }
            }),
            dynamicFields: ({ ObjectId }) => ({
                sensors: {
                    type: [ObjectId],
                    ref: 'Sensor'
                }
            })
        },
        Customer: {
            name: 'Customer',
            description: 'A customer.',
            fields: {
                name: {
                    description: 'The name of the customer.',
                    type: String,
                    required: true
                },
                // Will result in subtype
                metadata: {
                    created: {
                        type: Date,
                        required: true
                    }
                }
            },
            dynamicFields: ({ Mixed, ObjectId }) => ({
                assets: {
                    type: [ObjectId],
                    ref: 'Asset'
                }
            })
        },
        Sensor: {
            name: 'Sensor',
            description: 'A sensor that must be connected to an asset.',
            fields: ({ Mixed, ObjectId }) => ({
                externalId: {
                    type: String,
                    required: false
                },
                asset: {
                    description: 'An asset that this sensor is connected to.',
                    type: ObjectId,
                    ref: 'Asset',
                    required: true
                },
                name: {
                    type: String,
                    required: false
                }
            })
        }
    };
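
Because fields may be either an object or a function, code that consumes a schema has to normalise it first. The sketch below shows one way to do that. resolveFields is a hypothetical helper, and the Symbol values are placeholders for the real type markers, which with Mongoose would presumably be mongoose.Schema.Types.Mixed and mongoose.Schema.Types.ObjectId.

```javascript
// Placeholder type markers (assumption); not the real Mongoose types.
const types = {
    Mixed: Symbol('Mixed'),
    ObjectId: Symbol('ObjectId')
};

// Returns the field definitions whether fields is an object or a function.
function resolveFields(schema, { Mixed, ObjectId } = types) {
    return typeof schema.fields === 'function'
        ? schema.fields({ Mixed, ObjectId })
        : schema.fields;
}

const Customer = {
    name: 'Customer',
    fields: { name: { type: String, required: true } }
};
const Asset = {
    name: 'Asset',
    fields: ({ ObjectId }) => ({
        customer: { type: ObjectId, ref: 'Customer', required: true }
    })
};

console.log(Object.keys(resolveFields(Customer))); // [ 'name' ]
console.log(resolveFields(Asset).customer.type === types.ObjectId); // true
```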

Examples

See examples for NATS, Mongo/Mongoose, GraphQL and streaming usage.

TODO

  • Abstract pub/sub bus interface into transport adapters
  • In-code documentation
  • Implement bulk update
  • Implement deleting as opposed to marking as deleted
  • Implement aggregate

License

MIT