Project author: RADAR-base

Project description:
Kafka MongoDb sink connector
Language: Java
Repository: git://github.com/RADAR-base/MongoDb-Sink-Connector.git
Created: 2016-11-28T09:57:40Z
Project community: https://github.com/RADAR-base/MongoDb-Sink-Connector

License: Apache License 2.0

Kafka MongoDb Sink Connector

The MongoDB-Sink-Connector is a Kafka connector for scalable and reliable data streaming from one or more Kafka topics to one or more MongoDB collections.
It consumes Avro data from Kafka topics, converts each record into a Document, and inserts the Documents into MongoDB collections.

Currently, it supports only records that have an Avro schema.

Installation

This connector can be used inside a Docker stack or installed as a general Kafka Connect plugin.

Docker installation

Use the radarbase/kafka-connect-mongodb-sink Docker image to run the connector inside a Docker infrastructure. For example, RADAR-Docker uses a Docker Compose file. The Kafka Connect Docker image requires a number of environment variables to be set. In RADAR-Docker, the following environment variables are set:

  environment:
    CONNECT_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9092,PLAINTEXT://kafka-3:9092
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: "mongodb-sink"
    CONNECT_CONFIG_STORAGE_TOPIC: "mongodb-sink.config"
    CONNECT_OFFSET_STORAGE_TOPIC: "mongodb-sink.offsets"
    CONNECT_STATUS_STORAGE_TOPIC: "mongodb-sink.status"
    CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
    CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry-1:8081"
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry-1:8081"
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_OFFSET_STORAGE_FILE_FILENAME: "/tmp/mongdb-sink.offset"
    CONNECT_REST_ADVERTISED_HOST_NAME: "radar-mongodb-connector"
    CONNECT_ZOOKEEPER_CONNECT: zookeeper-1:2181
    CONNECT_LOG4J_LOGGERS: "org.reflections=ERROR"
    KAFKA_BROKERS: 3

Before it starts streaming, the Docker image waits until KAFKA_BROKERS brokers and the schema registry are available.
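
The waiting behavior described above follows a common poll-until-ready pattern. The image's actual startup script is not reproduced here; the sketch below only illustrates the pattern in Java. The class name `BrokerWait`, the `awaitBrokers` method, and the `IntSupplier` that stands in for a real broker lookup are all hypothetical.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

/** Hypothetical helper: waits until enough brokers are visible or a timeout passes. */
final class BrokerWait {
    private BrokerWait() {
    }

    /**
     * Polls brokerCount until it reports at least `required` brokers or the
     * timeout elapses. Returns true only if enough brokers were seen in time.
     */
    static boolean awaitBrokers(IntSupplier brokerCount, int required,
            Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (brokerCount.getAsInt() < required) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: not enough brokers before the deadline
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Simulated cluster: one more broker becomes visible on every poll.
        int[] visible = {0};
        boolean ready = awaitBrokers(() -> ++visible[0], 3,
                Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("brokers ready: " + ready);
    }
}
```

In a real setup the supplier would query the cluster for its live brokers, and `required` would come from the KAFKA_BROKERS variable.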

System installation

This connector requires the following setup:

  • A Confluent Platform installation. This connector uses Confluent Platform 4.0.0.
  • A MongoDB instance, installed and running, with access credentials.

To install the connector, follow these steps:

  • Build MongoDB-Sink-Connector from source, or download the latest release from the project repository.

    git clone https://github.com/RADAR-base/MongoDb-Sink-Connector.git
    cd MongoDb-Sink-Connector
    ./gradlew clean build
  • Follow the Confluent Platform quick start to start zookeeper, kafka-server, schema-registry and kafka-rest, so that you can easily stream data to a Kafka topic.

    # Start zookeeper
    zookeeper-server-start /etc/kafka/zookeeper.properties
    # Start kafka-broker
    kafka-server-start /etc/kafka/server.properties
    # Start schema-registry
    schema-registry-start /etc/schema-registry/schema-registry.properties
    # Start kafka-rest
    kafka-rest-start /etc/kafka-rest/kafka-rest.properties
  • Install and start MongoDB

  • Add build/libs/* and build/third-party/* to a new directory in the Connect plugin path

    mkdir /usr/local/share/kafka-connect/plugins/kafka-connect-mongodb-sink
    cp build/libs/* build/third-party/* /usr/local/share/kafka-connect/plugins/kafka-connect-mongodb-sink

Usage

Modify the sink.properties file according to your environment. The following properties are supported:

| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| topics | List of topics to be streamed. | list | | | high |
| record.converter.class | RecordConverterFactory that returns classes to convert Kafka SinkRecords to BSON documents. | class | org.radarcns.connect.mongodb.serialization.RecordConverterFactory | a valid class (ValidClass validator) | medium |
| batch.flush.ms | Flush a batch after this amount of milliseconds. | int | 15000 | [0,…] | low |
| batch.size | Batch size to initiate a MongoDB write operation. If the buffer does not reach this capacity within batch.flush.ms, it will be written anyway. | int | 2500 | [1,…] | low |
| buffer.capacity | Maximum number of items in a MongoDB writer buffer. Once the buffer becomes full, the task fails. | int | 20000 | [1,…] | low |
| mongo.host | MongoDB host name to write data to. | string | | non-empty string (NotEmptyString validator) | high |
| mongo.port | MongoDB port. | int | 27017 | [1,…] | low |
| mongo.database | MongoDB database name. | string | | non-empty string (NotEmptyString validator) | high |
| mongo.username | Username to connect to MongoDB database. If not set, no credentials are used. | string | null | | medium |
| mongo.password | Password to connect to MongoDB database. If not set, no credentials are used. | string | null | | medium |
| mongo.collection.format | A format string for the destination collection name, which may contain {$topic} as a placeholder for the originating topic name. For example, kafka_{$topic} for the topic orders will map to the collection name kafka_orders. | string | {$topic} | non-empty string (NotEmptyString validator) | medium |
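
The interplay of batch.size, batch.flush.ms and buffer.capacity described in the table can be sketched as follows. This is a simplified, single-threaded illustration under assumptions and not the connector's actual writer: the class `BatchBuffer`, its methods, and the explicit clock argument are hypothetical, and treating a full buffer as an exception merely mirrors "the task fails".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of size- and time-triggered batching with a hard capacity. */
final class BatchBuffer<T> {
    private final int batchSize;   // cf. batch.size
    private final long flushMs;    // cf. batch.flush.ms
    private final int capacity;    // cf. buffer.capacity
    private final Consumer<List<T>> writer;
    private final List<T> items = new ArrayList<>();
    private long lastFlushMs;

    BatchBuffer(int batchSize, long flushMs, int capacity, long nowMs,
            Consumer<List<T>> writer) {
        this.batchSize = batchSize;
        this.flushMs = flushMs;
        this.capacity = capacity;
        this.lastFlushMs = nowMs;
        this.writer = writer;
    }

    /** Buffers one record; a buffer that is already full is treated as a failure. */
    void add(T item) {
        if (items.size() >= capacity) {
            throw new IllegalStateException("buffer capacity " + capacity + " reached");
        }
        items.add(item);
    }

    /** Writes the buffered records if either threshold is met; returns true if it wrote. */
    boolean flushIfDue(long nowMs) {
        if (items.isEmpty()) {
            return false;
        }
        boolean sizeReached = items.size() >= batchSize;
        boolean timeReached = nowMs - lastFlushMs >= flushMs; // time since the previous write
        if (!sizeReached && !timeReached) {
            return false;
        }
        writer.accept(new ArrayList<>(items));
        items.clear();
        lastFlushMs = nowMs;
        return true;
    }

    public static void main(String[] args) {
        BatchBuffer<String> buffer = new BatchBuffer<>(2, 15000, 4, 0,
                batch -> System.out.println("write " + batch));
        buffer.add("r1");
        buffer.flushIfDue(100);    // nothing written: below batch size, interval not elapsed
        buffer.add("r2");
        buffer.flushIfDue(200);    // written: batch size reached
        buffer.add("r3");
        buffer.flushIfDue(15200);  // written: flush interval elapsed
    }
}
```

Separating `add` from `flushIfDue` is a deliberate choice in this sketch: it lets the buffer fill up when writes fall behind, which is the situation buffer.capacity guards against.
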
  • A sample configuration may look like this:

    # Kafka consumer configuration
    name=kafka-connector-mongodb-sink
    # Kafka connector configuration
    connector.class=org.radarcns.connect.mongodb.MongoDbSinkConnector
    tasks.max=1
    # Topics that will be consumed
    topics=avrotest
    # MongoDB server
    mongo.host=localhost
    mongo.port=27017
    # MongoDB configuration
    mongo.username=mongodb-username
    mongo.password=***
    mongo.database=mongodb-database
    # Collection name for putting data into the MongoDB database. The {$topic} token will be replaced
    # by the Kafka topic name.
    #mongo.collection.format={$topic}
    # Factory class to do the actual record conversion
    record.converter.class=org.radarcns.connect.mongodb.serialization.RecordConverterFactory
  • Run the MongoDB-Sink-Connector in

    • standalone mode

      connect-standalone /etc/schema-registry/connect-avro-standalone.properties ./sink.properties
    • distributed mode

      connect-distributed /path/to/cluster.properties ./sink.properties
  • Stream sample data to the topics configured in sink.properties. You may use the REST proxy (kafka-rest) to do this easily.

    ```shell
    curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
         -H "Accept: application/vnd.kafka.v2+json" \
         --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "testUser"}}]}' \
         "http://localhost:8082/topics/avrotest"
    # You should get the following response:
    {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}
    ```
  • If all of your environment properties are configured properly, you may see output like the following when you run the mongodb-sink-connector.

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/share/java/kafka/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    [2017-04-14 15:23:35,626] INFO StandaloneConfig values:
    cluster = connect
    rest.advertised.host.name = null
    task.shutdown.graceful.timeout.ms = 5000
    rest.host.name = null
    rest.advertised.port = null
    bootstrap.servers = [localhost:9092]
    offset.flush.timeout.ms = 5000
    offset.flush.interval.ms = 60000
    rest.port = 8083
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    access.control.allow.methods =
    access.control.allow.origin =
    offset.storage.file.filename = /tmp/connect.offsets
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    value.converter = class io.confluent.connect.avro.AvroConverter
    key.converter = class io.confluent.connect.avro.AvroConverter
    (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:178)
    [2017-04-14 15:23:35,767] INFO Logging initialized @509ms (org.eclipse.jetty.util.log:186)
    [2017-04-14 15:23:36,094] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:52)
    [2017-04-14 15:23:36,094] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:71)
    [2017-04-14 15:23:36,094] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:102)
    [2017-04-14 15:23:36,104] INFO ProducerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    ...
    ...
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    linger.ms = 0
    (org.apache.kafka.clients.producer.ProducerConfig:178)
    [2017-04-14 15:23:36,138] INFO ProducerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [localhost:9092]
    ...
    ...
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    linger.ms = 0
    (org.apache.kafka.clients.producer.ProducerConfig:178)
    [2017-04-14 15:23:36,140] INFO Kafka version : 0.10.0.1-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
    [2017-04-14 15:23:36,141] INFO Kafka commitId : e7288edd541cee03 (org.apache.kafka.common.utils.AppInfoParser:84)
    [2017-04-14 15:23:36,141] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
    [2017-04-14 15:23:36,143] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)
    [2017-04-14 15:23:36,144] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:73)
    [2017-04-14 15:23:36,144] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
    [2017-04-14 15:23:36,261] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
    [2017-04-14 15:23:36,931] INFO Started o.e.j.s.ServletContextHandler@d1f74b8{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
    [2017-04-14 15:23:36,940] INFO Started ServerConnector@2ad3a1bb{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
    [2017-04-14 15:23:36,941] INFO Started @1685ms (org.eclipse.jetty.server.Server:379)
    [2017-04-14 15:23:36,942] INFO REST server listening at http://127.0.1.1:8083/, advertising URL http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
    [2017-04-14 15:23:36,942] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:58)
    [2017-04-14 15:23:36,945] INFO ConnectorConfig values:
    connector.class = org.radarcns.connect.mongodb.MongoDbSinkConnector
    tasks.max = 1
    name = kafka-connector-mongodb-sink
    (org.apache.kafka.connect.runtime.ConnectorConfig:178)
    [2017-04-14 15:23:36,955] INFO Creating connector kafka-connector-mongodb-sink of type org.radarcns.connect.mongodb.MongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:168)
    [2017-04-14 15:23:36,957] INFO Instantiated connector kafka-connector-mongodb-sink with version 0.10.0.1-cp1 of type org.radarcns.connect.mongodb.MongoDbSinkConnector (org.apache.kafka.connect.runtime.Worker:176)
    [2017-04-14 15:23:36,959] INFO User configuration are:
    mongo.port: 27017
    connector.class: org.radarcns.connect.mongodb.MongoDbSinkConnector
    mongo.password: ***
    mongo.database: mongodb-database
    record.converter.class: org.radarcns.connect.mongodb.serialization.RecordConverterFactory
    mongo.username: mongodb-username
    mongo.host: localhost
    topics: avrotest
    tasks.max: 1
    name: kafka-connector-mongodb-sink (org.radarcns.connect.mongodb.MongoDbSinkConnector:116)
    [2017-04-14 15:23:36,961] INFO Finished creating connector kafka-connector-mongodb-sink (org.apache.kafka.connect.runtime.Worker:181)
    [2017-04-14 15:23:36,961] INFO SinkConnectorConfig values:
    connector.class = org.radarcns.connect.mongodb.MongoDbSinkConnector
    tasks.max = 1
    topics = [avrotest]
    name = kafka-connector-mongodb-sink
    (org.apache.kafka.connect.runtime.SinkConnectorConfig:178)
    [2017-04-14 15:23:36,963] INFO At most 1 will be started (org.radarcns.connect.mongodb.MongoDbSinkConnector:126)
    [2017-04-14 15:23:36,971] INFO TaskConfig values:
    task.class = class org.radarcns.connect.mongodb.MongoDbSinkTask
    (org.apache.kafka.connect.runtime.TaskConfig:178)
    [2017-04-14 15:23:36,972] INFO Creating task kafka-connector-mongodb-sink-0 (org.apache.kafka.connect.runtime.Worker:315)
    [2017-04-14 15:23:36,972] INFO Instantiated task kafka-connector-mongodb-sink-0 with version 0.10.0.1-cp1 of type org.radarcns.connect.mongodb.MongoDbSinkTask (org.apache.kafka.connect.runtime.Worker:326)
    [2017-04-14 15:23:36,987] INFO ConsumerConfig values:
    metric.reporters = []
    ...
    ...
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = connect-kafka-connector-mongodb-sink
    retry.backoff.ms = 100
    ...
    ...
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ...
    ...
    (org.apache.kafka.clients.consumer.ConsumerConfig:178)
    [2017-04-14 15:23:37,001] INFO ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    ...
    ...
    auto.offset.reset = earliest
    (org.apache.kafka.clients.consumer.ConsumerConfig:178)
    [2017-04-14 15:23:37,038] INFO Kafka version : 0.10.0.1-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
    [2017-04-14 15:23:37,039] INFO Kafka commitId : e7288edd541cee03 (org.apache.kafka.common.utils.AppInfoParser:84)
    [2017-04-14 15:23:37,041] INFO Created connector kafka-connector-mongodb-sink (org.apache.kafka.connect.cli.ConnectStandalone:91)
    [2017-04-14 15:23:37,042] INFO 0 have been processed (org.radarcns.connect.mongodb.MongoDbSinkTask:56)
    [2017-04-14 15:23:37,115] INFO Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
    [2017-04-14 15:23:37,132] INFO 0 have been written in MongoDB 0 records need to be processed. (org.radarcns.connect.mongodb.MongoDbWriter:58)
    [2017-04-14 15:23:37,142] INFO No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, serverDescriptions=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
    [2017-04-14 15:23:37,413] INFO Opened connection [connectionId{localValue:1, serverValue:5}] to localhost:27017 (org.mongodb.driver.connection:71)
    [2017-04-14 15:23:37,415] INFO Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 10]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=722850} (org.mongodb.driver.cluster:71)
    [2017-04-14 15:23:37,510] INFO Opened connection [connectionId{localValue:2, serverValue:6}] to localhost:27017 (org.mongodb.driver.connection:71)
    [2017-04-14 15:23:37,534] INFO Sink task WorkerSinkTask{id=kafka-connector-mongodb-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
    [2017-04-14 15:23:37,534] INFO Started MongoDbWriter (org.radarcns.connect.mongodb.MongoDbWriter:97)
    [2017-04-14 15:23:37,687] INFO Discovered coordinator nivethika-XPS-15-9550:9092 (id: 2147483647 rack: null) for group connect-kafka-connector-mongodb-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:528)
    [2017-04-14 15:23:37,688] INFO Revoking previously assigned partitions [] for group connect-kafka-connector-mongodb-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:292)
    [2017-04-14 15:23:37,688] INFO (Re-)joining group connect-kafka-connector-mongodb-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:349)
    [2017-04-14 15:23:37,731] INFO Successfully joined group connect-kafka-connector-mongodb-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:457)
    [2017-04-14 15:23:37,732] INFO Setting newly assigned partitions [avrotest-0] for group connect-kafka-connector-mongodb-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:231)
    [2017-04-14 15:23:42,197] INFO Reflections took 5994 ms to scan 254 urls, producing 12617 keys and 82584 values (org.reflections.Reflections:229)
    [2017-04-14 15:24:07,043] INFO 4 have been processed (org.radarcns.connect.mongodb.MongoDbSinkTask:56)
    [2017-04-14 15:24:07,131] INFO 4 have been written in MongoDB 0 records need to be processed. (org.radarcns.connect.mongodb.MongoDbWriter:58)
    [2017-04-14 15:24:36,975] INFO [FLUSH-WRITER] Time-elapsed: 3.6946E-5 s (org.radarcns.connect.mongodb.MongoDbWriter:205)
    [2017-04-14 15:24:36,996] INFO [FLUSH] Time elapsed: 0.020898882 s (org.radarcns.connect.mongodb.MongoDbSinkTask:153)
    [2017-04-14 15:24:36,996] INFO WorkerSinkTask{id=kafka-connector-mongodb-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
    [2017-04-14 15:24:37,043] INFO 2 have been processed (org.radarcns.connect.mongodb.MongoDbSinkTask:56)
    [2017-04-14 15:24:37,131] INFO 2 have been written in MongoDB 0 records need to be processed. (org.radarcns.connect.mongodb.MongoDbWriter:58)
  • You can view the data in MongoDB. For the above example, you may see results as follows.

    # log-in to mongo cli
    mongo
    MongoDB shell version: 3.2.12
    connecting to: test
    Server has startup warnings:
    2017-04-14T13:14:29.177+0000 I CONTROL [initandlisten] ** WARNING: You are running this process as the root user, which is not recommended.
    2017-04-14T13:14:29.178+0000 I CONTROL [initandlisten]
    2017-04-14T13:14:29.178+0000 I CONTROL [initandlisten]
    2017-04-14T13:14:29.178+0000 I CONTROL [initandlisten] ** WARNING: /sys/kernel/mm/transparent_hugepage/enabled is 'always'.
    2017-04-14T13:14:29.178+0000 I CONTROL [initandlisten] ** We suggest setting it to 'never'
    2017-04-14T13:14:29.178+0000 I CONTROL [initandlisten]
    2017-04-14T13:14:29.178+0000 I CONTROL [initandlisten] ** WARNING: /sys/kernel/mm/transparent_hugepage/defrag is 'always'.
    2017-04-14T13:14:29.178+0000 I CONTROL [initandlisten] ** We suggest setting it to 'never'
    2017-04-14T13:14:29.178+0000 I CONTROL [initandlisten]
    # view available databases
    > show dbs
    admin 0.000GB
    mongodb-database 0.000GB
    local 0.000GB
    # switch to your database
    > use mongodb-database
    switched to db mongodb-database
    # view available collections. You should see collections with configured topic names and an additional `OFFSETS` collection
    > show collections
    OFFSETS
    avrotest
    # query the data inside collections
    > db.avrotest.find()
    { "_id" : null, "name" : "somee" }
    > db.OFFSETS.find()
    { "_id" : { "topic" : "avrotest", "partition" : 0 }, "offset" : NumberLong(5) }
    >
  • To stop your connector, press CTRL-C.
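
The curl request from the streaming step above can also be issued from Java. The following is a minimal sketch, assuming Java 11 or later (for `java.net.http`) and a REST proxy at the same http://localhost:8082 address; the class `RestProxyProducer` and its methods are hypothetical names introduced for illustration. For brevity, the user name is inserted without JSON escaping, so it must not contain quotes or backslashes.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical sketch: posts one Avro record to the REST proxy, like the curl example. */
final class RestProxyProducer {
    private RestProxyProducer() {
    }

    /** Builds the JSON body: an embedded Avro schema plus one record value. */
    static String buildBody(String userName) {
        String valueSchema = "{\"type\": \"record\", \"name\": \"User\", "
                + "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";
        // The schema travels as a JSON string, so its quotes are escaped once more.
        return "{\"value_schema\": \"" + valueSchema.replace("\"", "\\\"") + "\", "
                + "\"records\": [{\"value\": {\"name\": \"" + userName + "\"}}]}";
    }

    /** Builds the POST request with the same headers as the curl example. */
    static HttpRequest buildRequest(String restProxyUrl, String topic, String userName) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restProxyUrl + "/topics/" + topic))
                .header("Content-Type", "application/vnd.kafka.avro.v2+json")
                .header("Accept", "application/vnd.kafka.v2+json")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(userName)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRequest("http://localhost:8082", "avrotest", "testUser");
        System.out.println(request.method() + " " + request.uri());
        // Only touch the network when explicitly asked to.
        if (args.length > 0 && args[0].equals("--send")) {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
```

Running the class with `--send` while the REST proxy is up should return a JSON response of the same shape as the one shown for curl.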

Developer guide

The MongoDB-Sink-Connector relies on RecordConverters to convert a SinkRecord to a Document. The default RecordConverter is GenericRecordConverter, which stores the record key as _id and adds a field for every field name in the record value. The GenericRecordConverter supports conversion of most primitive types and collections.
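
The mapping performed by the default converter can be pictured with plain Java collections. This is only an illustration of the key-to-_id and field-by-field idea, not the GenericRecordConverter source: the class `GenericConversionSketch` is hypothetical, and a `Map` stands in for both the record value and the resulting Document.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the generic mapping: key -> "_id", one entry per value field. */
final class GenericConversionSketch {
    private GenericConversionSketch() {
    }

    static Map<String, Object> toDocument(Object recordKey, Map<String, Object> valueFields) {
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("_id", recordKey);
        for (Map.Entry<String, Object> field : valueFields.entrySet()) {
            document.put(field.getKey(), field.getValue());
        }
        return document;
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", "testUser");
        // A record sent without a key ends up with a null _id in this sketch.
        System.out.println(toDocument(null, value));
    }
}
```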

For Avro records with complex schemas, or for a custom collection format, it is recommended to write your own RecordConverter and register it with an extended RecordConverterFactory. Writing a custom RecordConverter is relatively straightforward. The interface requires two methods to be implemented.

  import java.util.Collection;
  import org.apache.kafka.connect.errors.DataException;
  import org.apache.kafka.connect.sink.SinkRecord;
  import org.bson.Document;

  /**
   * Converts Kafka records to MongoDB documents.
   */
  public interface RecordConverter {
      /**
       * <p>The schema names used are the fully qualified (including namespace) and case-sensitive
       * names. If the converter requires records with both a key and a value schema, the returned
       * format is "KeySchemaName-ValueSchemaName". If the key is not required, only "ValueSchemaName"
       * may be returned. KeySchemaName and ValueSchemaName may be substituted by the Object class
       * that it supports. If the converter supports all types of data, return null.
       */
      Collection<String> supportedSchemaNames();

      /**
       * Convert a Kafka record to a BSON document.
       *
       * @param record record to convert
       * @return BSON document
       * @throws DataException if the record cannot be converted by the current converter.
       */
      Document convert(SinkRecord record) throws DataException;
  }
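
The naming contract of supportedSchemaNames() suggests how a converter might be selected for a record. The sketch below shows one plausible lookup order (key-value pair first, then value only, then a converter that returned null). That order is an assumption, not the documented behavior of RecordConverterFactory; `ConverterLookupSketch`, `NamedConverter` and the `example.*` schema names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of selecting a converter by schema name. */
final class ConverterLookupSketch {
    /** Stand-in for RecordConverter, reduced to the schema-name contract. */
    interface NamedConverter {
        Collection<String> supportedSchemaNames(); // null: supports all data
    }

    private final Map<String, NamedConverter> byName = new HashMap<>();
    private NamedConverter generic; // the converter that returned null, if any

    ConverterLookupSketch(List<NamedConverter> converters) {
        for (NamedConverter converter : converters) {
            Collection<String> names = converter.supportedSchemaNames();
            if (names == null) {
                generic = converter;
            } else {
                for (String name : names) {
                    byName.put(name, converter);
                }
            }
        }
    }

    /** Most specific match first: key-value pair, then value only, then generic. */
    NamedConverter find(String keySchemaName, String valueSchemaName) {
        NamedConverter match = byName.get(keySchemaName + "-" + valueSchemaName);
        if (match == null) {
            match = byName.get(valueSchemaName);
        }
        if (match == null) {
            match = generic;
        }
        if (match == null) {
            throw new IllegalArgumentException(
                    "no converter for " + keySchemaName + "-" + valueSchemaName);
        }
        return match;
    }

    public static void main(String[] args) {
        NamedConverter specific = () -> Collections.singleton("example.Key-example.Value");
        NamedConverter fallback = () -> null;
        ConverterLookupSketch lookup =
                new ConverterLookupSketch(Arrays.asList(specific, fallback));
        System.out.println(lookup.find("example.Key", "example.Value") == specific);
        System.out.println(lookup.find("other.Key", "other.Value") == fallback);
    }
}
```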

Sample RecordConverter Implementation

  1. Implement a custom RecordConverter. An example is given below. Consider a record with the following key schema

    {
      "namespace": "org.radarcns.key",
      "type": "record",
      "name": "MeasurementKey",
      "doc": "Measurement key in the RADAR-base project",
      "fields": [
        {"name": "userId", "type": "string", "doc": "user ID"},
        {"name": "sourceId", "type": "string", "doc": "device source ID"}
      ]
    }

    and the following value schema.

    {
      "namespace": "org.radarcns.application",
      "type": "record",
      "name": "ApplicationRecordCounts",
      "doc": "Number of records cached or created.",
      "fields": [
        {"name": "time", "type": "double", "doc": "device timestamp in UTC (s)"},
        {"name": "timeReceived", "type": "double", "doc": "device receiver timestamp in UTC (s)"},
        {"name": "recordsCached", "type": "int", "doc": "number of records currently being cached", "default": -1},
        {"name": "recordsSent", "type": "int", "doc": "number of records sent since application start"},
        {"name": "recordsUnsent", "type": "int", "doc": "number of unsent records", "default": -1}
      ]
    }

    These samples give us the KeySchemaName org.radarcns.key.MeasurementKey and the ValueSchemaName org.radarcns.application.ApplicationRecordCounts. Let's call our custom RecordConverter CountsStatusRecordConverter. The implementation can be as simple as the one below.

    import java.util.Collection;
    import java.util.Collections;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.bson.Document;
    // RecordConverter and Converter are classes of the connector project itself.

    /**
     * RecordConverter to convert an ApplicationRecordCounts record to a MongoDB Document.
     */
    public class CountsStatusRecordConverter implements RecordConverter {
        /**
         * Returns the list of supported schemas, which behaves as the id to select suitable
         * RecordConverter for a SinkRecord.
         *
         * @return a list of supported Schemas
         */
        @Override
        public Collection<String> supportedSchemaNames() {
            return Collections.singleton("org.radarcns.key.MeasurementKey" + "-"
                    + "org.radarcns.application.ApplicationRecordCounts");
        }

        /**
         * Converts an ApplicationRecordCounts SinkRecord into a MongoDB Document.
         *
         * @param sinkRecord record to be converted
         * @return converted MongoDB Document to write
         */
        @Override
        public Document convert(SinkRecord sinkRecord) throws DataException {
            Struct key = (Struct) sinkRecord.key();
            Struct value = (Struct) sinkRecord.value();
            return new Document("_id", key.get("userId") + "-" + key.get("sourceId"))
                    .append("user", key.getString("userId"))
                    .append("source", key.getString("sourceId"))
                    .append("recordsCached", value.getInt32("recordsCached"))
                    .append("recordsSent", value.getInt32("recordsSent"))
                    .append("timestamp", Converter.toDateTime(value.get("timeReceived")));
        }
    }
  2. Register the implemented RecordConverter with an extended RecordConverterFactory.

    package org.radarcns.connect.mongodb.example;

    import java.util.ArrayList;
    import java.util.List;

    /**
     * Extended RecordConverterFactory that adds the custom RecordConverter classes that are needed.
     */
    public class RecordConverterFactoryExample extends RecordConverterFactory {
        /**
         * Overrides genericConverters to append the custom RecordConverter class to the
         * RecordConverterFactory.
         *
         * @return list of RecordConverters available
         */
        @Override
        protected List<RecordConverter> genericConverters() {
            List<RecordConverter> recordConverters = new ArrayList<>();
            recordConverters.addAll(super.genericConverters());
            recordConverters.add(new CountsStatusRecordConverter());
            return recordConverters;
        }
    }
  3. Use the extended RecordConverterFactoryExample in sink.properties

    # Factory class to do the actual record conversion
    record.converter.class=org.radarcns.connect.mongodb.example.RecordConverterFactoryExample
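
The sample converter above passes timeReceived, a double holding seconds since the Unix epoch, through Converter.toDateTime, whose implementation is not shown in this document. As a rough idea of what such a conversion could look like, here is a minimal sketch; the class `TimeConversionSketch` and the method `toDate` are hypothetical, and rounding to the nearest millisecond is an assumption.

```java
import java.util.Date;

/** Hypothetical sketch: turns epoch seconds stored as a double into a java.util.Date. */
final class TimeConversionSketch {
    private TimeConversionSketch() {
    }

    /** Converts seconds since the Unix epoch to a Date, rounded to the nearest millisecond. */
    static Date toDate(double epochSeconds) {
        return new Date(Math.round(epochSeconds * 1000d));
    }

    public static void main(String[] args) {
        // 1492183416.5 s is half a second past 2017-04-14T15:23:36Z.
        System.out.println(toDate(1492183416.5).getTime());
    }
}
```

A `java.util.Date` placed in a BSON Document is stored by the MongoDB Java driver as a BSON date, which makes range queries on the timestamp field straightforward.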

Notes

The only tuning setting available on the consumer side is the number of records returned in a single call to poll(), configured through the consumer.max.poll.records parameter in standalone.properties.

Connectors can run on any machine where Kafka is installed; the machine does not need to host a Kafka broker.

To reset a connector running in standalone mode, stop it, then change name in sink.properties and offset.storage.file.filename in standalone.properties.

Javadoc

More info and Javadocs of the connector are available at -

Contributing

All contributed code should be formatted according to the Google Java Style Guide.
If you want to contribute a feature or fix, browse our issues and make a pull request.