Project author: slavahatnuke

Project description:
Issues related to Confluent KAFKA REST Proxy
Primary language: Shell
Project address: git://github.com/slavahatnuke/kafka-rest-scalability.git
Created: 2018-08-22T22:06:04Z
Project community: https://github.com/slavahatnuke/kafka-rest-scalability


KAFKA REST Proxy scalability issues

These cases describe scalability issues related to "auto.commit.enable": "false".

Case 1

The first KAFKA REST Proxy (1) blocks the second KAFKA REST Proxy (2).

                         +---------------------+
                         |    CURL produce     |
                         |   topic: jsontest   |
                         +----------+----------+
                                    | [ok] produce 10 records
                                    |
          +-------------------+     |     +-------------------+
          |  CURL consumer 1  |     |     |  CURL consumer 2  |
          |                   |     |     |                   |
          +--------+----------+     |     +--------+----------+
                   |                |              |
          [ok] create consumer      |     [ok] create consumer
          [ok] subscribe            |     [ok] subscribe
          [ok] consume records      |     [hung] consume records
                   |                |              |
            +------v------+         |       +------v------+
            | Kafka REST  <---------+       | Kafka REST  |
            | port:18082  |                 | port:28082  |
            +------+------+                 +------+------+
                   |                               |
          +--------v-------------------------------v--------+
          |                      Kafka                      |
          |                    port:9092                    |
          +------------------------+------------------------+
                                   |
          +------------------------v------------------------+
          |                    Zookeeper                    |
          |                    port:2181                    |
          +-------------------------------------------------+

Steps:

  • 1/ Start services

docker-compose.yml

    version: "3.5"
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:5.0.0
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
      kafka:
        image: confluentinc/cp-kafka:5.0.0
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      kafka-rest-1:
        image: confluentinc/cp-kafka-rest:5.0.0
        depends_on:
          - kafka
        ports:
          - 18082:8082
        environment:
          KAFKA_REST_ID: "1"
          KAFKA_REST_HOST_NAME: kafka-rest-1
          KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:9092'
          KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
          KAFKA_REST_PRODUCER_THREADS: "10"
      kafka-rest-2:
        image: confluentinc/cp-kafka-rest:5.0.0
        depends_on:
          - kafka
        ports:
          - 28082:8082
        environment:
          KAFKA_REST_ID: "2"
          KAFKA_REST_HOST_NAME: kafka-rest-2
          KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:9092'
          KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
          KAFKA_REST_PRODUCER_THREADS: "10"
  • Start the services (once the containers are up, both proxies can be sanity-checked with the sketch below).
    docker-compose up
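
  • A quick check that both proxies answer: a minimal sketch using GET /topics (the v2 API's topic listing; ports come from docker-compose.yml above). Each call should return a JSON array of topic names:

    # check that both REST proxies respond
    curl -H "Accept: application/vnd.kafka.v2+json" http://localhost:18082/topics
    printf "\n"
    curl -H "Accept: application/vnd.kafka.v2+json" http://localhost:28082/topics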

  • 2/ Create a topic with partitions

  • topic + 10 partitions: docker-compose exec kafka bash -c "kafka-topics --zookeeper zookeeper:2181 --topic jsontest --create --partitions 10 --replication-factor 1"
  • describe to be sure: docker-compose exec kafka bash -c "kafka-topics --zookeeper zookeeper:2181 --topic jsontest --describe"
    (the topic metadata can also be checked through the proxy itself, as sketched below)
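  • A sketch of the same check through the REST API; GET /topics/{name} returns the topic's metadata, including its partitions:

    # fetch topic metadata via the first proxy
    curl -H "Accept: application/vnd.kafka.v2+json" http://localhost:18082/topics/jsontest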
  • 3/ Produce records
  • produce 10 simple records: {v: 0} … {v: 9}
  • via the first KAFKA REST proxy, port 18082

    curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"records":[{"value":{"v":"0"}}, {"value":{"v":"1"}}, {"value":{"v":"2"}}, {"value":{"v":"3"}}, {"value":{"v":"4"}}, {"value":{"v":"5"}}, {"value":{"v":"6"}}, {"value":{"v":"7"}}, {"value":{"v":"8"}}, {"value":{"v":"9"}}]}' \
      "http://localhost:18082/topics/jsontest"
  • 4/ Create CURL consumer 1

  • It creates a consumer instance and subscribes it to the topic jsontest.
    Kafka REST 1, port: 18082
    # create consumer 1
    # "auto.commit.enable": "false"
    curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance_1", "auto.commit.enable": "false", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:18082/consumers/my_json_consumer
    printf "\n"

    # subscribe
    curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"topics":["jsontest"]}' \
      http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/subscription
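  • The create call should return the instance id plus its base URI, roughly like this (illustrative; the host reflects KAFKA_REST_HOST_NAME from docker-compose.yml):

    {"instance_id":"my_consumer_instance_1",
     "base_uri":"http://kafka-rest-1:8082/consumers/my_json_consumer/instances/my_consumer_instance_1"}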
  • 5/ Consumer 1 reads records
  • It consumes via Kafka REST 1, port 18082
    # consume 1
    curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/records?max_bytes=10
  • The results look like:

    1. [{"topic":"jsontest","key":null,"value":{"v":"3"},"partition":4,"offset":0}]
    2. [{"topic":"jsontest","key":null,"value":{"v":"2"},"partition":5,"offset":0}]
    3. [{"topic":"jsontest","key":null,"value":{"v":"8"},"partition":6,"offset":0}]
  • Or it can read multiple records at once if max_bytes=20:

    ## Messages from multiple partitions
    [{"topic":"jsontest","key":null,"value":{"v":"3"},"partition":4,"offset":0},{"topic":"jsontest","key":null,"value":{"v":"2"},"partition":5,"offset":0}]
  • Do you think it is OK that a consumer reads from multiple partitions at once
    when "auto.commit.enable": "false" is used? It seems this could be the issue.
    (With auto-commit disabled, offsets have to be committed explicitly; see the sketch below.)
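  • For reference, a minimal sketch of an explicit commit against the v2 /offsets endpoint. The partition/offset values here are illustrative; sending an empty request body instead commits everything the instance has fetched so far:

    # commit offsets explicitly for consumer 1
    curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"offsets":[{"topic":"jsontest","partition":4,"offset":0}]}' \
      http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/offsets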

  • 6/ Create CURL consumer 2

  • It creates a consumer instance and subscribes it to the topic jsontest.
    Kafka REST 2, port: 28082
    # create consumer 2
    # "auto.commit.enable": "false"
    curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "my_consumer_instance_2", "auto.commit.enable": "false", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:28082/consumers/my_json_consumer
    printf "\n"

    # subscribe
    curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"topics":["jsontest"]}' \
      http://localhost:28082/consumers/my_json_consumer/instances/my_consumer_instance_2/subscription
  • 7/ Consumer 2 DOES NOT read records
  • It just hangs and does not return an answer for a long time (~5 mins).
  • It seems the first Kafka REST instance has locked (been assigned) all topic partitions, and the second one waits.
  • This is a scalability problem: running multiple Kafka REST proxies brings no value.
  • It looks like the Kafka REST Proxy is only vertically scalable right now.
    (The partition assignment can be inspected with the consumer-group tooling; see the sketch after the request below.)
    # consume 2
    curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:28082/consumers/my_json_consumer/instances/my_consumer_instance_2/records?max_bytes=10
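  • A sketch for inspecting which instance holds which partitions, using the broker-side tooling (this assumes the v2 proxy registers its consumers under the group name from the URL path, my_json_consumer):

    # inspect the partition assignment of the REST proxy consumer group
    docker-compose exec kafka bash -c \
      "kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group my_json_consumer"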
  • Opinion
  • I suspect it could be a wrong kafka + kafka rest configuration on my side that leads to the behaviour described above.
  • From my observations, KAFKA REST consumer instance 1 reads records/messages from multiple partitions; this means the underlying consumers (Kafka clients) "take" partitions, and consumer instance 2 has no ability to read messages because all partitions are "busy".
  • When I delete consumer instance 1, consumer instance 2 works as expected (see the sketch below).
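  • For completeness, deleting the first instance (which frees its partitions for reassignment) is a single v2 DELETE call, sketched here:

    # delete consumer instance 1 so its partitions can be reassigned
    curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
      http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1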

  • Questions

  • If I am wrong with the kafka and/or kafka rest configuration, could you suggest a correction to fix the issue?
  • If it is an issue: what information can I add to make the case easy to reproduce, or to help?
    (A condensed reproduction script, built only from the steps above, is sketched below.)
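
  • To make reproduction easier, the steps above can be condensed into one script. This is a sketch that only re-arranges the commands already shown; it assumes docker-compose up is already running and the topic does not exist yet:

    #!/usr/bin/env bash
    # repro sketch: condenses steps 2/-7/ above
    set -e

    # 2/ create the topic with 10 partitions
    docker-compose exec kafka bash -c \
      "kafka-topics --zookeeper zookeeper:2181 --topic jsontest --create --partitions 10 --replication-factor 1"

    # 3/ produce 10 records via proxy 1
    curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data '{"records":[{"value":{"v":"0"}}, {"value":{"v":"1"}}, {"value":{"v":"2"}}, {"value":{"v":"3"}}, {"value":{"v":"4"}}, {"value":{"v":"5"}}, {"value":{"v":"6"}}, {"value":{"v":"7"}}, {"value":{"v":"8"}}, {"value":{"v":"9"}}]}' \
      "http://localhost:18082/topics/jsontest"

    # 4/ and 6/ create and subscribe both consumers (same group, different proxies)
    for i in 1 2; do
      port="${i}8082"
      curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
        --data "{\"name\": \"my_consumer_instance_${i}\", \"auto.commit.enable\": \"false\", \"format\": \"json\", \"auto.offset.reset\": \"earliest\"}" \
        "http://localhost:${port}/consumers/my_json_consumer"
      curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
        --data '{"topics":["jsontest"]}' \
        "http://localhost:${port}/consumers/my_json_consumer/instances/my_consumer_instance_${i}/subscription"
    done

    # 5/ consumer 1 reads records
    curl -H "Accept: application/vnd.kafka.json.v2+json" \
      "http://localhost:18082/consumers/my_json_consumer/instances/my_consumer_instance_1/records?max_bytes=10"

    # 7/ consumer 2 hangs here for ~5 minutes
    curl -H "Accept: application/vnd.kafka.json.v2+json" \
      "http://localhost:28082/consumers/my_json_consumer/instances/my_consumer_instance_2/records?max_bytes=10"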