Project author: just1689

Project description:
Unopinionated secure queue-backed pub sub over websockets
Language: Go
Repository: git://github.com/just1689/entity-sync.git
Created: 2019-06-02T15:13:15Z
Project community: https://github.com/just1689/entity-sync

License: MIT License


Entity Sync


Want to push entities on change to websocket clients while scaling horizontally? The goal of this library is to make that easy. Wire Entity Sync into your application and you can keep clients up to date. Each client can subscribe to particular entities. You can change something on one server and all the websocket clients subscribed in the cluster will be pushed the updated entity.

Features

  • Stateless server. Servers do not need to know about each other or which clients are connected to other servers, so you can add servers without synchronizing state between them.
  • When you change something on the server side, provide the EntityKey to the bridge and all clients will be pushed the entity.
  • Multiple subscriptions. Each client can subscribe to multiple entities and multiple keys in each entity.
  • Multiple responses. You can send back several rows. This is useful when updating the client also means sending rows from related tables referenced by foreign keys.
  • Database / repository agnostic. This library can take a function that you implement to use whichever database, driver, client or interface you choose to implement.
  • A helper package for a simple one call setup (see es/entitysync.go)
  • Queue agnostic. Comes with working NSQ integration but you can choose to provide anything you can wrap in shared.EntityHandler and shared.EntityByteHandler.
  • Client secrets. The client can send a secret, and your handler can use it to determine identity and authorization, for example.
  • Pass-through function. Incoming websocket messages that are not handled by this library can be passed through to a function you provide.

Roadmap

  • Enable custom websocket listen path.
  • Consider improving the security model.
  • Consider a different fetch model (fetch per user vs fetch per server).
  • Queue prefix - provide a prefix string to ensure es topics are kept apart from others.

Example

Server setup

Connect the server to EntitySync. Wire your mux to the bridge and provide a function that can resolve an EntityKey.

    // Provide a configuration
    config := es.Config{
        Mux:     mux.NewRouter(),
        NSQAddr: *nsqAddr,
    }
    // Set up entitySync with that configuration
    entitySync := es.Setup(config)
    // Register an entity and tell the library how to fetch it and what to write to the client
    entitySync.RegisterEntityAndDBHandler(entityType, func(entityKey shared.EntityKey, secret string, handler shared.ByteHandler) {
        item := fetch(entityKey)
        b, _ := json.Marshal(item)
        handler(b)
    })
    // Start a listener and provide the mux for routes / handling
    l, err := net.Listen("tcp", *listenLocal)
    if err != nil {
        log.Fatal(err)
    }
    log.Fatal(http.Serve(l, config.Mux))

Connect clients

Connect any number of clients:

  1. Connect to the server over websocket ws://host:port/ws/entity-sync/
  2. Send a subscription request
     {
       "action": "subscribe",
       "body": {
         "id": "100",
         "entity": "items"
       }
     }
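
For clients written in Go, the subscription request above can be built with encoding/json. This is a minimal sketch: the type and function names (message, subscribeBody, buildSubscribe) are hypothetical and not part of the library, and only the JSON shape is taken from the example above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subscribeBody mirrors the "body" object of the subscription request.
type subscribeBody struct {
	ID     string `json:"id"`
	Entity string `json:"entity"`
}

// message mirrors the {"action": ..., "body": ...} envelope.
type message struct {
	Action string      `json:"action"`
	Body   interface{} `json:"body"`
}

// buildSubscribe returns the JSON text a client would send to subscribe
// to one entity key. The name is hypothetical, chosen for this sketch.
func buildSubscribe(entity, id string) (string, error) {
	b, err := json.Marshal(message{
		Action: "subscribe",
		Body:   subscribeBody{ID: id, Entity: entity},
	})
	return string(b), err
}

func main() {
	msg, err := buildSubscribe("items", "100")
	if err != nil {
		panic(err)
	}
	// Write msg to the websocket connection with the client library of your choice.
	fmt.Println(msg)
}
```

A client that wants several entities or keys would send one such message per subscription.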

Mutate entity & notify

Make some change to the item in question where it is persisted and then call
bridge.NotifyAllOfChange(entityKey) where entityKey is a shared.EntityKey.

All connected clients over websockets will receive messages for the EntityKey/s to which they are subscribed.
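
To make the flow concrete, here is a toy, single-process sketch of what happens on one server when a change notification arrives: look up the clients subscribed to the key, fetch the entity, and push it to each of them. Everything in it (the hub type, its methods, the EntityKey fields) is an assumption made for illustration and not the library's implementation. In particular, it fetches once per notification and leaves out the queue that carries notifications between servers.

```go
package main

import (
	"fmt"
	"sync"
)

// EntityKey identifies one entity instance. The fields follow the
// subscription request ("entity", "id") and are an assumed shape.
type EntityKey struct {
	Entity string
	ID     string
}

// hub tracks which clients on this one server are subscribed to which keys.
type hub struct {
	mu    sync.Mutex
	subs  map[EntityKey]map[string]func([]byte) // key -> client id -> push func
	fetch func(EntityKey) []byte                // resolves a key to bytes
}

func newHub(fetch func(EntityKey) []byte) *hub {
	return &hub{subs: make(map[EntityKey]map[string]func([]byte)), fetch: fetch}
}

// subscribe registers push as the way to reach clientID for key.
func (h *hub) subscribe(clientID string, key EntityKey, push func([]byte)) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.subs[key] == nil {
		h.subs[key] = make(map[string]func([]byte))
	}
	h.subs[key][clientID] = push
}

// notify pushes the current entity to every client subscribed to key
// and returns how many clients were reached.
func (h *hub) notify(key EntityKey) int {
	h.mu.Lock()
	pushes := make([]func([]byte), 0, len(h.subs[key]))
	for _, p := range h.subs[key] {
		pushes = append(pushes, p)
	}
	h.mu.Unlock()
	if len(pushes) == 0 {
		return 0 // nobody on this server cares about the key
	}
	b := h.fetch(key)
	for _, p := range pushes {
		p(b)
	}
	return len(pushes)
}

func main() {
	h := newHub(func(k EntityKey) []byte { return []byte(`{"id":"` + k.ID + `"}`) })
	key := EntityKey{Entity: "items", ID: "100"}
	h.subscribe("client-a", key, func(b []byte) { fmt.Printf("client-a got %s\n", b) })
	h.subscribe("client-b", EntityKey{Entity: "items", ID: "200"}, func(b []byte) {
		fmt.Printf("client-b got %s\n", b)
	})
	fmt.Println("clients notified:", h.notify(key))
}
```

Only client-a receives a push here, because client-b is subscribed to a different key.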

Sending a secret from the client

    {
      "action": "secret",
      "body": "my-super-secret-secret-123"
    }

Other examples

Sending multiple rows to the client

    entitySync.RegisterEntityAndDBHandler("report", func(entityKey shared.EntityKey, secret string, handler shared.ByteHandler) {
        // Push the report itself
        item := controller.GetReportByID(entityKey.ID)
        b, _ := json.Marshal(item)
        handler(b)
        // Push the user who created the report
        user := controller.GetUserByID(item.CreatedBy)
        b, _ = json.Marshal(user)
        handler(b)
        // Push the department the report belongs to
        department := controller.GetDepartmentByID(item.DepartmentID)
        b, _ = json.Marshal(department)
        handler(b)
    })

Checking if a user is allowed to receive the push notification

    entitySync.RegisterEntityAndDBHandler("report", func(entityKey shared.EntityKey, secret string, handler shared.ByteHandler) {
        // If the secret does not resolve to a session, push nothing
        if _, err := controller.GetSessionBySecret(secret); err != nil {
            logrus.Errorln(err)
            return
        }
        item := controller.GetReportByID(entityKey.ID)
        b, _ := json.Marshal(item)
        handler(b)
    })
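
The same idea can be taken one step further by checking a permission on the session, not just whether it exists. The sketch below is self-contained and uses hypothetical stand-ins throughout (the session type, the sessions map, sessionBySecret, pushReport). None of these come from the library, and a real application would look sessions up in its own store.

```go
package main

import (
	"errors"
	"fmt"
)

// session is a hypothetical record of who a secret belongs to.
type session struct {
	User           string
	CanReadReports bool
}

// sessions is a stand-in for a real session store.
var sessions = map[string]session{
	"secret-alice": {User: "alice", CanReadReports: true},
	"secret-bob":   {User: "bob", CanReadReports: false},
}

var errUnknownSecret = errors.New("unknown secret")

// sessionBySecret resolves a client secret to a session.
func sessionBySecret(secret string) (session, error) {
	s, ok := sessions[secret]
	if !ok {
		return session{}, errUnknownSecret
	}
	return s, nil
}

// pushReport calls push only when the secret resolves to a session that
// may read reports. It reports whether anything was pushed.
func pushReport(secret string, payload []byte, push func([]byte)) bool {
	s, err := sessionBySecret(secret)
	if err != nil || !s.CanReadReports {
		return false
	}
	push(payload)
	return true
}

func main() {
	push := func(b []byte) { fmt.Printf("pushed %s\n", b) }
	for _, secret := range []string{"secret-alice", "secret-bob", "nope"} {
		fmt.Println(secret, pushReport(secret, []byte(`{"id":"1"}`), push))
	}
}
```

Inside a handler registered with RegisterEntityAndDBHandler, the equivalent of pushReport would return early, without calling handler, for unknown or unauthorized secrets.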

Provide your own queue

EntitySync ships with an NSQ integration, but in principle you can use any queue. You need to provide two functions to the library. The first builds a publisher and returns a function that the library calls to publish. The second builds a subscriber and is given a callback to invoke for each message it receives.

    var queueAddr = "localhost:4000"

    func setup() {
        mux := http.NewServeMux()
        databaseHub := esdb.NewDatabaseHub()
        // The bridge relays communication from ws to the queue and from the queue to ws.
        // It also calls on the db to resolve an entityKey
        bridge := esbridge.BuildBridge(
            BuildPublisher(queueAddr),
            BuildSubscriber(queueAddr),
            databaseHub.PullDataAndPush,
        )
        // Pass the mux and a client builder to the library's handlers
        esweb.SetupMuxBridge(mux, bridge.ClientBuilder)
        ...
    }

    var BuildPublisher shared.AddressableEntityHandler = func(addr string) shared.EntityHandler {
        return func(entityType shared.EntityType) shared.ByteHandler {
            return func(b []byte) {
                //TODO: set up the publisher client
                ...
                qPublisher.publish(entityType.GetQueueName(), b)
            }
        }
    }

    var BuildSubscriber shared.AddressableEntityByteHandler = func(addr string) shared.EntityByteHandler {
        return func(entityType shared.EntityType, callback shared.ByteHandler) {
            // natsHandler hands each message received from the queue to the library
            natsHandler := func(in []byte) {
                callback(in)
            }
            //TODO: connect to the nats client and register natsHandler for entityType.GetQueueName()
            ...
        }
    }
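
To see the two function shapes working end to end without any external queue, here is a toy in-process broker. The types EntityType, ByteHandler, EntityHandler and EntityByteHandler are local stand-ins shaped like the ones used in the example above, and memQueue with its methods is hypothetical. None of it is imported from the library, and an in-process queue like this cannot deliver across servers, so it is only suitable for experiments and tests.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins for the function shapes used in the example above.
// They are assumptions for this sketch, not the library's definitions.
type (
	EntityType        string
	ByteHandler       func([]byte)
	EntityHandler     func(EntityType) ByteHandler
	EntityByteHandler func(EntityType, ByteHandler)
)

// memQueue is a toy broker: one list of subscriber callbacks per topic.
type memQueue struct {
	mu   sync.Mutex
	subs map[EntityType][]ByteHandler
}

func newMemQueue() *memQueue {
	return &memQueue{subs: make(map[EntityType][]ByteHandler)}
}

// publisher returns the publishing side: given an entity type, it yields
// a function that delivers bytes to every subscriber of that type.
func (q *memQueue) publisher() EntityHandler {
	return func(t EntityType) ByteHandler {
		return func(b []byte) {
			q.mu.Lock()
			handlers := append([]ByteHandler(nil), q.subs[t]...) // copy under lock
			q.mu.Unlock()
			for _, h := range handlers {
				h(b)
			}
		}
	}
}

// subscriber returns the subscribing side: it registers callback to be
// invoked for each message published for the entity type.
func (q *memQueue) subscriber() EntityByteHandler {
	return func(t EntityType, callback ByteHandler) {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.subs[t] = append(q.subs[t], callback)
	}
}

func main() {
	q := newMemQueue()
	publish, subscribe := q.publisher(), q.subscriber()
	subscribe("items", func(b []byte) { fmt.Printf("items subscriber got %s\n", b) })
	publish("items")([]byte("100"))
	publish("reports")([]byte("dropped: no subscribers"))
}
```

A real replacement would wrap its client's publish and subscribe calls in the same two closures, taking the queue address as the outer argument.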

Pass through ws

You can provide a method that will allow for pass-through handling of websocket messages.

    // Provide a configuration
    config := es.Config{
        Mux:     mux.NewRouter(),
        NSQAddr: *nsqAddr,
        WSPassThrough: func(secret string, b []byte) {
            //TODO: handle incoming websocket message
        },
    }
    // Set up entitySync with that configuration
    entitySync := es.Setup(config)
    // Register an entity and tell the library how to fetch it and what to write to the client
    entitySync.RegisterEntityAndDBHandler(entityType, func(entityKey shared.EntityKey, secret string, handler shared.ByteHandler) {
        item := fetch(entityKey)
        b, _ := json.Marshal(item)
        handler(b)
    })
    // Start a listener and provide the mux for routes / handling
    l, err := net.Listen("tcp", *listenLocal)
    if err != nil {
        log.Fatal(err)
    }
    log.Fatal(http.Serve(l, config.Mux))
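
As a rough mental model of pass-through, the sketch below routes a raw websocket message by its "action" field: the two actions shown in this README are treated as handled, and anything else is given to a pass-through function together with the client's secret. The envelope type and route function are hypothetical, and how the library itself decides what to pass through (for example, whether non-JSON messages qualify) is an assumption here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope matches the {"action": ..., "body": ...} messages shown earlier.
// Body stays raw because it is an object for "subscribe" and a string for "secret".
type envelope struct {
	Action string          `json:"action"`
	Body   json.RawMessage `json:"body"`
}

// route reports how a raw message is treated in this sketch: known actions
// are consumed, everything else is forwarded to passThrough.
func route(secret string, raw []byte, passThrough func(secret string, b []byte)) string {
	var e envelope
	if err := json.Unmarshal(raw, &e); err == nil {
		switch e.Action {
		case "subscribe", "secret":
			return "handled:" + e.Action
		}
	}
	// Unknown action or not an envelope at all: hand it to the application.
	passThrough(secret, raw)
	return "passed-through"
}

func main() {
	passThrough := func(secret string, b []byte) {
		fmt.Printf("pass-through from %q: %s\n", secret, b)
	}
	fmt.Println(route("s3cr3t", []byte(`{"action":"secret","body":"s3cr3t"}`), passThrough))
	fmt.Println(route("s3cr3t", []byte(`{"action":"chat","body":"hello"}`), passThrough))
}
```

The second message carries an action this sketch does not recognize, so it reaches passThrough unchanged.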