Project author: null-vector

Project description:
Akka Persistence plugin using the ReactiveMongo driver.
Language: Scala
Repository: git://github.com/null-vector/akka-reactivemongo-plugin.git
Created: 2018-08-06T12:57:26Z
Project page: https://github.com/null-vector/akka-reactivemongo-plugin

Akka Persistence Plugin for MongoDB

This implementation uses the ReactiveMongo driver.

Installation

This plugin supports Scala 2.13, Akka 2.6.x and ReactiveMongo 1.0.x.

Add the following lines to your build.sbt:

    resolvers += "null-vector" at "https://nullvector.jfrog.io/artifactory/releases"
    libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.6.0"

Configuration

To activate the plugin and set the MongoDB URI, add the following lines to your application.conf:

    akka.persistence.journal.plugin = "akka-persistence-reactivemongo-journal"
    akka.persistence.snapshot-store.plugin = "akka-persistence-reactivemongo-snapshot"
    akka-persistence-reactivemongo {
      mongo-uri = "mongodb://host/database?rm.failover=900ms:21x1.30"
    }

See "Connect to a database" in the ReactiveMongo documentation for more information.

Configuration for DurableStateStore

    akka.persistence.state.plugin = "akka-persistence-reactivemongo-crud"

Event Adapters

Before saving any event from your persistent actor, you must register the corresponding EventAdapter.

    case class ProductId(id: String) extends AnyVal
    case class InvoiceItem(productId: ProductId, price: BigDecimal, tax: BigDecimal)
    case class InvoiceItemAdded(invoiceItem: InvoiceItem)
    val eventAdapter = EventAdapterFactory.adapt[InvoiceItemAdded](withManifest = "InvoiceItemAdded")
    ReactiveMongoEventSerializer(actorSystem).addEventAdapter(eventAdapter)

It is also possible to override mappings or to add mappings for unsupported types. Every added mapping must extend BSONReader[_], BSONWriter[_], or both.

    implicit val reader = new BSONReader[Type1] {...}
    implicit val writer = new BSONWriter[Type2] {...}
    implicit val readerAndWriter = new BSONReader[Type3] with BSONWriter[Type3] {...}
    val eventAdapter = EventAdapterFactory.adapt[Type4](withManifest = "SomeEvent")

You can also add tags associated with the event:

    val eventAdapter = EventAdapterFactory.adapt[Type4](withManifest = "SomeEvent", Set("Tag_1", "Tag_2"))

Trait families (sealed traits), also known as sum types, are mapped automatically:

    sealed trait InvoiceLineType
    case object ProductLine extends InvoiceLineType
    ...
    case class InvoiceLine(lineType: InvoiceLineType, ...)
    case class InvoiceLineAdded(line: InvoiceLine)
    ...
    implicit val conf = MacroConfiguration(discriminator = "_type", typeNaming = TypeNaming.SimpleName)
    val eventAdapter = EventAdapterFactory.adapt[InvoiceLineAdded](withManifest = "InvoiceLineAdded")

Behind the scenes, EventAdapterFactory uses the ReactiveMongo macros, so you can configure the BSON mappings:

    implicit val conf: Aux[MacroOptions] = MacroConfiguration(discriminator = "_type", typeNaming = TypeNaming.SimpleName)
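
To illustrate what a discriminator field does, here is a minimal, library-free sketch. It is not the code generated by the ReactiveMongo macros: the Doc type alias, the write and read functions, and the ServiceLine case are hypothetical names introduced only for this example. It assumes the simple type name is stored under the "_type" field, as the configuration above suggests.

```scala
object DiscriminatorSketch {
  sealed trait InvoiceLineType
  case object ProductLine extends InvoiceLineType
  case object ServiceLine extends InvoiceLineType // hypothetical second case

  // Stand-in for a BSON document: a plain map of field names to values.
  type Doc = Map[String, String]

  // Write side: store the simple type name under the discriminator field.
  def write(lineType: InvoiceLineType): Doc = lineType match {
    case ProductLine => Map("_type" -> "ProductLine")
    case ServiceLine => Map("_type" -> "ServiceLine")
  }

  // Read side: choose the concrete case from the discriminator value.
  def read(doc: Doc): Option[InvoiceLineType] = doc.get("_type").flatMap {
    case "ProductLine" => Some(ProductLine)
    case "ServiceLine" => Some(ServiceLine)
    case _             => None // unknown or missing type name
  }
}
```

Because the type name is written into the stored document, renaming a case of the sealed trait would presumably affect how previously persisted events are read back.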

Custom mappings

You can create mappings by hand:

    implicit val a: BSONDocumentMapping[SolarPlanet] = EventAdapterFactory.mappingOf[SolarPlanet]
    val eventAdapter = new EventAdapterMapping[SolarPlanet]("planet")
    serializer.addEventAdapter(eventAdapter)

Persistence Id

By default, the persistence id has the following form: <Aggregate>-<Id>, and the aggregate will be the name of the MongoDB collection.

You can change the persistence id separator character:

    akka-persistence-reactivemongo {
      mongo-uri = "mongodb://localhost/test?rm.failover=900ms:21x1.30"
      persistence-id-separator = |
    }
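
As a rough illustration of the <Aggregate><separator><Id> convention, the following sketch splits a persistence id into its aggregate (the collection name) and its entity id. PersistenceIdSketch, ParsedId and parse are hypothetical names that are not part of the plugin, and splitting at the first occurrence of the separator is an assumption, not confirmed plugin behavior.

```scala
object PersistenceIdSketch {
  final case class ParsedId(aggregate: String, id: String)

  // Assumption: the aggregate is everything before the FIRST separator.
  // Returns None when the separator is missing or either part would be empty.
  def parse(persistenceId: String, separator: Char = '-'): Option[ParsedId] = {
    val idx = persistenceId.indexOf(separator)
    if (idx <= 0 || idx == persistenceId.length - 1) None
    else Some(ParsedId(persistenceId.substring(0, idx), persistenceId.substring(idx + 1)))
  }
}
```

With the default separator, parse("Order-123") yields the aggregate "Order" and the id "123"; with the configuration above, the same split would apply to "Order|123" when '|' is passed as the separator.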

Persistence Query

Here are some examples of how to use persistence query:

    val readJournal = ReactiveMongoJournalProvider(system).scaladslReadJournal
    val tagsSource: Source[EventEnvelope, NotUsed] = readJournal.currentEventsByTag("some_tag", NoOffset)
    tagsSource.runWith(Sink.foreach { envelope =>
      envelope.event match {
        case UserAdded(name, age) => // Do something
        case _                    => // Ignore other events to avoid a MatchError
      }
    })

Sometimes it is necessary to create an Offset:

    val offset = ObjectIdOffset.fromDateTime(DateTime.now()) // A Joda DateTime
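
An offset built from a point in time is possible because a MongoDB ObjectId starts with a 4-byte, big-endian Unix timestamp in seconds. The sketch below shows that idea using only the standard library. ObjectIdTimeSketch and lowerBoundHex are hypothetical names, the example uses java.time.Instant rather than Joda, and it does not claim to reproduce what ObjectIdOffset.fromDateTime does internally.

```scala
import java.time.Instant

object ObjectIdTimeSketch {
  // Builds the 24-character hex form of the smallest ObjectId for a given second:
  // 4 timestamp bytes (8 hex digits) followed by 8 zero bytes (16 hex digits).
  def lowerBoundHex(instant: Instant): String = {
    val seconds = instant.getEpochSecond & 0xFFFFFFFFL // keep the low 32 bits
    f"$seconds%08x" + ("0" * 16)
  }
}
```

Since ObjectIds are compared byte by byte, ids generated at or after that second sort at or after this value, which is what makes a time-based lower bound usable as a query offset.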

For streams that never complete, such as #persistenceIds and #eventsByTag, you can configure the interval at which the journal is polled:

    akka-persistence-reactivemongo {
      mongo-uri = "mongodb://localhost/test?rm.failover=900ms:21x1.30"
      read-journal {
        refresh-interval = 2s
      }
    }

If you want different refresh intervals for different queries, you can add a RefreshInterval attribute to the Source definition:

    readJournal
      .eventsByTag("some_tag", NoOffset)
      .addAttributes(RefreshInterval(700.millis))
      .runWith(Sink.foreach(println))

Filter Events by some Event’s Attribute

From a regular stream

    val readJournal = ReactiveMongoJournalProvider(system).readJournalFor(Seq("Orders"))
    readJournal
      .currentEventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> customerId), None)
      // mapAsync requires a parallelism level; 1 processes one event at a time
      .mapAsync(1)(envelope => someEventualWork(envelope))
      .run()

From a non-terminating stream

    val readJournal = ReactiveMongoJournalProvider(system).readJournalFor(Seq("Orders"))
    readJournal
      .eventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> customerId), None, 5.seconds)
      // mapAsync requires a parallelism level; 1 processes one event at a time
      .mapAsync(1)(envelope => someEventualWork(envelope))
      .run()

Test Driven Development

A useful feature for TDD: the plugin can be configured to persist in memory, which cuts test latency by more than half.

    akka-persistence-reactivemongo {
      persist-in-memory = true
    }