Project author: gjbae1212

Project description:
An async worker for Go that bulk-inserts, updates, and deletes documents in Elasticsearch. 🚀
Primary language: Go
Repository: git://github.com/gjbae1212/go-esworker.git
Created: 2019-07-30T08:55:51Z
Project community: https://github.com/gjbae1212/go-esworker

License: MIT License


go-esworker

go-esworker is an async worker for Go that bulk-inserts, updates, and deletes documents in Elasticsearch.
It supports Elasticsearch running on AWS, GCP, Elastic Cloud, and similar infrastructure.






Installation

    go get -u github.com/gjbae1212/go-esworker

Usage

    package main

    import (
        "context"
        "log"

        "github.com/gjbae1212/go-esworker"
    )

    func main() {
        // Create the dispatcher.
        dispatcher, err := esworker.NewDispatcher(
            esworker.WithESVersionOption(esworker.V6),
            esworker.WithAddressesOption([]string{"http://localhost:9200"}),
            esworker.WithUsernameOption("user"),
            esworker.WithPasswordOption("password"),
            esworker.WithErrorHandler(func(err error) {
                log.Println(err)
            }),
        )
        if err != nil {
            log.Panic(err)
        }

        // Start the dispatcher.
        if err := dispatcher.Start(); err != nil {
            log.Panic(err)
        }

        // Queue operations; the workers send them to Elasticsearch in bulk.
        ctx := context.Background()

        // Create a document.
        dispatcher.AddAction(ctx, &esworker.StandardAction{
            Op:    esworker.ES_CREATE,
            Index: "allan",
            Id:    "1",
            Doc:   map[string]interface{}{"field1": 10},
        })

        // Update the document.
        dispatcher.AddAction(ctx, &esworker.StandardAction{
            Op:    esworker.ES_UPDATE,
            Index: "allan",
            Id:    "1",
            Doc:   map[string]interface{}{"field1": 20},
        })

        // Delete the document.
        dispatcher.AddAction(ctx, &esworker.StandardAction{
            Op:    esworker.ES_DELETE,
            Index: "allan",
            Id:    "1",
        })
    }

Dispatcher Parameters

When you create a go-esworker dispatcher, you configure it by passing option functions to esworker.NewDispatcher.
The supported options are listed below.

| method name | description | value | state |
| --- | --- | --- | --- |
| WithESVersionOption | Elasticsearch version | esworker.V5, esworker.V6, esworker.V7 | default V6 |
| WithAddressesOption | Elasticsearch addresses | | default http://localhost:9200 |
| WithUsernameOption | Elasticsearch username for HTTP basic authentication | | optional |
| WithPasswordOption | Elasticsearch password for HTTP basic authentication | | optional |
| WithCloudIdOption | ID for Elastic Cloud | | optional |
| WithApiKeyOption | Base64-encoded value for authorization (api-key) | | optional (if set, overrides username and password) |
| WithTransportOption | HTTP transport | | default: the default HTTP transport |
| WithLoggerOption | Logger | | optional |
| WithGlobalQueueSizeOption | Maximum size of the global queue | | default 5000 |
| WithWorkerSizeOption | Number of workers | | default 5 |
| WithWorkerQueueSizeOption | Maximum size of each worker queue | | default 5 |
| WithWorkerWaitInterval | Interval after which a worker processes the data in its queue | | default 2 * time.Second |
| WithErrorHandler | Function called when an error is raised | | optional |

Action Interface

To insert, update, or delete documents, use the StandardAction struct or any struct that implements the esworker.Action interface.

    package main

    import (
        "context"
        "log"

        "github.com/gjbae1212/go-esworker"
    )

    // sampleAction is a custom action that implements esworker.Action.
    type sampleAction struct{}

    // GetOperation returns esworker.ES_CREATE, esworker.ES_INDEX,
    // esworker.ES_UPDATE, or esworker.ES_DELETE.
    func (act *sampleAction) GetOperation() esworker.ESOperation {
        return esworker.ES_INDEX
    }

    // GetIndex returns the index name.
    func (act *sampleAction) GetIndex() string {
        return "sample"
    }

    // GetDocType returns the doc type, or "" to use the default.
    func (act *sampleAction) GetDocType() string {
        return ""
    }

    // GetID returns the doc ID, or "" (allowed for ES_INDEX).
    func (act *sampleAction) GetID() string {
        return ""
    }

    // GetDoc returns the document data.
    func (act *sampleAction) GetDoc() map[string]interface{} {
        return map[string]interface{}{"field": 1}
    }

    func main() {
        // Create and start the dispatcher with default options.
        dispatcher, err := esworker.NewDispatcher()
        if err != nil {
            log.Panic(err)
        }
        if err := dispatcher.Start(); err != nil {
            log.Panic(err)
        }

        // Standard action example.
        act := &esworker.StandardAction{
            Op:      esworker.ES_CREATE,
            Index:   "sample",
            DocType: "_doc",
            Id:      "test-id",
            Doc:     map[string]interface{}{"field": 1},
        }
        dispatcher.AddAction(context.Background(), act)

        // Custom action example.
        dispatcher.AddAction(context.Background(), &sampleAction{})
    }

A custom struct that implements the esworker.Action interface must provide the following five methods.

| name | description |
| --- | --- |
| GetOperation | ES_CREATE, ES_INDEX, ES_UPDATE, or ES_DELETE |
| GetIndex | index name |
| GetDocType | doc type (if it returns an empty string, the default _doc or doc is used) |
| GetID | doc ID (may be an empty string when the operation is ES_INDEX) |
| GetDoc | doc data |

Elastic Cloud

If your cluster runs on Elastic Cloud, you can connect to Elasticsearch with a cloud ID and an API key, without specifying an endpoint address or basic-authentication credentials.

    dispatcher, err := esworker.NewDispatcher(
        esworker.WithESVersionOption(esworker.V7),
        esworker.WithCloudIdOption("your-cloud-id"),
        esworker.WithApiKeyOption("api-key"),
    )
    if err != nil {
        log.Panic(err)
    }
    dispatcher.Start()
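
The options table describes the API key as a Base64-encoded value. As a hedged sketch, the snippet below builds such a value in the standard Elasticsearch credential form, the Base64 encoding of `id:api_key`, which is what an `Authorization: ApiKey ...` header carries. That WithApiKeyOption expects exactly this form is an assumption; `encodeAPIKey` is a hypothetical helper and the ID and secret are placeholders.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodeAPIKey joins an API key's ID and secret with a colon and
// Base64-encodes the result.
func encodeAPIKey(id, apiKey string) string {
	return base64.StdEncoding.EncodeToString([]byte(id + ":" + apiKey))
}

func main() {
	// Placeholder values; substitute the id and api_key returned when
	// the key was created.
	fmt.Println(encodeAPIKey("id", "secret")) // prints aWQ6c2VjcmV0
}
```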

LICENSE

This project is licensed under the MIT License.