A Kafka connector for CouchDB
kafka-connect-couchdb is a Kafka Connect
plugin for transferring data between CouchDB and Kafka.
The project consists of two parts, namely a sink connector and a source connector. The sink connector is used
to store data from Kafka into CouchDB. The source connector is used to publish data from CouchDB to Kafka.
The sink connector stores data from Kafka into CouchDB. Described below are its features and configuration.
By providing a mapping from Kafka topic to CouchDB database name using the `sink-topics-to-databases-mapping`
config value, you can specify which topics should be written into which database. Make sure you also add all
topics under the `topics` config value, otherwise they will be ignored.
Every topic you specify under `sink-topics-to-databases-mapping` should also be specified under
`topics-to-id-fields-mapping` so that the connector knows which field to use to identify unique documents.
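As a sketch, a sink configuration might look like the following. The connector class name, the topic and
database names, and the exact pair syntax of the mapping values are illustrative assumptions; check the
plugin's documentation for the real formats:

```properties
name=couchdb-sink
# Assumed connector class name
connector.class=com.xebia.kafka.connect.couchdb.CouchDBSinkConnector
tasks.max=1

# Topics to consume from; topics missing here are ignored
topics=orders,customers

# Which topic is written into which CouchDB database
# (pair syntax is assumed for this example)
sink-topics-to-databases-mapping=orders/orders-db,customers/customers-db

# Which document field identifies a unique document, per topic
topics-to-id-fields-mapping=orders/order_id,customers/customer_id
```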
CouchDB uses a conflict mechanism to deal with concurrent updates. When one of these conflicts occurs, the sink
connector will resolve it. You have control over how this is done by writing your own implementation of
`com.xebia.kafka.connect.couchdb.merging.Merger`, adding it to the classpath, and providing its fully
qualified class name using the `merger` config value.
By default the `com.xebia.kafka.connect.couchdb.merging.LatestWinsMerger` is used. As the name implies, it
simply uses the latest (i.e. incoming) document as the new leading document and deletes the rest.
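As an illustration, a custom merger could instead keep whichever document has the most top-level fields.
In the sketch below, both the `merge` method signature and the use of Vert.x `JsonObject` to represent
documents are assumptions made for this example; consult the `Merger` interface in the plugin for the
real contract:

```java
package com.example.merging;

import java.util.Collections;
import java.util.List;

import com.xebia.kafka.connect.couchdb.merging.Merger;
import io.vertx.core.json.JsonObject;

/**
 * Illustrative custom merger: instead of letting the latest document win,
 * keep whichever document has the most top-level fields.
 *
 * NOTE: the merge(...) signature below is an assumption for this example;
 * the real Merger interface may differ.
 */
public class MostFieldsWinsMerger implements Merger {

  @Override
  public List<JsonObject> merge(JsonObject latest, List<JsonObject> conflicts) {
    JsonObject winner = latest;
    for (JsonObject candidate : conflicts) {
      if (candidate.fieldNames().size() > winner.fieldNames().size()) {
        winner = candidate;
      }
    }
    // Return only the winning document; the others are to be deleted.
    return Collections.singletonList(winner);
  }
}
```

Packaged in a JAR on the Connect worker's classpath, a class like this would then be registered with
`merger=com.example.merging.MostFieldsWinsMerger`.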
While a conflict is being resolved, the underlying data might change, in which case the process has to start
over. How many times this process is retried can be configured using the `max-conflicting-docs-fetch-retries`
config value.
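Put together with the example merger above, the relevant sink settings could look like this (the retry
count shown is an arbitrary example, not the plugin's default):

```properties
# Fully qualified class name of the Merger implementation to use
merger=com.example.merging.MostFieldsWinsMerger

# How often to retry conflict resolution when the data changes underneath it
max-conflicting-docs-fetch-retries=5
```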
The source connector publishes data from CouchDB to Kafka. Described below are its features and
configuration.
By providing a mapping from Kafka topic to CouchDB database name using the `source-topics-to-databases-mapping`
config value, you can specify which CouchDB databases should be published
to which Kafka topic. This is done by listening to the changes feed of the provided databases.
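A minimal source configuration might look like the following sketch; as with the sink example, the
connector class name and the pair syntax of the mapping value are assumptions:

```properties
name=couchdb-source
# Assumed connector class name
connector.class=com.xebia.kafka.connect.couchdb.CouchDBSourceConnector
tasks.max=1

# Which CouchDB database is published to which Kafka topic
# (pair syntax is assumed for this example)
source-topics-to-databases-mapping=orders/orders-db,customers/customers-db
```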
Specific metadata is added to each produced Kafka record. Because the CouchDB revisions are used as Kafka
offsets, Kafka will know not to send old revisions to consumers who have already received them. Take note,
however, that these documents will still be sent to Kafka and will take up storage for as long as Kafka is
configured to retain records.
How far back the changes feed should be consulted can be configured per database using the
`databases-to-changes-since-mapping` config value. The default is `0`, meaning all changes that have ever
occurred in that specific database will be published. A value of `now` specifies all changes from the moment
the connector starts listening. A specific update sequence ID can also be supplied so that all changes since
that update sequence will be published.
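For instance, the three kinds of values could be combined per database like this (the database names and
the example update sequence ID are made up, and the pair syntax is assumed to match the other mapping
values):

```properties
# Per database: where in the changes feed to start
#   0     -> replay every change ever made
#   now   -> only changes made after the connector starts
#   <seq> -> all changes since that update sequence ID
databases-to-changes-since-mapping=orders-db/0,customers-db/now,invoices-db/42-abcdef
```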
The changes from the CouchDB databases' changes feeds are streamed into the connector. Kafka itself, however,
polls for these changes. Therefore the changes are collected in memory until Kafka polls. At that moment the
changes held in memory are returned until either none are left or the number given by the
`max-source-batch-size` config value is reached.
If a lot of changes are coming in, the in-memory buffer might take a long time to drain, if it ever does.
The maximum batch size therefore ensures that once that size is reached, the batch is handed to Kafka for
processing.
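Conceptually, the buffering works like the simplified sketch below. This is not the plugin's actual code;
the class and field names are invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

/**
 * Simplified illustration of buffering streamed changes until Kafka polls.
 * Not the plugin's actual implementation.
 */
public abstract class BufferingSourceTaskSketch extends SourceTask {

  // Filled continuously by the CouchDB changes-feed listener (not shown)
  private final BlockingQueue<SourceRecord> buffer = new LinkedBlockingQueue<>();

  // Corresponds to the max-source-batch-size config value
  private final int maxBatchSize = 100;

  @Override
  public List<SourceRecord> poll() {
    // Hand Kafka at most maxBatchSize buffered changes per poll
    List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
    buffer.drainTo(batch, maxBatchSize);
    return batch;
  }
}
```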