Python data pipeline
A Python framework for streaming systems, REST APIs, and scheduled tasks, using message queues (RabbitMQ, ZeroMQ, Kafka).
Each processor is a link in a chain; chain processors together to build a pipeline.
processor - processor                              (a simple chain)

          / processor
processor                                          (branching)
          \ processor

processor \
            processor                              (merging)
processor /

            / processor - processor \
processor <                           processor    (branching, then merging)
            \ processor - processor /
message1 \
message2 -  [message1, message2, message3]         (batching)
message3 /

                                 / message1
[message1, message2, message3] -   message2        (splitting)
                                 \ message3
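The batching and splitting above can be sketched in plain Python. These helpers are illustrative only, not part of ank's API:

```python
def batch(messages, size):
    """Group a stream of messages into lists of up to `size` items."""
    return [messages[i:i + size] for i in range(0, len(messages), size)]


def split(batches):
    """Flatten batched messages back into a single stream."""
    return [message for group in batches for message in group]


stream = ['message1', 'message2', 'message3']
batched = batch(stream, 3)   # [['message1', 'message2', 'message3']]
restored = split(batched)    # ['message1', 'message2', 'message3']
```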
Install:
git clone git@github.com:sunary/ank.git
python setup.py install

or:

pip install ank
Create new Service:
$ ank -c TestService
$ cd TestService
then continue with the commands below.
Edit app (processor.py):
Example:
from ank.components.pipe_app import PipeApp


class ExampleApp(PipeApp):

    def start(self):
        for i in range(100):
            self.chain_process(i)

    def process(self, message=None):
        '''
        Args:
            message: {'content': (*) the content of the message,
                      'flags': (list|tuple) defines which processors run next}

        Raises:
            TypeError: if the processor before a branch does not return this form.

        Branching behaviour:
            'flags' == [True, True]: process both of the next branching processors
            'flags' == [True, False]: process only the 1st branching processor
            'flags' == [False, True]: process only the 2nd branching processor
            'flags' == [False, False]: process neither, stop the chain
            message is None: stop the chain
        '''
        return message['content'] + 1
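A processor placed just before a branch must return a message in the dict form described in the docstring. Here is a minimal, framework-free sketch of the flag logic (the function name and the odd/even routing rule are illustrative, not part of ank):

```python
def split_process(message):
    """Route odd values to the first branch, even values to the second."""
    value = message['content']
    return {'content': value,
            'flags': [value % 2 == 1, value % 2 == 0]}


split_process({'content': 3})  # {'content': 3, 'flags': [True, False]} -> 1st branch
split_process({'content': 4})  # {'content': 4, 'flags': [False, True]} -> 2nd branch
```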
Edit services and chains (services.yml):
Syntax:
services:
  Object1:
    class: module.name.ClassName
    arguments: [$Object, '%variable%']
  AnkChain2:
    class: ank.components.module_name.XXXApps
    arguments: ~

chains:
  - Object1
  - AnkChain2
Example:
services:
  StartApp:
    class: processor.StartApp
    arguments: [$Mongodb, $Redis, '%batch_size%']
  Mongodb:
    class: pymongo.MongoClient
    arguments: ['%mongo_host%', '%mongo_port%']
  Redis:
    class: redis.client.StrictRedis
    arguments: ['%redis_host%', '%redis_port%']
  OtherApp:
    class: processor.OtherApp
    arguments: ~
  LogApp:
    class: components.log_app.LogApp
    arguments: ~

chains:
  - StartApp
  - LogApp
  - OtherApp
  - [OddApp, EvenApp]  # processed depending on the `flags` key
  - OtherApp
ANK reads the chains top-down, finds the corresponding services, and takes their parameters from settings.yml.
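The `$Object` and `%variable%` substitution can be sketched as follows. This is a simplified illustration of the idea, not ank's actual implementation:

```python
def resolve(arg, instances, parameters):
    """Resolve one argument from services.yml:
    '$Name'   -> an already-built service instance,
    '%name%'  -> a value from the settings.yml parameters,
    otherwise -> the literal value."""
    if isinstance(arg, str):
        if arg.startswith('$'):
            return instances[arg[1:]]
        if arg.startswith('%') and arg.endswith('%'):
            return parameters[arg.strip('%')]
    return arg


parameters = {'batch_size': 100}
instances = {'Redis': object()}  # stands in for a built StrictRedis client

args = ['$Redis', '%batch_size%', 42]
resolved = [resolve(a, instances, parameters) for a in args]
# resolved[0] is the Redis instance, resolved[1] == 100, resolved[2] == 42
```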
Generate and edit setting (settings.yml):
$ ank create -c BaseApp
Example:
parameters:
  mongo_host: localhost
  mongo_port: 27017
  redis_host: localhost
  redis_port: 6379
  queue_uri: 'amqp://admin:admin@localhost:5672/'
  exchange_name: InputExchange
  routing_key: ExchangeToQueue
  batch_size: 100
This creates a settings template file. Just rename _settings.yml to settings.yml and fill in the values.
Build Service (create a Docker image) and run:
$ ank build
$ docker run --entrypoint /bin/sh $docker_image_id
Generate processor (_processor.py):
$ ank gen_processor -fs settings.yml
Generate setting (_setting.yml):
$ ank gen_setting -fs _setting.yml
Run Service:
from ank import program_loader
program_loader.main(file_setting='settings.yml')
or
$ ank run -fs settings.yml