Project author: oridonner

Project description:
Kafka Twitter Source Connector
Primary language: Shell
Project URL: git://github.com/oridonner/kafka-connect-twitter.git
Created: 2018-12-02T10:52:02Z
Project page: https://github.com/oridonner/kafka-connect-twitter

License: Apache License 2.0



Build the Twitter connector according to the instructions posted on Robin Moffatt’s blog.

Use the jcustenborder connector, not Eneco’s connector: the latter works only with Kafka 0.10.

Build Twitter Connector

Clone the source code from GitHub:

git clone https://github.com/jcustenborder/kafka-connect-twitter.git

Before building, apply this bug fix in pom.xml, pinning twitter4j to a fixed version instead of an open-ended version range:

Change:

<twitter4j.version>[4.0,)</twitter4j.version>
To:

<twitter4j.version>4.0.6</twitter4j.version>
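The range `[4.0,)` lets Maven resolve any twitter4j release from 4.0 upward, so pinning it to 4.0.6 makes the build reproducible. A minimal sketch of applying the change with sed, assuming the property is written on a single line of pom.xml exactly as shown above; `pin_twitter4j_version` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: pin twitter4j to 4.0.6 in a Maven pom.xml.
# Assumes the property appears on one line as <twitter4j.version>[4.0,)</twitter4j.version>.
pin_twitter4j_version() {
  local pom="${1:-pom.xml}"
  # '|' is the sed delimiter, so the '/' in the closing tag needs no escaping.
  # '[' and '.' are escaped so the version range is matched literally.
  # -i.bak edits in place and keeps a backup copy (accepted by both GNU and BSD sed).
  sed -i.bak 's|<twitter4j\.version>\[4\.0,)</twitter4j\.version>|<twitter4j.version>4.0.6</twitter4j.version>|' "$pom"
}
```

Run it from inside the cloned kafka-connect-twitter folder as `pin_twitter4j_version pom.xml`, then confirm the result with `grep twitter4j.version pom.xml`.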

Change into the new folder and build the package with Maven:

cd kafka-connect-twitter

mvn clean package

Extract the package:

cd target

tar -xvf kafka-connect-twitter-0.2-SNAPSHOT.tar.gz
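To see which .jar files the archive contains, and the relative path they extract to, you can list the tarball without extracting it. A small sketch; `list_archive_jars` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print every .jar entry in a gzipped tarball without extracting it.
list_archive_jars() {
  local archive="$1"
  # -t lists entries, -z reads gzip, -f names the archive; keep only .jar paths.
  tar -tzf "$archive" | grep '\.jar$'
}
```

From the target folder: `list_archive_jars kafka-connect-twitter-0.2-SNAPSHOT.tar.gz`.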

Copy the following .jar files from /usr/share/kafka-connect/kafka-connect-twitter/ to Kafka's libs/ folder:

annotations-2.0.1.jar

connect-utils-0.3.140.jar

freemarker-2.3.25-incubating.jar

guava-18.0.jar

jackson-annotations-2.8.0.jar

jackson-core-2.8.5.jar

jackson-databind-2.8.5.jar

javassist-3.19.0-GA.jar

kafka-connect-twitter-0.2-SNAPSHOT.jar

reflections-0.9.10.jar

twitter4j-core-4.0.6.jar

twitter4j-stream-4.0.6.jar
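Copying twelve files by hand is easy to get wrong. A minimal sketch of a helper that copies exactly the jars listed above and stops at the first missing one; `copy_twitter_jars` is a hypothetical name, and both directories are arguments, so pass wherever the extracted kafka-connect-twitter folder ended up and your Kafka installation's libs/ folder:

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy the connector jars listed above from SRC_DIR to KAFKA_LIBS_DIR.
# Returns non-zero as soon as an expected jar is missing from SRC_DIR.
copy_twitter_jars() {
  local src="$1" dest="$2" jar
  for jar in \
    annotations-2.0.1.jar \
    connect-utils-0.3.140.jar \
    freemarker-2.3.25-incubating.jar \
    guava-18.0.jar \
    jackson-annotations-2.8.0.jar \
    jackson-core-2.8.5.jar \
    jackson-databind-2.8.5.jar \
    javassist-3.19.0-GA.jar \
    kafka-connect-twitter-0.2-SNAPSHOT.jar \
    reflections-0.9.10.jar \
    twitter4j-core-4.0.6.jar \
    twitter4j-stream-4.0.6.jar
  do
    if [ ! -f "$src/$jar" ]; then
      echo "missing: $src/$jar" >&2
      return 1
    fi
    cp "$src/$jar" "$dest/"
  done
}
```

Kafka 0.11 and later can instead load connectors from a directory listed in the Connect worker's `plugin.path` setting, which keeps these jars separate from Kafka's own libraries.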

Test Twitter Connector

We will test the Twitter connector with Kafka Connect in standalone mode. First, start a ZooKeeper server and a single Kafka broker.

Start ZooKeeper

Start ZooKeeper on localhost:2181:

./bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka Broker

Start a single Kafka broker on localhost:9092:

./bin/kafka-server-start.sh config/server.properties
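The broker needs ZooKeeper to be reachable before it starts, and Kafka Connect needs the broker. A sketch of a wait helper you could run between the steps, assuming bash (it relies on bash's `/dev/tcp` redirection) and the default ports above; `wait_for_port` is a hypothetical name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: wait up to TIMEOUT seconds for HOST:PORT to accept TCP connections.
# Uses bash's built-in /dev/tcp pseudo-device, so it needs bash (not sh/dash).
wait_for_port() {
  local host="$1" port="$2" timeout="${3:-30}" waited=0
  while [ "$waited" -lt "$timeout" ]; do
    # Opening the pseudo-device attempts a TCP connect; the subshell closes it again.
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    sleep 1
    waited=$((waited + 1))
  done
  echo "timed out waiting for $host:$port" >&2
  return 1
}
```

For example, with both start scripts sent to the background via their `-daemon` option: `./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && wait_for_port localhost 2181 30 && ./bin/kafka-server-start.sh -daemon config/server.properties && wait_for_port localhost 9092 30`. An open port only means the process is listening, so treat this as a coarse readiness check.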

Twitter Access Tokens

Get or create Twitter access tokens for your Twitter app. The app used for this demo is named Ori-Kafka-Connect-Demo.
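The tokens go into config/twitter-source-connector.properties, whose contents are not shown here. A minimal sketch that writes such a file, assuming the configuration keys documented for jcustenborder's connector (`connector.class`, the `twitter.oauth.*` credentials, `filter.keywords`, `kafka.status.topic`, `kafka.delete.topic`, `process.deletes`); the helper `write_twitter_config`, the environment variable names, the connector name, the delete topic, and the default keyword are all illustrative assumptions, so check the keys against the connector's README:

```shell
#!/usr/bin/env bash
# Hypothetical helper: write a standalone-mode config for jcustenborder's Twitter source connector.
# Credentials come from (hypothetical) environment variables; other values are placeholders.
write_twitter_config() {
  local out="${1:-config/twitter-source-connector.properties}" var
  for var in TWITTER_CONSUMER_KEY TWITTER_CONSUMER_SECRET TWITTER_ACCESS_TOKEN TWITTER_ACCESS_TOKEN_SECRET; do
    if [ -z "${!var}" ]; then   # bash indirect expansion: value of the variable named by $var
      echo "missing environment variable: $var" >&2
      return 1
    fi
  done
  cat > "$out" <<EOF
name=twitter-source
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
# Tweets matching filter.keywords are written to this topic.
kafka.status.topic=twitter-data
kafka.delete.topic=twitter-deletes
process.deletes=false
filter.keywords=${TWITTER_KEYWORDS:-java}
twitter.oauth.consumerKey=${TWITTER_CONSUMER_KEY}
twitter.oauth.consumerSecret=${TWITTER_CONSUMER_SECRET}
twitter.oauth.accessToken=${TWITTER_ACCESS_TOKEN}
twitter.oauth.accessTokenSecret=${TWITTER_ACCESS_TOKEN_SECRET}
EOF
}
```

Run it from the Kafka folder after exporting the four credential variables. The schema/payload envelope in the sample output below comes from the JsonConverter with `schemas.enable=true`, which is how the stock config/connect-standalone.properties is set up.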

Start Kafka Connect in standalone mode

Start Kafka Connect in standalone mode, using the twitter-source-connector.properties connector config file:

./bin/connect-standalone.sh config/connect-standalone.properties config/twitter-source-connector.properties

Test

Check that the twitter-data topic was created:

./bin/kafka-topics.sh --zookeeper localhost:2181 --list
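To script this check, for example in a smoke test, you can look for an exact line match in the listing. A sketch; `topic_listed` is a hypothetical helper that reads the output of `kafka-topics.sh --list` on stdin:

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if TOPIC appears as a whole line in a topic listing read from stdin.
topic_listed() {
  # -x matches whole lines only (so "twitter-data-old" does not count); -F treats the name literally.
  grep -qxF -- "$1"
}
```

Usage: `./bin/kafka-topics.sh --zookeeper localhost:2181 --list | topic_listed twitter-data && echo "twitter-data exists"`.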

Check the topic's output with the console consumer:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter-data --from-beginning

Output

Below is a sample record from the twitter-data topic. Each record is a JSON object with two top-level keys, schema and payload.

It is easier to read in an online JSON viewer.

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"created_at"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"screen_name"},{"type":"string","optional":true,"field":"location"},{"type":"boolean","optional":false,"field":"verified"},{"type":"int32","optional":false,"field":"friends_count"},{"type":"int32","optional":false,"field":"followers_count"},{"type":"int32","optional":false,"field":"statuses_count"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.User","field":"user"},{"type":"string","optional":true,"field":"text"},{"type":"string","optional":true,"field":"lang"},{"type":"boolean","optional":false,"field":"is_retweet"},{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":true,"field":"text"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Hashtag"},"optional":true,"field":"hashtags"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":true,"field":"display_url"},{"type":"string","optional":true,"field":"expanded_url"},{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"type"},{"type":"string","optional":true,"field":"url"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Medium"},"optional":true,"field":"media"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":true,"field":"display_url"},{"type":"string","optional":true,"field":"expanded_url"},{"type":"string","optional":true,"field":"url"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Url"},"optional":true,"field":"urls"},{"type":"array","items":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"screen_name"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.UserMention"},"optional":true,"field":"user_mentions"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Entities","field":"entities"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Tweet"},"payload":{"id":1052500156539658240,"created_at":"2018-10-17T10:02:46.000+0000","user":{"id":298914263,"name":"Jesús Sánchez","screen_name":"jesussp_","location":"México","verified":false,"friends_count":869,"followers_count":188,"statuses_count":1825},"text":"RT @javacodegeeks: How much faster is #Java 8? https://t.co/QMI2Fuj0Em","lang":"en","is_retweet":true,"entities":{"hashtags":[{"text":"Java"}],"media":[],"urls":[{"display_url":"buff.ly/2JH7qE5","expanded_url":"https://buff.ly/2JH7qE5","url":"https://t.co/QMI2Fuj0Em"}],"user_mentions":[{"id":150820027,"name":"Java Code Geeks","screen_name":"javacodegeeks"}]}}}
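Instead of an online viewer, you can pull fields out of each record locally. A sketch that prints `payload.text` from each record on stdin, assuming python3 is installed (jq would work just as well) and that records arrive one per line, as the console consumer prints them; `tweet_text` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print payload.text from each schema/payload JSON record read on stdin.
# Uses python3's standard json module; blank lines are skipped.
tweet_text() {
  python3 -c '
import json, sys
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    record = json.loads(line)
    print(record["payload"].get("text") or "")
'
}
```

Usage: `./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter-data --from-beginning | tweet_text`. For a full pretty-print of a single record, `python3 -m json.tool` reads one JSON document from stdin.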