Typed, Functional Scala SQS Consumer and Composable Functional Effects
Part of the "do one thing and do it well" micro birds library series.
libraryDependencies += "us.oyanglul" %% "zhuyu" % ZhuyuVersion
You can quickly scaffold a zhuyu project by using jcouyang/zhuyu.g8
sbt new jcouyang/zhuyu.g8
Once you have the project, let us start from a simple example.
Before we dive into the example, there are a few terms (types) you need to know:
Envelop[A]: the model of a message in SQS
Job[A, Deps]: the job description for A, which requires Deps
Worker[A, Deps]: provided that Deps and a Job[A, Deps] exist, the worker is able to work on the job.
Say we have a long workflow in which each step is safe to retry (idempotent) without notifying the user.
Ideally, we could put every task from the workflow into SQS to guarantee that each of them eventually finishes.
Here is the workflow:
Steps 1 and 2 have to finish in sequence, but steps 3 and 4 can be done at the same time.
sealed trait Event
case class InitPayment(id: Int) extends Event
case class DebitPayment(id: Int, cardnum: String) extends Event
case class NotifyPaymentResult(result: Int) extends Event
case class PrepareOrder(id: Int) extends Event
Let us start implementing the tasks, or shall we call them Jobs.
It is good to have a naming convention so that the implementation is much more predictable.
So we can simply prefix the event name with On, e.g. OnInitPayment.
import effects._
trait OnInitPayment {
  implicit val onInitPayment =
    new Job[InitPayment, HasSQS with HasDoobie] { // <- (1)
      override def distribute(message: Envelop[InitPayment]) =
        for {
          cardnum <- Doobie(sql"select cardnum from credit_card where id = ${message.content.id}".query[String].unique)
          // DebitPayment takes both the payment id and the card number
          _ <- message.spread[Event](DebitPayment(message.content.id, cardnum)) // <- (2)
        } yield ()
    }
}
(1) A Job to handle the InitPayment event, which requires the HasSQS and HasDoobie effects.
(2) spread the DebitPayment Event; the spread event will be distributed by Job[DebitPayment, ?].
Implicit not found: send Message from us.oyanglul.zhuyu.models.InitPayment to us.oyanglul.zhuyu.models.InitPayment will cause cycle loop, please check the order of us.oyanglul.zhuyu.models.InitPayment and us.oyanglul.zhuyu.models.InitPayment in us.oyanglul.zhuyu.models.Event
[error] _ <- spread[Event](DebitPayment(cardnum))
Note that spread is type-level safe against cyclic loops: trying to spread[Event](InitPayment) in Job[InitPayment, HasSQS] produces a compile error like the one above. To use spread, you have to define an instance of the HasOrder type class, HasOrder[Event], that describes the order of InitPayment and DebitPayment.
object Event {
  type EventOrder =
    InitPayment :+:
    DebitPayment :+:
    NotifyPaymentResult :+:
    PrepareOrder :+: CNil
  implicit val orderedEvent: HasOrder.Aux[Event, EventOrder] =
    new HasOrder[Event] {
      type Order = EventOrder
    }
}
Spreading an event in the wrong order will cause a compile error (thanks to shapeless, we can do the counting at the type level).
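The idea of rejecting a wrongly ordered spread at compile time can be sketched without shapeless. What follows is a simplified stand-in, not zhuyu's implementation: OrderSketch, Before, and this two-argument spread are hypothetical names, and the allowed order is spelled out by hand as implicit evidence instead of being counted from the coproduct.

```scala
import scala.annotation.implicitNotFound

object OrderSketch {
  sealed trait Event
  final case class InitPayment(id: Int) extends Event
  final case class DebitPayment(id: Int, cardnum: String) extends Event

  // Hypothetical evidence that `From` comes strictly before `To` in the workflow.
  @implicitNotFound("sending a message from ${From} to ${To} would cause a cycle")
  sealed trait Before[From, To]

  object Before {
    // Only the forward direction is declared, so the reverse direction has no evidence.
    implicit val initBeforeDebit: Before[InitPayment, DebitPayment] =
      new Before[InitPayment, DebitPayment] {}
  }

  // Compiles only when the compiler can find Before[From, To]; returns the event to send next.
  def spread[From <: Event, To <: Event](from: From, to: To)(
      implicit ev: Before[From, To]
  ): To = to

  val next: DebitPayment = spread(InitPayment(1), DebitPayment(1, "4242"))
}
```

In this sketch, swapping the two arguments, or spreading InitPayment from InitPayment, has no matching evidence and is rejected by the compiler with the annotated message.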
Once you have implemented the new Job, OnInitPayment, register it in package.scala so that Worker knows where to look for jobs.
package object jobs
extends OnInitPayment
with OnDebitPayment
with OnNotifyUser
with OnPrepareOrder
Now we have 4 jobs ready for pick up, where are our workers?
import jobs._
object impl extends HasSQS with HasHttp4s with HasS3 with HasDoobie {...}
Worker.work[Event, HasSQS with HasHttp4s with HasS3 with HasDoobie].run(impl)
Every time our Worker runs, it:
1. takes an Event from SQS
2. finds the Job the Event belongs to
3. works on the Job by following the instructions from its Job.distribute method
Worker is type-level safe as well: for any Event for which the Worker cannot find the correct Job, a compile error will occur. Thus you never encounter a runtime error for an unprepared Job; every Event the Worker works on will definitely have a Job defined.
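How a missing Job can become a compile error is easiest to see in a stripped-down model. The sketch below uses only the standard library and simplified stand-ins: this Envelop, Job, and work are not zhuyu's definitions, and HasLogPrefix is a made-up dependency. The worker asks the compiler for an implicit Job for the message type, so an event without a registered Job cannot be passed to it.

```scala
object WorkerSketch {
  // Simplified stand-ins; the library's real types carry more information.
  final case class Envelop[A](content: A)

  trait Job[A, Deps] {
    // Given a message, describe the work as a function of the dependencies.
    def distribute(message: Envelop[A]): Deps => String
  }

  // Hypothetical dependency, playing the role of HasSQS, HasDoobie, etc.
  trait HasLogPrefix { def prefix: String }

  final case class PrepareOrder(id: Int)

  implicit val onPrepareOrder: Job[PrepareOrder, HasLogPrefix] =
    new Job[PrepareOrder, HasLogPrefix] {
      def distribute(message: Envelop[PrepareOrder]): HasLogPrefix => String =
        deps => s"${deps.prefix} preparing order ${message.content.id}"
    }

  // The worker can only be called for an A that has a Job[A, Deps] in implicit scope.
  def work[A, Deps](message: Envelop[A], deps: Deps)(
      implicit job: Job[A, Deps]
  ): String = job.distribute(message)(deps)

  val deps: HasLogPrefix = new HasLogPrefix { val prefix = "[worker]" }
  val result: String = work(Envelop(PrepareOrder(7)), deps)
}
```

Calling work with a message type that has no implicit Job in scope does not compile, which is the property described above.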
The other Jobs follow the same pattern, for example OnDebitPayment:
import effects._
trait OnDebitPayment {
  implicit val onDebitPayment =
    new Job[DebitPayment, HasSQS with HasHttp4s] {
      override def distribute(message: Envelop[DebitPayment]) =
        for {
          result <- Http4s(_.expect[Int](uri"http://payment-gateway.com/pay/${message.content.cardnum}"))
          _ <- message.spread[Event](NotifyPaymentResult(result))
          _ <- message.spread[Event](PrepareOrder(message.content.id))
        } yield ()
    }
}
The previous example is one-way messaging only: the requester puts a message in the queue and never expects a response.
The request-response messaging pattern is also a common use case. For it, we can use the AWS approach in which the requester creates a temporary queue for receiving each response message.
import effects._
trait OnDebitPayment {
  implicit val onDebitPayment =
    new Job[DebitPayment, HasSQS with HasHttp4s] {
      override def distribute(message: Envelop[DebitPayment]) =
        for {
          result <- Http4s(_.expect[Int](uri"http://payment-gateway.com/pay/${message.content.cardnum}"))
+         _ <- message.respond(result)
          _ <- message.spread[Event](NotifyPaymentResult(result))
          _ <- message.spread[Event](PrepareOrder(message.content.id))
        } yield ()
    }
}
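The request-response flow described above can be pictured with in-memory queues. This is only an illustration of the pattern: it makes no AWS calls, and Request, replyTo, respond, and requestAndWait are hypothetical names that are not part of zhuyu or the AWS SDK.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, TimeUnit}

object RequestResponseSketch {
  // Hypothetical message shape: the request carries the name of the queue to reply to.
  final case class Request(cardnum: String, replyTo: String)

  // In-memory stand-ins for queues, keyed by queue name.
  private val queues = new ConcurrentHashMap[String, LinkedBlockingQueue[String]]()

  // Responder side: do the work, then put the result on the reply queue if it still exists.
  def respond(request: Request): Unit = {
    val result = s"paid:${request.cardnum}" // placeholder for the real payment call
    Option(queues.get(request.replyTo)).foreach(_.put(result))
  }

  // Requester side: create a temporary queue, send, wait for one response, clean up.
  def requestAndWait(cardnum: String): Option[String] = {
    val replyTo = s"temp-${java.util.UUID.randomUUID()}"
    val replies = new LinkedBlockingQueue[String]()
    queues.put(replyTo, replies)
    try {
      respond(Request(cardnum, replyTo)) // stands in for sending to the job queue
      Option(replies.poll(1, TimeUnit.SECONDS)) // None if nothing arrives in time
    } finally {
      queues.remove(replyTo)
      ()
    }
  }
}
```

The temporary queue lives only for the duration of one request, which keeps responses for different requesters from mixing.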
There are a few built-in effect implementations; you can also easily create your own effect.
Just make sure your effect has the type Kleisli[IO, HasSomething, A].
libraryDependencies += "us.oyanglul" %% "zhuyu-effect-http4s" % ZhuyuVersion
effects.Http4s(_.status(GET(uri"https://blog.oyanglul.us")))
// Kleisli[IO, HasHttp4s, Status]
libraryDependencies += "us.oyanglul" %% "zhuyu-effect-s3" % ZhuyuVersion
effects.S3(_.putObject("example-bucket", "filename", "content"))
// Kleisli[IO, HasS3, PutObjectResult]
libraryDependencies += "us.oyanglul" %% "zhuyu-effect-doobie" % ZhuyuVersion
effects.Doobie(sql"select 250".query[Int].unique)
// Kleisli[IO, HasDoobie, Int]
All these effects can be composed:
val program = for {
  _ <- effects.Http4s(_.status(GET(uri"https://blog.oyanglul.us")))
  _ <- effects.S3(_.putObject("example-bucket", "filename", "content"))
  _ <- effects.Doobie(sql"select 250".query[Int].unique)
} yield ()
// Kleisli[IO, HasHttp4s with HasS3 with HasDoobie, Unit]
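Why the composed program ends up requiring the intersection of all dependencies can be shown with a small standard-library stand-in for Kleisli[IO, Deps, A]. The Effect class, HasGreeting, and HasName below are assumptions made for this sketch, and effects run eagerly here instead of inside IO.

```scala
object ComposeSketch {
  // Stand-in for Kleisli[IO, Deps, A]: a function from dependencies to a result.
  // Contravariant in Deps, so an effect that needs less can run with more.
  final class Effect[-Deps, +A](val run: Deps => A) {
    def map[B](f: A => B): Effect[Deps, B] =
      new Effect[Deps, B](deps => f(run(deps)))

    // D2 must satisfy both this effect's and the next effect's requirements.
    def flatMap[D2 <: Deps, B](f: A => Effect[D2, B]): Effect[D2, B] =
      new Effect[D2, B](deps => f(run(deps)).run(deps))
  }

  // Hypothetical dependencies, playing the role of HasHttp4s, HasS3, HasDoobie.
  trait HasGreeting { def greeting: String }
  trait HasName { def name: String }

  val greet: Effect[HasGreeting, String] = new Effect[HasGreeting, String](_.greeting)
  val who: Effect[HasName, String] = new Effect[HasName, String](_.name)

  // Composing the two effects requires both dependencies at once.
  val program: Effect[HasGreeting with HasName, String] =
    for {
      g <- greet
      n <- who
    } yield s"$g, $n"

  object impl extends HasGreeting with HasName {
    val greeting = "hello"
    val name = "zhuyu"
  }

  val result: String = program.run(impl)
}
```

Passing an implementation that lacks one of the traits to program.run does not type-check, which mirrors the "implement whatever the compiler complains about" workflow.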
To run the effects, simply provide the implementations:
object impl
  extends HasHttp4s
  with HasS3
  with HasDoobie {
  // ...implement whatever the compiler complains about
}
program.run(impl) // IO[Unit]
For more detail, look at the example Main.scala and jobs.