Project author: GrottoPress

Project description:
A scalable asynchronous event-driven jobs engine
Language: Crystal
Repository: git://github.com/GrottoPress/mel.git
Created: 2021-03-26T21:41:29Z
Community: https://github.com/GrottoPress/mel

License: MIT License



Mel

Mel is an asynchronous event-driven jobs processing engine designed to scale. Mel simplifies jobs management by abstracting away the nuances of scheduling and running jobs.

In Mel, a scheduled job is called a task. A single job may be scheduled in multiple ways, yielding multiple tasks from the same job.

Mel schedules all tasks in the chosen storage backend as a set of task ids sorted by their times of next run. For recurring tasks, the next run is scheduled right after the current run completes.

This makes the storage backend the source of truth for schedules, so Mel can easily be scaled out to multiple instances (called workers), and workers can be replaced or stopped without losing schedules.
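The scheduling model described above can be pictured with a small in-memory sketch. Everything in it is hypothetical: the `Schedule` class and its `add`, `due` and `reschedule` methods are illustrative names, not Mel's API, and a real backend keeps this ordering in shared storage rather than in process memory.

```crystal
# Hypothetical sketch, not Mel's implementation: task IDs kept sorted by
# the time of their next run, as a stand-in for a sorted set in the store.
class Schedule
  def initialize
    @entries = [] of {Time, String} # {next_run, task_id}, kept sorted
    @ids = Set(String).new
  end

  # Adds a task. Returns false (and changes nothing) if the ID is taken.
  def add(id : String, at : Time) : Bool
    return false if @ids.includes?(id)
    @ids << id
    @entries << {at, id}
    @entries.sort_by! { |entry| entry[0] }
    true
  end

  # IDs of up to `limit` tasks whose next run is due at `now`, earliest first.
  def due(now : Time, limit : Int32) : Array(String)
    @entries.take_while { |entry| entry[0] <= now }
      .first(limit)
      .map { |entry| entry[1] }
  end

  # For a recurring task: after a run completes, move it to its next run.
  def reschedule(id : String, at : Time) : Nil
    @ids << id
    @entries.reject! { |entry| entry[1] == id }
    @entries << {at, id}
    @entries.sort_by! { |entry| entry[0] }
  end
end
```

Because the store, not any single worker, holds this ordering, every worker can ask it for due tasks, which is what allows workers to be added, replaced or stopped freely.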

Mel supports bulk scheduling of jobs as a single atomic unit. There’s also support for sequential scheduling to track a series of jobs and perform some action after they are all complete.

Types of tasks

  1. Instant Tasks: These are tasks that run only once after they are scheduled, either immediately or at some specified time in the future.

  2. Periodic Tasks: These are tasks that run regularly at a specified interval. They may run forever, or till some specified time in the future.

  3. Cron Tasks: These are tasks that run according to a specified schedule in Unix Cron format. They may run forever, or till some specified time in the future.

Installation

  1. Add the dependency to your shard.yml:

    dependencies:
      mel:
        github: GrottoPress/mel
      #redis: # Uncomment if using the Redis backend
      #  github: jgaskins/redis
  2. Run shards update

  3. Require and configure Mel in your app (we’ll configure workers later):

    # ->>> src/app/config.cr
    # ...
    require "mel"
    require "../jobs/**"

    Mel.configure do |settings|
      settings.error_handler = ->(error : Exception) { puts error.message }
      settings.timezone = Time::Location.load("Africa/Accra")
    end

    Log.setup(Mel.log.source, :info, Log::IOBackend.new)
    # Redis::Connection::LOG.level = :info # Uncomment if using the Redis backend
    # ...
    • Using the Redis backend

      # ->>> src/app/config.cr
      # ...
      require "mel/redis"

      Mel.configure do |settings|
        # ...
        settings.store = Mel::Redis.new(
          "redis://localhost:6379/0",
          namespace: "mel"
        )
        # ...
      end
      # ...
    • Using the Memory backend (Not for production use)

      # ->>> src/app/config.cr
      # ...
      require "mel"

      Mel.configure do |settings|
        # ...
        settings.store = Mel::Memory.new
        # ...
      end
      # ...
    • Skip storage

      You may disable storage altogether by setting Mel.settings.store to nil (this is the default).

Usage

  1. Define job:

    # ->>> src/jobs/do_some_work.cr
    struct DoSomeWork
      include Mel::Job # <= Required

      def initialize(@arg_1 : Int32, @arg_2 : String)
      end
      # <= Instance vars must be JSON-serializable

      # (Required)
      #
      # Main operation to be performed.
      # Called in a new fiber.
      def run
        # << Do work here >>
      end

      # Called in the main fiber, before spawning the fiber
      # that calls the `#run` method above.
      def before_run
        # ...
      end

      # Called in the same fiber that calls `#run`.
      # `success` is `true` only if the run succeeded.
      def after_run(success)
        if success
          # ...
        else
          # ...
        end
      end

      # Called in the main fiber before enqueueing the task in
      # the store.
      def before_enqueue
        # ...
      end

      # Called in the main fiber after enqueueing the task in
      # the store. `success` is `true` only if the enqueue succeeded.
      def after_enqueue(success)
        if success
          # ...
        else
          # ...
        end
      end
    end
  2. Schedule job:

    • Run job now:

      # ->>> src/app/some_file.cr
      DoSomeWork.run(arg_1: 5, arg_2: "value")
      # <= Alias: DoSomeWork.run_now(...)
    • Run job after given delay:

      # ->>> src/app/some_file.cr
      DoSomeWork.run_in(5.minutes, arg_1: 5, arg_2: "value")

      The given Time::Span can be negative, for example DoSomeWork.run_in(-5.minutes, ...). This may be useful for prioritizing certain tasks.

    • Run job at specific time:

      # ->>> src/app/some_file.cr
      DoSomeWork.run_at(10.minutes.from_now, arg_1: 5, arg_2: "value")

      The specified Time can be in the past, for example DoSomeWork.run_at(-10.minutes.from_now, ...). This may be useful for prioritizing certain tasks.

    • Run periodically:

      # ->>> src/app/some_file.cr
      DoSomeWork.run_every(10.minutes, for: 1.hour, arg_1: 5, arg_2: "value")

      This will do the first run 10 minutes from now. If you would like to do the first run some other time, specify that in a from: argument:

      # ->>> src/app/some_file.cr
      DoSomeWork.run_every(10.minutes, from: Time.local, for: 1.hour, arg_1: 5, arg_2: "value")

      Instead of for:, you may use till: and specify a Time. Leave those out to run forever.

    • Run on a Cron schedule:

      # ->>> src/app/some_file.cr
      DoSomeWork.run_on("0 */2 * * *", for: 6.hours, arg_1: 5, arg_2: "value")

      This will do the first run relative to now. For instance, if the time now is 03:00, the first run would be at 04:00, the next run at 06:00, and so on. If you would like to do the first run relative to some other time, specify that in a from: argument:

      # ->>> src/app/some_file.cr
      DoSomeWork.run_on("0 */2 * * *", from: 3.days.from_now, for: 6.hours, arg_1: 5, arg_2: "value")

      Instead of for:, you may use till: and specify a Time. Leave those out to run forever.

    The DoSomeWork.run_* methods accept the following additional arguments:

    • retries: Number of times to retry a task after it fails, before giving up. This may be specified as a simple integer (e.g. 3), or as a list of backoffs (e.g. {2, 4, 1}, or {2.seconds, 4.seconds, 1.second}). Default: {2, 4, 8, 16}. A task fails when any exception is raised during its run.
  3. Start Mel:

    • As its own process (compiled separately):

      # ->>> src/worker.cr
      require "mel"
      require "./app/**"

      Mel.configure do |settings|
        settings.batch_size = -100
        settings.poll_interval = 3.seconds
      end

      Mel.start
      # <= Blocks forever, polls for due tasks and runs them.
      # <= You may stop Mel by sending `Signal::INT` or `Signal::TERM`.
      # <= Mel will wait for all running tasks to complete before exiting.
    • As part of your app (useful for testing):

      # ->>> spec/spec_helper.cr
      # ...
      require "mel/spec"

      Mel.configure do |settings|
        settings.batch_size = -1
        settings.poll_interval = 1.millisecond
      end

      Spec.before_each do
        Mel::RunPool.delete
        Mel.settings.store.try(&.truncate)
      end

      Spec.after_suite do
        Mel.stop
        Mel::RunPool.delete
        Mel.settings.store.try(&.truncate)
      end
      # <= `Mel.stop` waits for all running tasks to complete before exiting

      Mel.start_async
      # ...
  4. Configure compile targets:

    # ->>> shard.yml
    # ...
    targets:
      app:
        main: src/app.cr
      worker:
        main: src/worker.cr
    # ...
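To see why the Cron example in step 2 first runs at 04:00 when the time is 03:00, here is a hypothetical helper that finds the next time matching "0 */2 * * *" (minute 0 of every even hour) strictly after a given time. It is not a general cron parser and not part of Mel; it only illustrates the "relative to now" (or relative to from:) rule for this one expression.

```crystal
# Hypothetical helper, for illustration only: the next time matching the
# cron expression "0 */2 * * *" (minute 0 of every even hour), strictly
# after `after`. A real cron parser handles all five fields; this doesn't.
def next_even_hour(after : Time) : Time
  t = after.at_beginning_of_hour + 1.hour # next top of the hour
  t += 1.hour if t.hour.odd?              # skip odd hours
  t
end
```

Calling it again on its own result yields the following run (04:00, then 06:00, and so on), which matches the sequence described for run_on.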

Job templates

A job’s .run_* methods allow scheduling that single job in multiple ways. However, there may be situations where you need to schedule a job the same way, every time.

Mel comes with Mel::Job::Now, Mel::Job::In, Mel::Job::At, Mel::Job::Every and Mel::Job::On templates to do exactly this:

  # Define job
  struct DoSomeWorkNow
    include Mel::Job::Now # <= Required

    def initialize(@arg_1 : Int32, @arg_2 : String)
    end

    # (Required)
    def run
      # << Do work here >>
    end
  end

  # Schedule job
  DoSomeWorkNow.run(arg_1: 5, arg_2: "value")
  # <= Alias: `DoSomeWorkNow.run_now(...)`

  # Define job
  struct DoSomeWorkIn
    include Mel::Job::In # <= Required

    def initialize(@arg_1 : Int32, @arg_2 : String)
    end

    # (Required)
    def run
      # << Do work here >>
    end
  end

  # Schedule job
  DoSomeWorkIn.run_in(10.minutes, arg_1: 5, arg_2: "value")

  # Define job
  struct DoSomeWorkAt
    include Mel::Job::At # <= Required

    def initialize(@arg_1 : Int32, @arg_2 : String)
    end

    # (Required)
    def run
      # << Do work here >>
    end
  end

  # Schedule job
  DoSomeWorkAt.run_at(Time.local(2021, 6, 9, 5), arg_1: 5, arg_2: "value")

  # Define job
  struct DoSomeWorkEvery
    include Mel::Job::Every # <= Required

    def initialize(@arg_1 : Int32, @arg_2 : String)
    end

    # (Required)
    def run
      # << Do work here >>
    end
  end

  # Schedule job
  DoSomeWorkEvery.run_every(2.hours, arg_1: 5, arg_2: "value")
  # <= Overload: `.run_every 2.hours, for: 5.hours`
  # <= Overload: `.run_every 2.hours, till: 9.hours.from_now`

  # Define job
  struct DoSomeWorkOn
    include Mel::Job::On # <= Required

    def initialize(@arg_1 : Int32, @arg_2 : String)
    end

    # (Required)
    def run
      # << Do work here >>
    end
  end

  # Schedule job
  DoSomeWorkOn.run_on("0 8 1 * *", arg_1: 5, arg_2: "value")
  # <= Overload: `.run_on "0 8 1 * *", for: 100.weeks`
  # <= Overload: `.run_on "0 8 1 * *", till: Time.local(2099, 12, 31)`

A template excludes all methods not relevant to that template. For instance, calling .run_every or .run_now for a Mel::Job::At template won’t compile.

All other methods and callbacks usable in a regular job may be used in a template, including before_* and after_* callbacks.

You may include more than one template in a single job. For instance, including Mel::Job::At and Mel::Job::Every in a job means you can call .run_at and .run_every methods for that job.

Additionally, Mel comes with two grouped templates: Mel::Job::Instant and Mel::Job::Recurring.

Mel::Job::Instant is equivalent to Mel::Job::Now, Mel::Job::In and Mel::Job::At combined. Mel::Job::Recurring is the equivalent of Mel::Job::Every and Mel::Job::On combined.

Mel::Job is itself a grouped template that combines all the other templates.
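One way composable templates like these can be built in Crystal is with modules whose `included` hook extends the including type with class methods. The sketch below is hypothetical (the `Templates` modules and the `OnlyAt` and `AtAndEvery` jobs are illustrative names, not Mel's source), but it shows both behaviours described above: a job only gets the class methods of the templates it includes, and a grouped template simply includes its members.

```crystal
# Hypothetical sketch of template composition; not Mel's implementation.
module Templates
  module At
    macro included
      extend ::Templates::At::ClassMethods
    end

    module ClassMethods
      def run_at(time : Time) : String
        "#{name} at #{time}"
      end
    end
  end

  module Every
    macro included
      extend ::Templates::Every::ClassMethods
    end

    module ClassMethods
      def run_every(interval : Time::Span) : String
        "#{name} every #{interval}"
      end
    end
  end

  # A grouped template: re-includes its members into the including type,
  # so each member's `included` hook fires for that type.
  module Combined
    macro included
      include ::Templates::At
      include ::Templates::Every
    end
  end
end

struct OnlyAt
  include Templates::At # gets `.run_at` only; `.run_every` won't compile
end

struct AtAndEvery
  include Templates::Combined # gets both `.run_at` and `.run_every`
end
```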

Specifying task IDs

You may specify an ID whenever you schedule a new job, thus: DoSomeWork.run_*(... id: "1001", ...). If not specified, Mel automatically generates a unique dynamic ID for the task.

Dynamic task IDs may be OK for triggered jobs (jobs triggered by some kind of user interaction), such as a job that sends an email notification whenever a user logs in.

However, there may be jobs that are scheduled unconditionally when your app starts (global jobs). For example, sending invoices at the beginning of every month. You should specify unique static IDs for such tasks.

Otherwise, every time the app (re)starts, jobs are scheduled again, each time with a different set of IDs. The store would accept the new schedules because the IDs are different, resulting in duplicated scheduling of the same jobs.

This is particularly important if you run multiple instances of your app. Hardcoding IDs for global jobs means that all instances use the same IDs, so no instance can reschedule a job that another instance has already scheduled.

A task ID may be a mixture of static and dynamic parts. For instance, you may include the current month and year for a global job that runs once a month, to ensure it is never scheduled twice within the same month.
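A mixed static and dynamic ID of that kind could be built with a small helper. The `monthly_task_id` function and the "send_invoices" prefix below are hypothetical; the point is only that every instance, and every restart within the same month, computes the same ID.

```crystal
# Hypothetical helper: a task ID with a static prefix and a dynamic
# year-month suffix, so (re)starts within the same month yield the same ID.
def monthly_task_id(prefix : String, now : Time = Time.utc) : String
  "#{prefix}:#{now.to_s("%Y-%m")}"
end
```

The result would then be passed as the id: argument when scheduling, for example SendInvoices.run_on("0 8 1 * *", id: monthly_task_id("send_invoices")), where SendInvoices stands for a hypothetical global job.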

Bulk scheduling

A common pattern is to break up long-running tasks into smaller tasks. For example:

  struct SendAllEmails
    include Mel::Job

    def initialize(@users : Array(User))
    end

    def run
      @users.each { |user| send_email(user) }
    end

    private def send_email(user)
      # Send email
    end
  end

  # Schedule job
  users = # ...
  SendAllEmails.run(users: users)

The above job would run in a single fiber, managed by whichever worker pulls this task at run time. This could mean too much work for a single worker if the number of users is sufficiently large.

Moreover, some emails may be sent multiple times if the task is retried after a failure. Ideally, jobs should be idempotent, and as atomic as possible.

The preferred approach is to define a job that sends email to one user, and schedule that job for as many users as needed:

  struct SendAllEmails
    include Mel::Job

    def initialize(@users : Array(User))
    end

    def run
      return if @users.empty?

      # Pushes all jobs atomically
      #
      Mel.transaction do |store|
        # Pass `store` to `.run_*`.
        @users.each { |user| SendEmail.run(store: store, user: user) }
      end
    end

    struct SendEmail
      include Mel::Job

      def initialize(@user : User)
      end

      def run
        send_email(@user)
      end

      private def send_email(user)
        # Send email
      end
    end
  end

  # Schedule job
  users = # ...
  SendAllEmails.run(users: users)
  # <= Any `.run_*` method could be called here, as with any job.
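The all-or-nothing behaviour of Mel.transaction can be illustrated with a toy store. `SketchStore` and its `push` and `transaction` methods are hypothetical, not Mel's API: pushes made inside the block are buffered and committed together, and discarded if the block raises.

```crystal
# Hypothetical in-memory store (not Mel's API): pushes made inside
# `transaction` are buffered, then committed together only if the block
# finishes without raising.
class SketchStore
  getter tasks = [] of String

  def push(id : String) : Nil
    @tasks << id
  end

  def transaction
    buffer = SketchStore.new
    yield buffer
    @tasks.concat(buffer.tasks) # not reached if the block raised
  end
end
```

With this shape, either every SendEmail task lands in the store or none does, so a failure halfway through the loop cannot leave a partial batch behind.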

Sequential scheduling

Bulk scheduling works OK as a fire-and-forget mechanism. However, you may need to keep track of a series of jobs as a single unit, and perform some action only after the last job is done.

This is where sequential scheduling comes in handy. Mel's event-driven design allows chaining jobs, by scheduling the next job after the current one completes:

  struct SendAllEmails
    include Mel::Job

    def initialize(@users : Array(User))
    end

    def run
      @users[0]?.try do |user|
        send_email(user) # <= Send first email
      end
    end

    def after_run(success)
      return unless success

      if @users[1]?
        self.class.run(users: @users[1..]) # <= Schedule next email
      else # <= All emails have been sent
        # Do something
      end
    end

    private def send_email(user)
      # Send email
    end
  end

  # Schedule job
  users = # ...
  SendAllEmails.run(users: users)

Although the example above involves a single job, sequential scheduling can be applied to multiple different jobs, each representing a step in a workflow, with each job scheduling the next job in its #after_run callback:

  struct SomeJob
    include Mel::Job

    def run
      # Do something
    end

    def after_run(success)
      SomeStep.run if success
    end

    struct SomeStep
      include Mel::Job

      def run
        # Do something
      end

      def after_run(success)
        SomeOtherStep.run if success
      end
    end

    struct SomeOtherStep
      include Mel::Job

      def run
        # Do something
      end

      def after_run(success)
        # All done; do something
      end
    end
  end
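The control flow of the first SendAllEmails example in this section can be simulated without Mel. In this hypothetical sketch a Deque stands in for the store, each loop iteration is one run of the job with the users it was scheduled with, and every run is assumed to succeed.

```crystal
# Hypothetical simulation (not Mel code): a Deque stands in for the store.
# Each iteration is one "run" of SendAllEmails; the if/else mirrors its
# `after_run` when `success` is true.
def simulate_chain(users : Array(String)) : Array(String)
  log = [] of String
  queue = Deque(Array(String)).new
  queue << users

  while batch = queue.shift?
    batch[0]?.try { |user| log << "email #{user}" } # run: first user only
    if batch[1]?
      queue << batch[1..] # schedule the next run with the remaining users
    else
      log << "all done" # no users left: the completion step
    end
  end

  log
end
```

The completion step is only reached after every earlier run has succeeded, which is the tracking property that bulk scheduling lacks; the trade-off is that the emails are sent one after another rather than in parallel.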

Tracking progress

Mel provides a progress tracker for jobs. This is particularly useful for tracking multiple jobs representing a series of steps in a workflow:

  # ->>> src/app/config.cr
  # ...
  Mel.configure do |settings|
    settings.progress_expiry = 1.day
  end
  # ...
  # ->>> src/jobs/some_job.cr
  struct SomeJob
    include Mel::Job

    def initialize
      @progress = Mel::Progress.start(id: "some_job", description: "Awesome job")
    end

    # ...

    def after_run(success)
      return @progress.fail unless success

      Mel.transaction do |store|
        SomeStep.run(store: store, progress: @progress)
        @progress.move(50, store) # <= Move to 50%
      end
    end

    struct SomeStep
      include Mel::Job::Now

      def initialize(@progress : Mel::Progress)
      end

      # ...

      def after_run(success)
        return @progress.fail unless success

        Mel.transaction do |store|
          SomeOtherStep.run(store: store, progress: @progress)
          @progress.move(80, store) # <= Move to 80%
        end
      end
    end

    struct SomeOtherStep
      include Mel::Job::Now

      def initialize(@progress : Mel::Progress)
      end

      # ...

      def after_run(success)
        return @progress.fail unless success
        @progress.succeed # <= Move to 100%
      end
    end
  end

  # Schedule job
  SomeJob.run

  # Track progress
  #
  # This may, for instance, be used in a route in a web application.
  # Client-side JavaScript can query this route periodically, and
  # show the response using a progress tracker UI.
  #
  report = Mel::Progress.track("some_job")

  report.try do |_report|
    _report.description
    _report.id
    _report.value
    _report.failure?
    _report.running?
    _report.success?
    _report.started?
    _report.ended?
  end

You may delete progress data in specs thus:

  # ->>> spec/spec_helper.cr
  # ...
  Spec.before_each do
    # ...
    Mel.settings.store.try(&.truncate_progress)
    # ...
  end

  Spec.after_suite do
    # ...
    Mel.settings.store.try(&.truncate_progress)
    # ...
  end
  # ...

Jobs security

A Mel worker waits for all running tasks to complete before exiting if it receives a Signal::INT or a Signal::TERM, or if you call Mel.stop somewhere in your code. This means jobs are never lost mid-flight.

Jobs are not lost even if the worker process is forcibly shut down, since Mel does not delete a task from the store until it is complete. A running task is assumed to be orphaned if its timestamp in the queue has not been updated after 3 polls. Once a task is orphaned, any available worker can pick it up and run it.
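The orphan rule can be expressed as a simple time comparison. This is a hypothetical sketch: the `orphaned?` name, its parameters, and the choice of a strict "more than 3 poll intervals" threshold are assumptions for illustration, not Mel's code.

```crystal
# Hypothetical check: a running task counts as orphaned when its queue
# timestamp has not been refreshed for more than `polls` poll intervals.
def orphaned?(last_seen : Time, now : Time, poll_interval : Time::Span, polls : Int32 = 3) : Bool
  now - last_seen > poll_interval * polls
end
```

With the 3-second poll_interval from the worker example, a task whose timestamp is 10 seconds old would be considered orphaned, while one refreshed 9 seconds ago would not.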

Smart polling

Mel's batch_size setting allows setting a limit on the number of due tasks to retrieve and run each poll, and, consequently, on the number of fibers spawned to handle those tasks.

If the setting is a positive integer N, Mel would pull and run N due tasks each poll.

If it is a negative integer -N (other than -1), the number of due tasks pulled and run each poll varies such that the total number of running tasks is never greater than N.

-1 sets no limits. Mel would pull as many tasks as are due each poll, and run all of them.
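The three batch_size modes can be summarised as one function. `tasks_to_pull` is a hypothetical name, and the sketch assumes Mel knows how many tasks are due and how many are already running; it restates the rules above rather than reproducing Mel's code.

```crystal
# Hypothetical sketch of the batch_size rules: how many due tasks a poll
# would pull, given `due` tasks available and `running` tasks in flight.
def tasks_to_pull(batch_size : Int32, due : Int32, running : Int32) : Int32
  if batch_size == -1
    due                             # -1: no limit, pull everything due
  elsif batch_size < 0
    (-batch_size - running).clamp(0, due) # -N: keep total running <= N
  else
    Math.min(batch_size, due)       # N: at most N per poll
  end
end
```

For example, with the worker setting batch_size = -100 and 70 tasks already running, a poll would pull at most 30 more tasks.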

Integrations

Carbon mailer

Link: https://github.com/luckyframework/carbon

  1. Require mel/carbon, after your emails:

    # ->>> src/app.cr
    # ...
    require "emails/base_email"
    require "emails/**"
    require "mel/carbon"
    # ...
  2. Set up base email:

    # ->>> src/emails/base_email.cr
    abstract class BaseEmail < Carbon::Email
      # ...
      include JSON::Serializable
      # ...
    end
  3. Configure deliver later strategy:

    # ->>> config/email.cr
    BaseEmail.configure do |settings|
      # ...
      settings.deliver_later_strategy = Mel::Carbon::DeliverLater.new
      # ...
    end

Development

Create a .env.sh file:

  #!/bin/bash
  export REDIS_URL='redis://localhost:6379/0'

Update the file with your own details. Then run tests with source .env.sh && crystal spec -Dpreview_mt.

Contributing

  1. Fork it
  2. Switch to the master branch: git checkout master
  3. Create your feature branch: git checkout -b my-new-feature
  4. Make your changes, updating changelog and documentation as appropriate.
  5. Commit your changes: git commit
  6. Push to the branch: git push origin my-new-feature
  7. Submit a new Pull Request against the GrottoPress:master branch.