Project author: sunary

Project description:
Python data pipeline
Language: Python
Repository: git://github.com/sunary/ank.git
Created: 2016-06-21T23:31:37Z
Project page: https://github.com/sunary/ank

License: Other



ANK - Python Streaming system

Overview:

ANK is a Python streaming system that also supports REST APIs and scheduled tasks, using message queues (RabbitMQ, ZeroMQ, Kafka).
Each processor is one link in a chain; link processors together to make a pipeline.

Supported chain models:

  • 1 - 1:
        processor - processor
  • 1 - n:
                  / processor
        processor - processor
                  \ processor
  • n - 1:
        processor \
        processor - processor
        processor /
  • n - n:
        processor - processor    processor - processor    processor - processor
        processor - processor    processor /              processor <
        processor - processor    processor - processor    processor - processor
  • join message:
        message1 \
        message2 - [message1, message2, message3]
        message3 /
  • split message:
                                       / message1
        [message1, message2, message3] - message2
                                       \ message3
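
The chain models above can be sketched in plain Python. This is a minimal illustration that does not use ANK itself: `run_chain`, `fan_out`, `join_messages` and `split_message` are hypothetical names chosen for this example, and each processor is assumed to be an ordinary function that takes one message and returns the next one.

```python
from typing import Any, Callable, Iterable, Iterator, List

Processor = Callable[[Any], Any]  # hypothetical type alias, not part of ANK


def run_chain(processors: Iterable[Processor], message: Any) -> Any:
    """1 - 1 model: pass the message through each processor in order."""
    for processor in processors:
        message = processor(message)
        if message is None:  # a None result stops the chain
            break
    return message


def fan_out(processors: Iterable[Processor], message: Any) -> List[Any]:
    """1 - n model: hand the same message to every branch."""
    return [processor(message) for processor in processors]


def join_messages(messages: Iterable[Any]) -> List[Any]:
    """Join: collect separate messages into one list message."""
    return list(messages)


def split_message(batch: Iterable[Any]) -> Iterator[Any]:
    """Split: emit each element of a list message on its own."""
    for item in batch:
        yield item


def add_one(message):
    return message + 1


def double(message):
    return message * 2


chained = run_chain([add_one, double], 3)   # (3 + 1) * 2
branched = fan_out([add_one, double], 3)    # [3 + 1, 3 * 2]
joined = join_messages([1, 2, 3])
split = list(split_message(joined))
```

The n - 1 and n - n models are combinations of these pieces: several chains feeding one processor, or several chains running side by side.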

Requirements:

  • Python 2.x
  • Python 3.x

How to use:

  • Install:

    • From GitHub:
      • clone this project: git clone git@github.com:sunary/ank.git
      • install: python setup.py install
    • From PyPI:
      • pip install ank
  • Create a new service:

        $ ank -c TestService
        $ cd TestService

    Then continue with the commands below.

  • Edit the app (processor.py):

    • Example:

          from ank.components.pipe_app import PipeApp


          class ExampleApp(PipeApp):

              def start(self):
                  # Feed 100 messages into the chain.
                  for i in range(100):
                      self.chain_process(i)

              def process(self, message=None):
                  '''
                  Args:
                      message: {'content': (*) 'content of message',
                                'flags': (list|tuple) 'which of the next branching-processors to use'}
                      A TypeError is raised if the processor before a branching-processor
                      does not return 'flags'.
                      if 'flags' == [True, True]: process both of the next branching-processors
                      if 'flags' == [True, False]: process the 1st of the next branching-processors
                      if 'flags' == [False, True]: process the 2nd of the next branching-processors
                      if 'flags' == [False, False]: no processor, stop chain
                      if message is None: stop chain
                  '''
                  return message['content'] + 1
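
The `flags` rules described in the docstring can be illustrated with a small stand-alone sketch. It is not ANK's implementation: `dispatch`, `route`, `odd_app` and `even_app` are hypothetical names, and the sketch assumes one flag per branching processor, in the same order as the branches.

```python
def dispatch(message, branches):
    """Send message['content'] to each branch whose flag is True.

    Hypothetical helper that mirrors the documented rules; not ANK code.
    """
    if message is None:
        return []  # None stops the chain
    if not isinstance(message, dict) or 'flags' not in message:
        raise TypeError("a message before a branching step needs a 'flags' key")
    flags = message['flags']
    if len(flags) != len(branches):
        raise ValueError('expected one flag per branching processor')
    return [branch(message['content'])
            for flag, branch in zip(flags, branches) if flag]


def odd_app(content):
    return 'odd:%d' % content


def even_app(content):
    return 'even:%d' % content


def route(number):
    """Processor placed before the branch: decides which branch runs."""
    is_odd = number % 2 == 1
    return {'content': number, 'flags': [is_odd, not is_odd]}


results = [dispatch(route(n), [odd_app, even_app]) for n in (1, 2)]
```

With `[False, False]` the returned list is empty, which corresponds to the "stop chain" case.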
  • Edit services and chains (services.yml):

    • Syntax:

          services:
            Object1:
              - class: module.name.ClassName
              - arguments: [$Object, %variable%]

            AnkChain2:
              - class: ank.components.module_name.XXXApps
              - arguments: ~

          chains:
            - Object1
            - AnkChain2
    • Example:

          services:
            StartApp:
              class: processor.StartApp
              arguments: [$Mongodb, $Redis, '%batch_size%']

            Mongodb:
              class: pymongo.MongoClient
              arguments: ['%mongo_host%', '%mongo_port%']

            Redis:
              class: redis.client.StrictRedis
              arguments: ['%redis_host%', '%redis_port%']

            OtherApp:
              class: processor.OtherApp
              arguments: ~

            LogApp:
              class: components.log_app.LogApp
              arguments: ~

          chains:
            - StartApp
            - LogApp
            - OtherApp
            - [OddApp, EvenApp]  # which one runs depends on the `flags` key
            - OtherApp

      ANK reads the chains from top to bottom, finds the corresponding services, and takes their parameters from settings.yml.
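
How a loader might resolve `$Service` references and `%parameter%` placeholders can be sketched as follows. This is an assumption about the mechanism, not ANK's actual loader: `build` is a hypothetical function, the dictionaries stand in for already-parsed services.yml and settings.yml content, and standard-library classes replace the MongoDB and Redis clients so the sketch runs without extra packages (Python 3).

```python
import importlib
import re

PARAM = re.compile(r'^%(\w+)%$')  # matches a whole-string %name% placeholder

# Hypothetical stand-ins for parsed services.yml / settings.yml content.
services = {
    'Items': {'class': 'builtins.list', 'arguments': None},
    'Buffer': {'class': 'collections.deque',
               'arguments': ['$Items', '%batch_size%']},
}
parameters = {'batch_size': 100}
chains = ['Buffer']


def build(name, services, parameters, cache):
    """Instantiate a service once, resolving its arguments recursively."""
    if name in cache:
        return cache[name]
    spec = services[name]
    module_path, class_name = spec['class'].rsplit('.', 1)
    cls = getattr(importlib.import_module(module_path), class_name)
    args = []
    for arg in spec['arguments'] or []:
        match = PARAM.match(arg) if isinstance(arg, str) else None
        if isinstance(arg, str) and arg.startswith('$'):
            # $Name refers to another service defined under `services`
            args.append(build(arg[1:], services, parameters, cache))
        elif match:
            # %name% is replaced by the value from `parameters`
            args.append(parameters[match.group(1)])
        else:
            args.append(arg)
    cache[name] = cls(*args)
    return cache[name]


cache = {}
pipeline = [build(name, services, parameters, cache) for name in chains]
```

Caching each instance means a service referenced by several others is created only once, which is the usual behaviour for this style of dependency injection.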

  • Generate and edit the settings (settings.yml):

        $ ank create -c BaseApp

    • Example:

          parameters:
            mongo_host: localhost
            mongo_port: 27017
            redis_host: localhost
            redis_port: 6379
            queue_uri: 'amqp://admin:admin@localhost:5672/'
            exchange_name: InputExchange
            routing_key: ExchangeToQueue
            batch_size: 100

      This creates a settings template file for you. Rename _settings.yml to settings.yml and fill in the values.

  • Build the service (creates a Docker image) and run it:

        $ ank build
        $ docker run --entrypoint /bin/sh $docker_image_id

  • Generate processor (_processor.py):

        $ ank gen_processor -fs settings.yml

  • Generate setting (_setting.yml):

        $ ank gen_setting -fs _setting.yml
  • Run the service:

        from ank import program_loader
        program_loader.main(file_setting='settings.yml')

    or

        $ ank run -fs settings.yml

Base Apps:

  • PipeApp: Pipeline App.
  • APIApp: REST API interface using Flask.
  • ScheduleApp: Uses the crontab time format to set a schedule.
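
For readers unfamiliar with the crontab time format, the sketch below checks whether a five-field expression (minute, hour, day of month, month, day of week) matches a given moment. It is a generic, simplified illustration, not the parser ScheduleApp uses: `field_matches` and `cron_matches` are hypothetical names, only `*`, `*/n`, `a-b`, comma lists and plain numbers are supported, and all five fields must match (standard cron treats the two day fields more loosely when both are restricted).

```python
from datetime import datetime


def field_matches(field, value, minimum):
    """Check one crontab field against a value.

    Supports '*', '*/n', 'a-b', 'a,b,c' and plain numbers.
    """
    for part in field.split(','):
        if part == '*':
            return True
        if part.startswith('*/'):
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif '-' in part:
            low, high = part.split('-')
            if int(low) <= value <= int(high):
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expression, moment):
    """Return True when every field of the expression matches `moment`."""
    minute, hour, day, month, weekday = expression.split()
    return (field_matches(minute, moment.minute, 0)
            and field_matches(hour, moment.hour, 0)
            and field_matches(day, moment.day, 1)
            and field_matches(month, moment.month, 1)
            # crontab counts Sunday as 0; isoweekday() counts it as 7
            and field_matches(weekday, moment.isoweekday() % 7, 0))


# "Every 15 minutes during the 9 o'clock hour, Monday to Friday"
weekday_morning = '*/15 9 * * 1-5'
monday_hit = cron_matches(weekday_morning, datetime(2024, 1, 1, 9, 30))
saturday_hit = cron_matches(weekday_morning, datetime(2024, 1, 6, 9, 30))
```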

Component Apps:

  • LogApp: Logs every message.
  • JoinApp: Joins messages into one.
  • SplitApp: Splits a message.
  • ---Consumer: Gets messages from a queue.
  • ---Producer: Pushes messages to a queue.
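
The consumer and producer roles can be sketched with an in-memory queue standing in for RabbitMQ, ZeroMQ or Kafka. `MemoryProducer` and `MemoryConsumer` are hypothetical classes written for this illustration and do not reflect the interfaces of ANK's own consumer and producer apps.

```python
import queue


class MemoryProducer:
    """Pushes each message it processes onto a queue (hypothetical class)."""

    def __init__(self, channel):
        self.channel = channel

    def process(self, message):
        self.channel.put(message)
        return message  # pass the message on to the next processor


class MemoryConsumer:
    """Pulls messages off a queue and hands them to a handler (hypothetical class)."""

    def __init__(self, channel):
        self.channel = channel

    def start(self, handler):
        """Drain the queue, passing each message to `handler`."""
        handled = []
        while True:
            try:
                message = self.channel.get_nowait()
            except queue.Empty:
                break  # nothing left to consume
            handled.append(handler(message))
        return handled


channel = queue.Queue()
producer = MemoryProducer(channel)
for number in range(3):
    producer.process(number)

received = MemoryConsumer(channel).start(lambda message: message * 10)
```

A real consumer would normally block and wait for new messages instead of stopping when the queue is empty; draining keeps the sketch finite.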

TODO

  • base apps: Base, API, Schedule
  • supported chain: join, split
  • supported consumer/producer: redis, kafka, zmq, rabbitmq
  • supported callback success/fail
  • generate: processor.py, settings.yml
  • management/admin
  • build/deploy
  • sample: PipeApp, ScheduleApp, APIApp
  • test