项目作者: syfun

项目描述 :
A Python3.7+ port of Apollo Graphql Subscriptions.
高级语言: Python
项目地址: git://github.com/syfun/gql-subscriptions.git
创建时间: 2020-04-21T13:33:23Z
项目社区:https://github.com/syfun/gql-subscriptions

开源协议:MIT License

下载


gql-subscriptions

A Python3.7+ port of Apollo Graphql Subscriptions.

This package contains a basic asyncio pubsub system which should be used only in demo, and other pubsub system(like Redis).

Requirements

Python 3.7+

Installation

pip install gql-subscriptions

This package should be used with a network transport, for example starlette-graphql

Getting started with your first subscription

To begin with GraphQL subscriptions, start by defining a GraphQL Subscription type in your schema:

  1. type Subscription {
  2. somethingChanged: Result
  3. }
  4. type Result {
  5. id: String
  6. }

Next, add the Subscription type to your schema definition:

  1. schema {
  2. query: Query
  3. mutation: Mutation
  4. subscription: Subscription
  5. }

Now, let’s create a simple PubSub instance - it is simple pubsub implementation, based on asyncio.Queue.

  1. from gql_subscriptions import PubSub
  2. pubsub = PubSub()

Now, implement your Subscriptions type resolver, using the pubsub.async_iterator to map the event you need(use python-gql):

  1. from gql_subscriptions import PubSub, subscribe
  2. pubsub = PubSub()
  3. SOMETHING_CHANGED_TOPIC = 'something_changed'
  4. @subscribe
  5. async def something_changed(parent, info):
  6. return pubsub.async_iterator(SOMETHING_CHANGED_TOPIC)

Now, the GraphQL engine knows that somethingChanged is a subscription, and every time we use pubsub.publish over this topic - it will publish it using the transport we use:

  1. pubsub.publish(SOMETHING_CHANGED_TOPIC, {'somethingChanged': {'id': "123" }})

Note that the default PubSub implementation is intended for demo purposes. It only works if you have a single instance of your server and doesn’t scale beyond a couple of connections. For production usage you’ll want to use one of the PubSub implementations backed by an external store. (e.g. Redis).

Filters

When publishing data to subscribers, we need to make sure that each subscriber gets only the data it needs.

To do so, we can use with_filter decorator, which wraps the subscription resolver with a filter function, and lets you control each publication for each user.

  1. ResolverFn = Callable[[Any, Any, Dict[str, Any]], Awaitable[AsyncIterator]]
  2. FilterFn = Callable[[Any, Any, Dict[str, Any]], bool]
  3. def with_filter(filter_fn: FilterFn) -> Callable[[ResolverFn], ResolverFn]
  4. ...

ResolverFn is a async function which returned a typing.AsyncIterator.

  1. async def something_changed(parent, info) -> typing.AsyncIterator

FilterFn is a filter function, executed with the payload(published value), operation info, arugments, and must return bool.

For example, if something_changed would also accept a argument with the ID that is relevant, we can use the following code to filter according to it:

  1. from gql_subscriptions import PubSub, subscribe, with_filter
  2. pubsub = PubSub()
  3. SOMETHING_CHANGED_TOPIC = 'something_changed'
  4. def filter_thing(payload, info, relevant_id):
  5. return payload['somethingChanged'].get('id') == relevant_id
  6. @subscribe
  7. @with_filter(filter_thing)
  8. async def something_changed(parent, info, relevant_id):
  9. return pubsub.async_iterator(SOMETHING_CHANGED_TOPIC)

Channels Mapping

You can map multiple channels into the same subscription, for example when there are multiple events that trigger the same subscription in the GraphQL engine.

  1. from gql_subscriptions import PubSub, subscribe, with_filter
  2. pubsub = PubSub()
  3. SOMETHING_UPDATED = 'something_updated'
  4. SOMETHING_CREATED = 'something_created'
  5. SOMETHING_REMOVED = 'something_removed'
  6. @subscribe
  7. async def something_changed(parent, info):
  8. return pubsub.async_iterator([SOMETHING_UPDATED, SOMETHING_CREATED, SOMETHING_REMOVED])

PubSub Implementations

It can be easily replaced with some other implements of PubSubEngine abstract class.

This package contains a Redis implements.

  1. from gql import subscribe
  2. from gql_subscriptions.pubsubs.redis import RedisPubSub
  3. pubsub = RedisPubSub()
  4. SOMETHING_CHANGED_TOPIC = 'something_changed'
  5. @subscribe
  6. async def something_changed(parent, info):
  7. return pubsub.async_iterator(SOMETHING_CHANGED_TOPIC)

You can also implement a PubSub of your own, by using the inherit PubSubEngine from this package, this is a Reids example.