Akka Persistence Plugin using reactivemongo driver.
This implementation use the reactivemongo drive.
This plugin support scala 2.13, akka 2.6.x and reactivemongo 1.0.x.
Add in your build.sbt the following lines:
resolvers += "null-vector" at "https://nullvector.jfrog.io/artifactory/releases"
libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.6.0"
To active the plugin an set the mongodb uri you have to add in your application.conf the following lines:
akka.persistence.journal.plugin = "akka-persistence-reactivemongo-journal"akka.persistence.snapshot-store.plugin = "akka-persistence-reactivemongo-snapshot"akka-persistence-reactivemongo {mongo-uri = "mongodb://host/database?rm.failover=900ms:21x1.30"}
See Connect to a database for more information.
akka.persistence.state.plugin = "akka-persistence-reactivemongo-crud"
Before save any event from your persistent actor it is needed to register the corresponding EventAdapter.
case class ProductId(id: String) extends AnyValcase class InvoiceItem(productId: ProductId, price: BigDecimal, tax: BigDecimal)case class InvoiceItemAdded(invoiceItem: InvoiceItem)val eventAdapter = EventAdapterFactory.adapt[InviceItemAdded](withManifest = "InvoiceItemAdded")ReactiveMongoEventSerializer(actorSystem).addEventAdapter(eventAdapter)
It is also possible to override mappings or add unsupported mappings. All added mappings must extends from BSONReader[_] or BSONWriter[_] or both.
implicit val reader = new BSONReader[Type1] {...}implicit val writer = new BSONWriter[Type2] {...}implicit val readerAndWriter = new BSONReader[Type3] with BSONWriter[Type3] {...}val eventAdapter = EventAdapterFactory.adapt[Type4](withManifest = "SomeEvent")
You can also add tags asociated to the Event:
val eventAdapter = EventAdapterFactory.adapt[Type4](withManifest = "SomeEvent", Set("Tag_1", "Tag_2"))
Traits famlily (sealed trait), aka: sum types, are mapped automatically:
sealed trait InvoiceLineTypecase object ProductLine extends InvoiceLineType...case class InvoiceLine(lineType: InvoiceLineType, ...)case class InvoiceLineAdded(line: InvoiceLine)...implicit val conf = MacroConfiguration(discriminator = "_type", typeNaming = TypeNaming.SimpleName)val eventAdapter = EventAdapterFactory.adapt[InvoceLineAdded](withManifest = "InvoiceLineAdded")
Behind the scene EventAdapterFactory use the ReactiveMongo Macros, so you can configure the BSON mappings:
implicit val conf: Aux[MacroOptions] = MacroConfiguration(discriminator = "_type", typeNaming = TypeNaming.SimpleName)
You can create mappings by hand:
implicit val a: BSONDocumentMapping[SolarPlanet] = EventAdapterFactory.mappingOf[SolarPlanet]val eventAdapter = new EventAdapterMapping[SolarPlanet]("planet")serializer.addEventAdapter(eventAdapter)
By default, the persistence id has the following form: <Aggregate>-<Id>, and the aggregate will be the name of the MongoDB collection.
You can change the persistence id separator character:
akka-persistence-reactivemongo {mongo-uri = "mongodb://localhost/test?rm.failover=900ms:21x1.30"persistence-id-separator = |}
Here are some examples of how to use persistence query:
val readJournal = ReactiveMongoJournalProvider(system).scaladslReadJournalval tagsSource: Source[EventEnvelope, NotUsed] = readJournal.currentEventsByTag("some_tag", NoOffset)tagsSource.runWith(Sink.foreach{ envelope => envelope.event match {case UserAdded(name, age) => // Do Something}})
Sometime is necessary to create an Offset:
val offset = ObjectIdOffset.fromDateTime(DateTime.now()) // A Joda DateTime
For streams that never complete like #persistenceIds, #eventsByTag, etc. it is possible to configure the interval that pulls from the journal:
akka-persistence-reactivemongo {mongo-uri = "mongodb://localhost/test?rm.failover=900ms:21x1.30"read-journal {refresh-interval = 2s}}
If you want different refresh intervals from different query, you can add a RefreshInterval Attribute in the Source definition:
readJournal.eventsByTag("some_tag", NoOffset).addAttributes(RefreshInterval(700.millis)).runWith(Sink.foreach(println))
val readJournal = ReactiveMongoJournalProvider(system).readJournalFor(Seq("Orders"))readJournal.currentEventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> customerId), None).mapAsyc(envelope => someEventualWork(envelope)).run()
val readJournal = ReactiveMongoJournalProvider(system).readJournalFor(Seq("Orders"))readJournal.eventsByTags(Seq("TAG"), NoOffset, BSONDocument("events.p.customerId" -> customerId), None, 5.seconds).mapAsyc(envelope => someEventualWork(envelope) ).run()
Here is a great feature for TDD lovers: it is possible to configure the plugin to persist in memory and reduce the test latency more than half.
akka-persistence-reactivemongo {persist-in-memory = true}