Project author: marier-nico

Project description:
Pythonic event-processing library based on decorators
Language: Python
Repository: git://github.com/marier-nico/event-processor.git
Created: 2021-03-18T20:39:00Z
Project community: https://github.com/marier-nico/event-processor

License: MIT License


Process Events In Style

This library aims to simplify the common pattern of event processing: filtering, dispatching and pre-processing
events, as well as injecting dependencies into event processors.

The only requirement is that your events are regular Python dictionaries.

Take a look at the following examples to get an overview of the features available! Of course, you can mix and combine
them in any way you like to create more complex scenarios.

    from event_processor import EventProcessor, Event
    from event_processor.filters import Eq

    event_processor = EventProcessor()


    # Runs only for events whose nested "service.type" value is "service_a".
    @event_processor.processor(Eq("service.type", "service_a"))
    def process_service_a(event: Event):
        return event["service"]["status"] == "up"


    # Runs only for events whose nested "service.type" value is "service_b".
    @event_processor.processor(Eq("service.type", "service_b"))
    def process_service_b(event: Event):
        # "authorized" is nested under "service" in the sample event below.
        return event["service"]["authorized"]


    service_a_event = {
        "service": {
            "type": "service_a",
            "status": "down"
        }
    }
    service_b_event = {
        "service": {
            "type": "service_b",
            "authorized": False
        }
    }

    event_processor.invoke(service_a_event)  # False
    event_processor.invoke(service_b_event)  # False

Documentation

Find the full documentation on Read the Docs.