Project author: isopropylcyanide

Project description:
A Flink job that reads a JSON file (either once or by continuous polling) as its source and writes it to Couchbase as a sink using the asynchronous Couchbase SDK.
Language: Java
Project URL: git://github.com/isopropylcyanide/flink-couchbase-data-sink.git


Flink-couchbase-data-sink


  • A file contains a list of JSON documents, each with an id, that need to be inserted into Couchbase
  • A Flink job takes the file as its source and writes the documents to a Couchbase sink
  • The Couchbase sink writes the incoming documents to the cluster specified in the config file
  • The Flink job can poll the file for changes at an interval specified in the config


Prerequisites

  • Java 1.8
  • Zookeeper 3.6.0
  • Apache Flink 1.10.0
  • Couchbase Server 6.5.1

Config properties

Edit the following properties to match your target instance

Property                            Description
couchbase.node                      Location of the Couchbase cluster. Defaults to localhost
couchbase.username                  Username for the Couchbase dashboard
couchbase.password                  Password for the Couchbase dashboard
startup.documents.path              Path of the JSON document file. By default, the file is in src/main/resources
startup.documents.poll.continuous   Flag that enables or disables polling. Defaults to false
startup.documents.poll.duration     Interval in ms at which the file is polled for changes, if polling is enabled
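
As a rough illustration of how the properties above could be read, here is a minimal sketch that uses only java.util.Properties. The class name JobConfig and its field names are hypothetical and are not taken from the project. The defaults for couchbase.node and startup.documents.poll.continuous follow the table; the 1000 ms fallback for the poll duration is an assumption, since no default is documented.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the six documented keys; not a class from the project.
final class JobConfig {
    final String node;
    final String username;
    final String password;
    final String documentsPath;
    final boolean pollContinuously;
    final long pollDurationMs;

    private JobConfig(Properties p) {
        // Defaults for the node and the polling flag follow the property table.
        node = p.getProperty("couchbase.node", "localhost");
        username = p.getProperty("couchbase.username");
        password = p.getProperty("couchbase.password");
        documentsPath = p.getProperty("startup.documents.path");
        pollContinuously = Boolean.parseBoolean(
                p.getProperty("startup.documents.poll.continuous", "false").trim());
        // Assumption: fall back to 1000 ms when no duration is given.
        pollDurationMs = Long.parseLong(
                p.getProperty("startup.documents.poll.duration", "1000").trim());
    }

    // Reads key=value pairs in the standard .properties format.
    static JobConfig load(Reader source) {
        Properties p = new Properties();
        try {
            p.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new JobConfig(p);
    }

    public static void main(String[] args) {
        String sample = "couchbase.username=admin\n"
                + "startup.documents.poll.continuous=true\n"
                + "startup.documents.poll.duration=5000\n";
        JobConfig c = JobConfig.load(new StringReader(sample));
        // Prints: localhost true 5000
        System.out.println(c.node + " " + c.pollContinuously + " " + c.pollDurationMs);
    }
}
```

In a real job the Reader would typically wrap the file passed on the command line rather than an in-memory string.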

Setting up the project

  # Start Zookeeper (required for Flink)
  $ ./zkServer.sh start
  # Start the Couchbase server instance
  $ sudo /etc/init.d/couchbase-server start
  # Create a default bucket: open the Couchbase dashboard at http://127.0.0.1:8091
  # (change the port accordingly), enter your credentials, and create a bucket called "data"
  # Start the Flink cluster from the FLINK_BIN directory
  $ ./start-cluster.sh
  # Submit the job by packaging the jar and supplying its path. The config lies in src/main/resources
  $ flink.sh run -c com.aman.flink.job.FlinkDatabaseStartupJob <jar-location> --config <config-file-location>
  $ ./flink run -c <main-class> <jar> <config-properties>
  # e.g.
  $ ./flink run -c com.github.isopropylcyanide.flinkcouchbasesink.FlinkDatabaseStartupJob \
      flink-couchbase-data-starter/target/flink-couchbase-sink-1.0.jar \
      flink-couchbase-data-starter/src/main/resources/config.properties
  # Verify the documents were inserted properly: open the dashboard at http://127.0.0.1:8091
  # and check the documents in the bucket "data"
  # Stop the cluster once the job is done
  $ ./stop-cluster.sh

Note: Replace .sh files with .bat files when working in a Windows environment.

Output

  • Flink jobs submitted with startup.documents.poll.continuous: true run continuously.
  • Flink jobs submitted with startup.documents.poll.continuous: false finish after a single run.
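
The two behaviours above correspond to the two constants of Flink's FileProcessingMode enum (PROCESS_ONCE and PROCESS_CONTINUOUSLY), which StreamExecutionEnvironment.readFile accepts together with a monitoring interval. Whether the project uses readFile is an assumption; the sketch below only shows how the flag and duration might be mapped to a mode. WatchMode is a local stand-in for the Flink enum so that the example compiles without Flink on the classpath, and PollSettings is a hypothetical name.

```java
// Local stand-in mirroring the two constants of Flink's FileProcessingMode,
// so this sketch compiles without Flink on the classpath.
enum WatchMode { PROCESS_ONCE, PROCESS_CONTINUOUSLY }

// Hypothetical holder for the polling settings; not a class from the project.
final class PollSettings {
    final WatchMode mode;
    final long intervalMs;

    private PollSettings(WatchMode mode, long intervalMs) {
        this.mode = mode;
        this.intervalMs = intervalMs;
    }

    static PollSettings from(boolean continuous, long durationMs) {
        if (!continuous) {
            // One-shot run: the file is read once; -1 marks the interval as unused here.
            return new PollSettings(WatchMode.PROCESS_ONCE, -1L);
        }
        if (durationMs <= 0) {
            throw new IllegalArgumentException(
                    "poll duration must be positive, got " + durationMs);
        }
        // Continuous run: the file is re-checked every durationMs milliseconds.
        return new PollSettings(WatchMode.PROCESS_CONTINUOUSLY, durationMs);
    }

    public static void main(String[] args) {
        PollSettings s = PollSettings.from(true, 5000L);
        // Prints: PROCESS_CONTINUOUSLY every 5000 ms
        System.out.println(s.mode + " every " + s.intervalMs + " ms");
    }
}
```

Rejecting a non-positive duration up front is a design choice of this sketch: it surfaces a bad config value at startup instead of after the job has been submitted.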


  • Verify data in Couchbase

Apache Flink

  • Open-source platform for distributed stream and batch data processing.
  • Provides data distribution, communication, and fault tolerance for distributed computations over data streams.
  • Builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.

Couchbase

  • Open source, distributed, NoSQL document-oriented engagement database.
  • Exposes a fast key-value store with managed cache for sub-millisecond data operations.
  • Specialized to provide low-latency data management for large-scale interactive web, mobile, and IoT applications.