Project author: phip123

Project description:
Simple stream processing library for synchronous or parallel and non-distributed execution.
Language: Python
Repository: git://github.com/phip123/glimmer.git
Created: 2019-12-23T20:21:21Z
Community: https://github.com/phip123/glimmer

License: MIT License


glimmer

glimmer is a library that helps developers build processing pipelines that do not need to be computed in a distributed fashion, but are easy and fast to set up, while still benefiting from the
declarative way of designing processing pipelines and the flexibility of combining different sources, sinks and operators.

API

glimmer consists of the following basic blocks:

  • Nodes are the processing units and there are three different types:

    1. Source: Nodes that generate data to process
    2. Operator: Nodes that receive and output data
    3. Sink: Nodes that only consume incoming data.
  • Topology is the internal representation of the dataflow: a DAG that
    describes how nodes interact with each other.

  • Environments execute a given topology.

glimmer currently offers two modes of execution: synchronous and parallel.

  • In synchronous mode there can be only one source, one sink, and multiple operators in between.
  • The parallel mode supports every valid DAG, which means: multiple sources, multiple sinks, joins, merges, …
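The synchronous model can be pictured as a single loop driving one item at a time through a linear chain. The sketch below uses plain functions rather than glimmer's API, purely to illustrate the execution model:

```python
# Illustrative plain-Python sketch of the synchronous execution model
# (not glimmer's actual internals): one source feeds a linear chain of
# operators and a single sink, one item per step.

def run_sync(source, operators, sink, steps):
    for _ in range(steps):
        item = source()           # generate one item
        for op in operators:      # pass it through the operator chain
            item = op(item)
        sink(item)                # consume the final result

out = []
run_sync(lambda: "hello",
         [lambda x: f"{x}, world!", lambda x: x[::-1]],
         out.append,
         steps=1)
# out == ['!dlrow ,olleh']
```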

The following sections show how to quickly get either of them running.

Synchronous mode first (you can also find this under examples/readme/synchronous.py):
This example simply reverses the string 'hello, world!' and prints it.
To keep it short, we use the function-based approach, which has its drawbacks but is sufficient for
stateless operations.

  import glimmer.processing.factory as factory

  hello_source = factory.mk_src(lambda: 'hello')
  world_op = factory.mk_op(lambda x: f'{x}, world!')
  rev_op = factory.mk_op(lambda x: x[::-1])
  print_sink = factory.mk_sink(lambda x: print(x))

  # Connect the nodes; in glimmer each node keeps track of its in- and outputs.
  # The '|' operator modifies the state of each node.
  (hello_source | world_op | rev_op | print_sink)

  # Because we can step through the whole graph from one node, we made a helper function to
  # generate a ready-to-use environment, which also generates the topology for us.
  env = factory.mk_synchronous_env(hello_source)

  # Start the environment in its own process
  env.start()

  # Hit enter to stop and close the env
  input()
  env.stop()
  env.close()

You can get a synchronous environment up rather quickly thanks to the factory functions.
If you want more sophisticated nodes, or want to use a logger (potentially your own), check out the examples
located in examples. Most notably, if you decide to go with the class-based approach, you can keep track of state
and use Context to easily access env variables and pass custom properties to the nodes.
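To see why the class-based approach matters for state, consider a counting operator. The sketch below uses a plain Python class (the class name is illustrative, not glimmer's actual base class): a lambda cannot easily keep a counter between calls, while an instance can:

```python
# Hypothetical sketch of a stateful operator as a callable class.
# A function-based node is stateless between calls; an instance keeps
# its state ('seen') across invocations.

class CountingOperator:
    def __init__(self):
        self.seen = 0  # state kept across invocations

    def __call__(self, item):
        self.seen += 1
        return f"{self.seen}: {item}"

op = CountingOperator()
results = [op(x) for x in ["a", "b", "c"]]
# results == ['1: a', '2: b', '3: c']
```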

Next up is the parallel environment, which does the same as the synchronous one but uses multiple sources
(you can also find this under examples/readme/parallel.py):

  from time import sleep

  import glimmer.processing.factory as factory

  world_idx = -1
  hello_idx = -1

  def world():
      sleep(0.1)
      global world_idx
      world_idx += 1
      return f', world! - {world_idx}'

  def hello():
      sleep(1)
      global hello_idx
      hello_idx += 1
      return f'{hello_idx} - hello'

  hello_source = factory.mk_src(hello, node_name='hello')
  world_source = factory.mk_src(world, node_name='world')

  # In case a node receives input from multiple nodes, it gets a dict that contains each node's output.
  # To access a node's output, just use its name as key.
  combine_op = factory.mk_op(lambda x: x[hello_source.name] + x[world_source.name], node_name='combine')
  rev_op = factory.mk_op(lambda x: x[::-1], node_name='rev')
  print_sink = factory.mk_sink(lambda x: print(x), node_name='print')
  print_len_sink = factory.mk_sink(lambda x: print(f'Length: {len(x)}'), node_name='print_len')

  # While the '|' operator takes the left node's output and sets it as the right one's input,
  # we also provide methods that cover the opposite direction. This is helpful in cases where
  # you want to provide one node with multiple inputs.
  combine_op.receive_from([hello_source, world_source])

  # Because nodes keep track of their in- and outputs, we can continue to connect nodes starting
  # from the 'combine_op' node.
  # It's also possible to pass a list of nodes to the '|' operator.
  (combine_op | rev_op | [print_sink, print_len_sink])

  # As in the synchronous example, it is enough to pass all sources to generate an environment
  env = factory.mk_parallel_env([hello_source, world_source])

  # Start the environment in its own process
  env.start()

  # Hit enter to stop and close the env
  input()
  env.stop()
  env.close()

If you run the code, you will see that combine_op waits for both inputs. This means that the order of outputs is preserved
across the complete topology.
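This pairing behaviour can be mimicked with plain generators: the join only produces an item once it has one output from each upstream, so ordering is kept even when the sources run at different speeds. This sketch illustrates the semantics, not glimmer's scheduler:

```python
# Plain-Python sketch of the combine step's behaviour: zip advances in
# lockstep, waiting on the slower generator, just like combine_op waits
# for one output from each of its inputs before emitting.

def hello_items():
    for i in range(3):
        yield f"{i} - hello"

def world_items():
    for i in range(3):
        yield f", world! - {i}"

combined = [h + w for h, w in zip(hello_items(), world_items())]
# combined[0] == '0 - hello, world! - 0'
```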

Context

An important aspect for us is the configurability of each individual node.

To allow the desired level of configurability, we created the Context.
This class provides each node with
environment variables, which can be OS environment variables or loaded from a .yaml file. This allows every node to open its own connection to, for example, Redis.
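The idea behind such a context can be sketched as a per-node lookup that prefers custom properties and falls back to the OS environment. All names below are assumptions for illustration, not glimmer's real Context API:

```python
# Hypothetical sketch of a context object: custom per-node properties
# take precedence; otherwise we fall back to OS environment variables,
# then to an explicit default.

import os

class SimpleContext:
    def __init__(self, properties=None):
        self.properties = properties or {}

    def get(self, key, default=None):
        if key in self.properties:
            return self.properties[key]
        return os.environ.get(key, default)

ctx = SimpleContext({"redis_host": "localhost"})
host = ctx.get("redis_host")        # custom property wins
port = ctx.get("redis_port", 6379)  # falls back unless set in the OS environment
```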

Examples

More examples for using glimmer are located in the examples folder.

Build

Install with make

  make venv

or create and activate a new virtual environment

  virtualenv .venv
  source .venv/bin/activate

Install requirements

  pip install -r requirements.txt

To run the tests, additionally run

  pip install -r requirements-dev.txt