项目作者: nadjieb

项目描述 :
Processing Background Jobs using Go Channels
高级语言: Go
项目地址: git://github.com/nadjieb/gawe.git
创建时间: 2021-01-26T04:29:59Z
项目社区:https://github.com/nadjieb/gawe

开源协议:Apache License 2.0

下载


Gawe

Build Status
Go Report Card
Maintainability
Codecov

Description

Gawe is a Go library for processing background jobs, using Go channels as a FIFO queue to control job execution and worker instantiation.

Installation

  1. go get -u github.com/nadjieb/gawe

Usage

  1. package main
  2. import (
  3. "context"
  4. "time"
  5. "github.com/nadjieb/gawe"
  6. )
  7. // RecordHistoryJob is a struct that complies with the gawe.Job interface
  8. type RecordHistoryJob struct {
  9. ID string
  10. Data string
  11. }
  12. var _ gawe.Job = (*RecordHistoryJob)(nil)
  13. // JobID returns ID of the job (Usually used for logging)
  14. func (j *RecordHistoryJob) JobID() string {
  15. return j.ID
  16. }
  17. // JobType returns type of the job (Usually used for logging)
  18. func (j *RecordHistoryJob) JobType() string {
  19. return "record-history"
  20. }
  21. // Tags returns tags of the job (Usually used for logging)
  22. func (j *RecordHistoryJob) Tags() []string {
  23. return []string{"record", "history"}
  24. }
  25. // Exec executes the job
  26. func (j *RecordHistoryJob) Exec(ctx context.Context) error {
  27. var err error
  28. // record history
  29. return err
  30. }
  31. func main() {
  32. engine := gawe.NewEngine(
  33. gawe.WithMaxAttempts(3), // max attempts of job executions if failed
  34. gawe.WithMaxQueueSize(100), // max queue size for jobs
  35. gawe.WithMaxWorkers(4), // max workers run in the background
  36. gawe.WithInactivityTimeout(5*time.Second), // a worker stops running once this inactivity timeout has elapsed since its last job execution
  37. )
  38. engine.Start()
  39. job := &RecordHistoryJob{ID: "123abc", Data: "record"}
  40. err := engine.Enqueue(context.Background(), job)
  41. if err != nil {
  42. // handle error
  43. }
  44. engine.Stop()
  45. }

Plugins

To create a plugin for the engine, create a struct that fulfills the Plugin interface, then add it to the gawe engine as an Option.

  1. // Logger is a struct that complies with the gawe.Plugin interface
  2. type Logger struct{}
  3. var _ gawe.Plugin = (*Logger)(nil)
  4. // OnJobStart is called just before the job execution
  5. func (l *Logger) OnJobStart(ctx context.Context, job gawe.IdentifiableJob) context.Context {
  6. // return the (new) context to pass it to the next plugin/job
  7. return ctx
  8. }
  9. // OnJobEnd is called once the job has successfully executed
  10. func (l *Logger) OnJobEnd(ctx context.Context, job gawe.IdentifiableJob) {
  11. // do stuff
  12. }
  13. // OnJobError is called if the job execution failed
  14. func (l *Logger) OnJobError(ctx context.Context, job gawe.IdentifiableJob, err error) context.Context {
  15. // return the (new) context to pass it to the next plugin/job
  16. return ctx
  17. }
  18. ...
  19. logger := &Logger{}
  20. engine := gawe.NewEngine(gawe.WithPlugins(logger))

License

Released under the Apache License 2.0