Project author: HolyRoy

Project description: RabbitMQ sink for Flume
Language: Java
Repository: git://github.com/HolyRoy/flume-rabbitMq.git
Created: 2016-11-17T07:39:09Z
Project community: https://github.com/HolyRoy/flume-rabbitMq

flume rabbitmq

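This is a RabbitMQ plugin for Flume that includes a RabbitMQ source and sink.

The plugin has been tested with Flume 1.5.0 without problems.

The original author's project: https://github.com/aweber/rabbitmq-flume-plugin/releases

The routingKey code in the original project had a problem, which has been fixed here.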

Installation Instructions

To install the plugin, copy the jar file to the $FLUME_LIB directory. For
example, for a Flume installed from Cloudera (CDH5), $FLUME_LIB is
/usr/lib/flume/lib.

Behavior

Source

The RabbitMQ Source component enables RabbitMQ to act as a source for Flume events.
You must create and bind the queue you would like the source to consume from manually
in RabbitMQ, prior to starting Flume.

When the Source consumes a message, any message properties that are set on the message
will be added to the Flume Event headers, including the exchange and routing key
for the message. This excludes the headers property, which is a free-form key/value table.
It is currently excluded due to the complexity of correctly parsing and dealing with the
different data types that could be returned as values in the headers table.
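
The mapping described above can be sketched in plain Java. This is only an illustration and not the plugin's actual code: the class name EventHeaderSketch, the method toEventHeaders, and the use of a plain Map to stand in for the AMQP message properties are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the plugin.
final class EventHeaderSketch {

    /**
     * Builds Flume-style event headers from the exchange, the routing key and
     * every message property that is set (non-null). The free-form "headers"
     * table is skipped, as the text above describes.
     */
    static Map<String, String> toEventHeaders(String exchange, String routingKey,
                                              Map<String, Object> properties) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("exchange", exchange);
        headers.put("routing-key", routingKey);
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            // Skip the AMQP headers table and any property that is not set.
            if ("headers".equals(entry.getKey()) || entry.getValue() == null) {
                continue;
            }
            headers.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("content-type", "application/json");
        props.put("delivery-mode", 2);
        props.put("message-id", null);                              // unset, skipped
        props.put("headers", new LinkedHashMap<String, Object>());  // excluded
        System.out.println(toEventHeaders("amq.topic", "flume.event", props));
    }
}
```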

Using the Flume Event headers, you could use the type property, exchange,
or routing-key as part of the file path or name when using the HDFS sink.
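
The Flume HDFS sink lets a path reference an event header with the %{name} escape sequence, for example /flume/%{exchange}/%{routing-key}. The sketch below shows roughly what that substitution amounts to. It is not Flume's implementation: the class HeaderPathSketch and the method expand are hypothetical, and replacing a missing header with an empty string is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of %{header} expansion; not Flume's own code.
final class HeaderPathSketch {

    // Matches %{name} and captures the header name.
    private static final Pattern HEADER_ESCAPE = Pattern.compile("%\\{([^}]+)\\}");

    static String expand(String template, Map<String, String> headers) {
        Matcher matcher = HEADER_ESCAPE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = headers.get(matcher.group(1));
            if (value == null) {
                value = ""; // assumption: a missing header expands to nothing
            }
            // quoteReplacement keeps '$' and '\' in header values literal.
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("exchange", "amq.topic");
        headers.put("routing-key", "flume.event");
        System.out.println(expand("/flume/%{exchange}/%{routing-key}", headers));
    }
}
```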

By default, there is a single consumer thread in the RabbitMQ source.

Configuration

Each component has its own configuration directives, but the Source and Sink
share common RabbitMQ connection parameters.

Source

The Source component has the following configuration options:

Variable        Default    Description
host            localhost  The RabbitMQ host to connect to
port            5672       The port to connect on
ssl             false      Connect to RabbitMQ via SSL
virtual-host    /          The virtual host name to connect to
username        guest      The username to connect as
password        guest      The password to use when connecting
queue           (none)     Required field specifying the name of the queue to consume from
auto-ack        false      Enable auto-acknowledgement for higher throughput with the chance of message loss
prefetch-count  0          The Basic.QoS prefetch count to specify for consuming
timeout         -1         The timeout the consumer will wait for RabbitMQ to deliver a message before retrying
threads         1          The number of consumer threads to create
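
As a rough illustration of how these options and their defaults fit together, the following Java sketch reads them from a plain key/value map. The class SourceSettingsSketch is hypothetical and does not use Flume's configuration API; only the option names and default values are taken from the table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical settings holder for illustration; not the plugin's code.
final class SourceSettingsSketch {
    final String host;
    final int port;
    final boolean ssl;
    final String virtualHost;
    final String username;
    final String password;
    final String queue;
    final boolean autoAck;
    final int prefetchCount;
    final int timeout;
    final int threads;

    SourceSettingsSketch(Map<String, String> conf) {
        // queue is the only option without a default, so it must be present.
        queue = conf.get("queue");
        if (queue == null || queue.isEmpty()) {
            throw new IllegalArgumentException("queue is required");
        }
        host = conf.getOrDefault("host", "localhost");
        port = Integer.parseInt(conf.getOrDefault("port", "5672"));
        ssl = Boolean.parseBoolean(conf.getOrDefault("ssl", "false"));
        virtualHost = conf.getOrDefault("virtual-host", "/");
        username = conf.getOrDefault("username", "guest");
        password = conf.getOrDefault("password", "guest");
        autoAck = Boolean.parseBoolean(conf.getOrDefault("auto-ack", "false"));
        prefetchCount = Integer.parseInt(conf.getOrDefault("prefetch-count", "0"));
        timeout = Integer.parseInt(conf.getOrDefault("timeout", "-1"));
        threads = Integer.parseInt(conf.getOrDefault("threads", "1"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("queue", "events_for_s3");
        conf.put("prefetch-count", "10");
        SourceSettingsSketch settings = new SourceSettingsSketch(conf);
        System.out.println(settings.host + ":" + settings.port + " queue=" + settings.queue);
    }
}
```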

Possible event header keys

  • exchange
  • routing-key
  • app-id
  • content-encoding
  • content-type
  • correlation-id
  • delivery-mode
  • expires
  • message-id
  • priority
  • reply-to
  • timestamp
  • type
  • user-id

Example

  a1.sources.r1.channels = c1
  a1.sources.r1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
  a1.sources.r1.host = localhost
  a1.sources.r1.port = 5672
  a1.sources.r1.virtual-host = /
  a1.sources.r1.username = flume
  a1.sources.r1.password = rabbitmq
  a1.sources.r1.queue = events_for_s3
  a1.sources.r1.prefetch-count = 10

Sink

The RabbitMQ Sink allows Flume events to be published to RabbitMQ. The Sink
component has the following configuration options:

Variable            Default    Description
host                localhost  The RabbitMQ host to connect to
port                5672       The port to connect on
ssl                 false      Connect to RabbitMQ via SSL
virtual-host        /          The virtual host name to connect to
username            guest      The username to connect as
password            guest      The password to use when connecting
exchange            amq.topic  The exchange to publish the message to
routing-key         (empty)    The routing key to use when publishing
auto-properties     true       Automatically populate AMQP message properties
mandatory-publish   false      Enable mandatory publishing
publisher-confirms  false      Enable publisher confirmations

Headers

When publishing an event message, the RabbitMQ Sink will first look to the event
headers for a routing-key entry. If it is set, it will use that value when
publishing the message. If it is not set, it will fall back to the configured
routing-key value which defaults to an empty string.

If the auto-properties configuration option is enabled (default), the event
headers will be examined for standard AMQP Basic.Properties entries (sans the
headers AMQP table). If the property is set in the event headers, it will be set
in the message properties. Additionally, if the app-id value is not set in the
headers, it will default to RabbitMQSink. If timestamp is not set in the
headers, it will default to the current system time.
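
The lookup order and defaults described above can be sketched as follows. This is an illustration under stated assumptions, not the sink's real code: SinkPublishSketch and its methods are hypothetical, any non-null routing-key header is treated as "set", and the timestamp value and its unit are left to the caller.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the sink's header handling; not the plugin's code.
final class SinkPublishSketch {

    // The standard property keys listed in this document (headers table excluded).
    static final String[] PROPERTY_KEYS = {
        "app-id", "content-encoding", "content-type", "correlation-id",
        "delivery-mode", "expires", "message-id", "priority",
        "reply-to", "timestamp", "type", "user-id"
    };

    /** Event header first, then the configured value, then an empty string. */
    static String resolveRoutingKey(Map<String, String> eventHeaders, String configuredKey) {
        String fromHeader = eventHeaders.get("routing-key");
        if (fromHeader != null) {
            return fromHeader; // assumption: any non-null header counts as set
        }
        return configuredKey != null ? configuredKey : "";
    }

    /** Copies known properties from the event headers and fills the two defaults. */
    static Map<String, String> autoProperties(Map<String, String> eventHeaders, long now) {
        Map<String, String> properties = new HashMap<>();
        for (String key : PROPERTY_KEYS) {
            String value = eventHeaders.get(key);
            if (value != null) {
                properties.put(key, value);
            }
        }
        properties.putIfAbsent("app-id", "RabbitMQSink");
        // The caller decides the unit of "now"; it is stored unchanged here.
        properties.putIfAbsent("timestamp", String.valueOf(now));
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("content-type", "text/plain");
        System.out.println(resolveRoutingKey(headers, "flume.event"));
        System.out.println(autoProperties(headers, System.currentTimeMillis()));
    }
}
```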

Available property keys
  • app-id
  • content-encoding
  • content-type
  • correlation-id
  • delivery-mode
  • expires
  • message-id
  • priority
  • reply-to
  • timestamp
  • type
  • user-id

Example

  a1.sinks.k1.channel = c1
  a1.sinks.k1.type = com.aweber.flume.sink.rabbitmq.RabbitMQSink
  a1.sinks.k1.host = localhost
  a1.sinks.k1.port = 5672
  a1.sinks.k1.virtual-host = /
  a1.sinks.k1.username = flume
  a1.sinks.k1.password = rabbitmq
  a1.sinks.k1.exchange = amq.topic
  a1.sinks.k1.routing-key = flume.event
  a1.sinks.k1.publisher-confirms = true

Version History

  • v1.0.x - Not yet released
    • Timeout for consumer to allow clean shutdown and configuration reload
  • v1.0.2 - Released 2015-01-12
    • Fix an issue where the counter names were null for both Source and Sink

Build Instructions

To build from source, use Maven:

  mvn package

This will download all of the dependencies required for building the plugin and
provide a jar file in the target/ directory.
