Project author: ciena

Project description: Kafka client written in Twisted Python
Primary language: Python
Repository: git://github.com/ciena/afkak.git
Created: 2016-03-14T22:24:30Z
Project home: https://github.com/ciena/afkak

License: Apache License 2.0

Afkak: Twisted Python Kafka Client


Afkak is a Twisted-native Apache Kafka client library.
It provides support for:

  • Producing messages, with automatic batching and optional compression.
  • Consuming messages, with group coordination and automatic commit.

Learn more in the documentation, download from PyPI, or review the contribution guidelines.
Please report any issues on GitHub.

Status

Afkak supports these Pythons:

  • CPython 3.7, 3.8, and 3.9
  • PyPy3

We aim to support Kafka 1.1.x and later.
Integration tests are run against these Kafka broker versions:

  • 0.9.0.1
  • 1.1.1

Testing against 2.0.0 is planned (see #45).

Newer broker releases will generally function, but not all Afkak features will work on older brokers.
In particular, the coordinated consumer won’t work before Kafka 0.9.0.1.
We don’t recommend deploying such old releases of Kafka anyway, as they have serious bugs.

Usage

High level

Note: This code is not meant to be runnable. See producer_example and consumer_example for runnable example code.

```python
from afkak.client import KafkaClient
from afkak.consumer import Consumer
from afkak.producer import Producer
from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
                          PRODUCER_ACK_LOCAL_WRITE)

kClient = KafkaClient("localhost:9092")

# To send messages
producer = Producer(kClient)
d1 = producer.send_messages("my-topic", msgs=[b"some message"])
d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
# To get confirmations/errors on the sends, add callbacks to the returned deferreds
d1.addCallbacks(handleResponses, handleErrors)

# To wait for acknowledgements
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
#                            a local log before sending response
#                            [ the default ]
# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
#                             by all in-sync replicas before sending a response
producer = Producer(kClient,
                    req_acks=Producer.PRODUCER_ACK_LOCAL_WRITE,
                    ack_timeout=2000)
responseD = producer.send_messages("my-topic", msgs=[b"message"])
# Using twisted's @inlineCallbacks:
responses = yield responseD
if responses:
    print(responses[0].error)
    print(responses[0].offset)

# To send messages in batch: You can use a producer with any of the
# partitioners for doing this. The following producer will collect
# messages in batch and send them to Kafka after 20 messages are
# collected or every 60 seconds (whichever comes first). You can
# also batch by number of bytes.
# Notes:
# * If the producer dies before the messages are sent, the caller would
#   not have had the callbacks called on the send_messages() returned
#   deferreds, and so can retry.
# * Calling producer.stop() before the messages are sent will
#   errback() the deferred(s) returned from the send_messages call(s)
producer = Producer(kClient, batch_send=True,
                    batch_send_every_n=20,
                    batch_send_every_t=60)
responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])

# To consume messages, define a function which takes a list of messages
# to process and possibly returns a deferred which fires when the
# processing is complete.
def processor_func(consumer, messages):
    # store_messages_in_database may return a deferred
    result = store_messages_in_database(messages)
    # record last processed message
    consumer.commit()
    return result

the_partition = 3  # Consume only from partition 3.
consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
d = consumer.start(OFFSET_EARLIEST)  # Start reading at earliest message
# The deferred returned by consumer.start() will fire when an error
# occurs that can't be handled by the consumer, or when consumer.stop()
# is called
yield d
consumer.stop()
kClient.close()
```
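The count-or-time batch policy described in the comments above (flush after `batch_send_every_n` messages or `batch_send_every_t` seconds, whichever comes first) can be sketched in plain Python. This is an illustration of the policy only, not afkak's actual implementation; the `Batcher` class and its names are hypothetical:

```python
import time


class Batcher:
    """Collect items and flush when a count or time threshold trips.

    Hypothetical sketch of a count-or-time policy like afkak's
    batch_send_every_n / batch_send_every_t; not afkak's real code.
    """

    def __init__(self, every_n=20, every_t=60.0, clock=time.monotonic):
        self.every_n = every_n
        self.every_t = every_t
        self.clock = clock      # injectable for testing
        self.items = []
        self.started = None     # time the current batch was opened

    def add(self, item):
        """Add an item; return the flushed batch if the count tripped, else None."""
        if not self.items:
            self.started = self.clock()
        self.items.append(item)
        if len(self.items) >= self.every_n:
            return self.flush()
        return None

    def poll(self):
        """Call periodically; flush if the open batch is older than every_t."""
        if self.items and self.clock() - self.started >= self.every_t:
            return self.flush()
        return None

    def flush(self):
        """Return the pending batch and start a new, empty one."""
        batch, self.items = self.items, []
        return batch
```

With `every_n=3`, the third `add()` returns the batch; with a stubbed clock, `poll()` flushes once `every_t` seconds have elapsed, so whichever threshold trips first wins.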

Keyed messages

```python
from afkak.client import KafkaClient
from afkak.producer import Producer
from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# Use the HashedPartitioner so that the producer will use the optional key
# argument on send_messages()
producer = Producer(kafka, partitioner_class=HashedPartitioner)
producer.send_messages("my-topic", "key1", [b"some message"])
producer.send_messages("my-topic", "key2", [b"this method"])
```
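The idea behind `HashedPartitioner` is that messages with the same key always land on the same partition, typically by hashing the key and taking the result modulo the partition count. The sketch below illustrates that scheme with CRC-32; afkak's actual hash function may differ, and `choose_partition` is a hypothetical name:

```python
import zlib


def choose_partition(key, partitions):
    """Pick a partition for `key` by hashing it; same key -> same partition.

    Illustrative only; afkak's HashedPartitioner may use a different hash.
    """
    if isinstance(key, str):
        key = key.encode("utf-8")
    return partitions[zlib.crc32(key) % len(partitions)]
```

Because the mapping is deterministic, all messages for a given key preserve their relative order within one partition, which is what keyed production buys you.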

Low level

```python
from afkak.client import KafkaClient

kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
                     messages=[KafkaProtocol.encode_message(b"some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # b"my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
```

Install

Afkak releases are available on PyPI.

Because Afkak's dependencies Twisted and python-snappy include binary extension modules, you will need to install the Python development headers for the interpreter you wish to use:

Debian/Ubuntu:
sudo apt-get install build-essential python3-dev pypy3-dev libsnappy-dev

OS X:
brew install python pypy snappy
pip install virtualenv

Then Afkak can be installed with pip as usual:

pip install afkak

License

Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See LICENSE

Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See LICENSE

Copyright 2015–2021 Ciena Corporation under Apache License, v2.0. See LICENSE

This project began as a port of the kafka-python library to Twisted.

See AUTHORS.md for the full contributor list.

Tests

To run Afkak's tests, you need to install the dependencies as described in the Install section.

The Afkak test suite uses Tox to run the tests under all supported
Python versions.
The preferred way to run the tests is to install Tox in a virtual
environment first:

make venv

Testing Strategy

Afkak has two types of tests:

  • Unit tests — fast tests that perform no I/O.

  • Integration tests — tests that run against a real Kafka broker.
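A "fast, no I/O" unit test in this spirit might look like the following. This is a hypothetical example, not taken from the Afkak suite; it exercises pure in-memory logic (splitting messages into batches) with no sockets or disk access:

```python
import unittest


class BatchCountTests(unittest.TestCase):
    """Hypothetical fast unit test: pure in-memory logic, no I/O."""

    def test_split_into_batches(self):
        msgs = list(range(7))
        n = 3
        # Split into chunks of at most n messages each.
        batches = [msgs[i:i + n] for i in range(0, len(msgs), n)]
        self.assertEqual(batches, [[0, 1, 2], [3, 4, 5], [6]])
```

Run it with `python -m unittest`; tests like this stay fast enough to run on every change, which is why they are kept separate from the broker-backed integration tests.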

Run the unit tests

To run all unit tests under all supported Python versions (this requires
all of those versions to be installed on the system where the tests run):

make toxu

Alternatively, you might want to run unit tests in a list of specific
Python versions:

.env/bin/tox -e py37-unit-snappy,py38-unit-snappy

Please run the tests on the minimum and maximum supported Python versions
before submitting a pull request.

Run the integration tests

The integration tests start up a real local ZooKeeper
instance and real Kafka brokers, and send messages through the client.

The makefile knows how to download several versions of Kafka.
This will run just the integration tests against Kafka 1.1.1:

KAFKA_VER=1.1.1 make toxi

Run all the tests against the default Kafka version

make toxa

Run the integration tests against all the Kafka versions the Makefile knows about

make toxik