Project author: syucream

Project description:
Google Cloud Pub/Sub input plugin for Embulk.
Language: Scala
Repository: git://github.com/syucream/embulk-input-pubsub.git
Created: 2020-02-22T16:57:14Z
Project page: https://github.com/syucream/embulk-input-pubsub

License: MIT License


embulk-input-pubsub

Google Cloud Pub/Sub input plugin for Embulk.

Overview

  • Plugin type: input
  • Guess supported: no

Configuration

  • project_id: GCP project ID (string, required)
  • subscription_id: Pub/Sub subscription name (string, required)
  • json_keyfile: Path to the GCP credential JSON file (string, required)
  • max_messages: Maximum number of messages to pull in a single Pub/Sub call (integer, optional)
  • checkpoint_basedir: Path to the directory where checkpoint files are written (string, optional)
  • checkpoint: Path to a checkpoint file to restore from (string, optional)

Checkpoint

Google Cloud Pub/Sub removes stored messages once they are acked or when they expire.
To guard against data loss, embulk-input-pubsub therefore uses checkpoints, an approach also used in Apache Flink and Apache Beam.
It 1) pulls messages from Pub/Sub, 2) saves a checkpoint that contains the messages, and 3) sends the ack to Pub/Sub.
If an Embulk task fails, you can resume the Embulk run from the checkpoints. You can also simply start a new embulk run with a checkpoint.
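
The three steps above can be sketched in a few lines of Scala. This is a minimal illustration, not the plugin's actual code: Message, FakeSubscription, InMemoryCheckpointStore and pullWithCheckpoint are hypothetical names, and the subscription and store are in-memory stand-ins rather than the Pub/Sub client or the Protocol Buffers checkpoint file.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; none of these names come from the plugin.
final case class Message(ackId: String, payload: String)

// Simulates a subscription: a message stays available until it is acked.
final class FakeSubscription(initial: Seq[Message]) {
  private val pending =
    mutable.LinkedHashMap(initial.map(m => m.ackId -> m): _*)

  def pull(maxMessages: Int): Seq[Message] =
    pending.values.take(maxMessages).toList

  def ack(ackIds: Seq[String]): Unit =
    ackIds.foreach(id => pending.remove(id))

  def remaining: Int = pending.size
}

// Simulates a checkpoint store that keeps the last saved batch in memory.
final class InMemoryCheckpointStore {
  private var saved: Option[Seq[Message]] = None

  def save(messages: Seq[Message]): Unit = {
    saved = Some(messages)
  }

  def load(): Option[Seq[Message]] = saved
}

object CheckpointFlow {
  // 1) pull, 2) save a checkpoint, 3) ack.
  // The ack is sent only after the checkpoint has been saved.
  def pullWithCheckpoint(
      sub: FakeSubscription,
      store: InMemoryCheckpointStore,
      maxMessages: Int
  ): Seq[Message] = {
    val messages = sub.pull(maxMessages)
    store.save(messages)
    sub.ack(messages.map(_.ackId))
    messages
  }

  def main(args: Array[String]): Unit = {
    val sub = new FakeSubscription(
      Seq(Message("id-1", "aaa"), Message("id-2", "bbb"))
    )
    val store = new InMemoryCheckpointStore
    val pulled = pullWithCheckpoint(sub, store, maxMessages = 100)

    // The subscription no longer holds the acked messages,
    // but the checkpoint still does.
    println(s"pulled: ${pulled.size}")
    println(s"remaining in subscription: ${sub.remaining}")
    println(s"messages in checkpoint: ${store.load().map(_.size).getOrElse(0)}")
  }
}
```

Because the ack comes last in this sketch, any messages that have been removed from the subscription are already present in the saved checkpoint, which is what makes recovery from the checkpoint possible.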

If you want checkpointing, set checkpoint_basedir so that checkpoint files are preserved on the local filesystem. If it is not set, the plugin uses an in-memory store.
If you want to recover state from a checkpoint, set checkpoint. The plugin then restores the transaction state from the given checkpoint instead of pulling messages from Pub/Sub.
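
As a sketch of the recovery case, the configuration below takes the required options and adds checkpoint. The checkpoint path is copied from the sample log output in this README and is only illustrative; substitute the file that your own run reported.

```yaml
in:
  type: pubsub
  project_id: <your-project-id>
  subscription_id: <your-subscription-name>
  json_keyfile: /path/to/credential.json
  # Restore from this checkpoint file instead of pulling from Pub/Sub.
  # Illustrative path; use the one logged as "Created a new checkpoint!".
  checkpoint: /tmp/embulk-input-pubsub/checkpoint--1576110815
out:
  type: stdout
```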

The checkpoint is implemented as a Protocol Buffers message.

Example

  • pubsub -> stdout config example
    in:
      type: pubsub
      project_id: <your-project-id>
      subscription_id: <your-subscription-name>
      json_keyfile: /path/to/credential.json
      max_messages: 100
      checkpoint_basedir: /tmp/embulk-input-pubsub/
    out:
      type: stdout

Running the example produces output like the following:

    $ embulk run examples/pubsub2stdout.yaml
    2020-05-06 00:44:05.093 +0900: Embulk v0.9.23
    2020-05-06 00:44:06.540 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
    2020-05-06 00:44:10.743 +0900 [INFO] (main): Gem's home and path are set by default: "/Users/ryo/.embulk/lib/gems"
    2020-05-06 00:44:12.551 +0900 [INFO] (main): Started Embulk v0.9.23
    2020-05-06 00:44:12.858 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-pubsub (0.0.1)
    2020-05-06 00:44:18.332 +0900 [INFO] (0001:transaction): Created a new checkpoint! : /tmp/embulk-input-pubsub/checkpoint--1576110815
    2020-05-06 00:44:18.336 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
    2020-05-06 00:44:18.354 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
    aaa,{}
    2020-05-06 00:44:18.428 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
    2020-05-06 00:44:18.436 +0900 [INFO] (main): Committed.
    2020-05-06 00:44:18.436 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

Development

    $ ./gradlew gem

TODO

  • Convert it to a FileInputPlugin so that it can be used with parser plugins
  • Remote-filesystem-based checkpointing