Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Allow users to control which groups a client is added to #7

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.1.0] - 2020-02-24
### Added
- Added support for user control over Channel Layer groups in subscriptions and made it the default
### Removed
- Removed `SubscriptionEvent` and `ModelSubscriptionEvent` classes as well as `post_save_subscription` and `post_delete_subscription` signal handlers.

## [1.0.2] - 2019-12-11
### Added
- Fixed bug causing subscriptions with variables to fail
Expand Down
179 changes: 62 additions & 117 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,42 +61,43 @@ A plug-and-play GraphQL subscription implementation for Graphene + Django built
})
```

5. Connect signals for any models you want to create subscriptions for
5. Add `SubscriptionModelMixin` to any models you want to enable subscriptions for

```python
# your_app/signals.py
from django.db.models.signals import post_save, post_delete
from graphene_subscriptions.signals import post_save_subscription, post_delete_subscription
# your_app/models.py
from graphene_subscriptions.models import SubscriptionModelMixin

from your_app.models import YourModel

post_save.connect(post_save_subscription, sender=YourModel, dispatch_uid="your_model_post_save")
post_delete.connect(post_delete_subscription, sender=YourModel, dispatch_uid="your_model_post_delete")

# your_app/apps.py
from django.apps import AppConfig

class YourAppConfig(AppConfig):
name = 'your_app'

def ready(self):
import your_app.signals
class YourModel(SubscriptionModelMixin, models.Model):
# ...
```

6. Define your subscriptions and connect them to your project schema

```python
#your_project/schema.py
import graphene
from graphene_django.types import DjangoObjectType

from your_app.models import YourModel


class YourModelType(DjangoObjectType):
class Meta:
model = YourModel


from your_app.graphql.subscriptions import YourSubscription
class YourModelCreatedSubscription(graphene.ObjectType):
your_model_created = graphene.Field(YourModelType)

def resolve_your_model_created(root, info):
return root.subscribe('yourModelCreated')


class Query(graphene.ObjectType):
base = graphene.String()


class Subscription(YourSubscription):
class Subscription(YourModelCreatedSubscription):
pass


Expand Down Expand Up @@ -125,143 +126,87 @@ class Subscription(graphene.ObjectType):
.map(lambda i: "hello world!")
```

## Responding to Model Events

Each subscription that you define will receive a an `Observable` of `SubscriptionEvent`'s as the `root` parameter, which will emit a new `SubscriptionEvent` each time one of the connected signals are fired.

A `SubscriptionEvent` has two attributes: the `operation` that triggered the event, usually `CREATED`, `UPDATED` or `DELETED`) and the `instance` that triggered the signal.
## Subscribing to Events

Since `root` is an `Observable`, you can apply any `rxpy` operations before returning it.

### Model Created Subscriptions

For example, let's create a subscription called `yourModelCreated` that will be fired whenever an instance of `YourModel` is created. Since `root` receives a new event *every time a connected signal is fired*, we'll need to filter for only the events we want. In this case, we want all events where `operation` is `created` and the event `instance` is an instance of our model.
Most of the time you will want your subscriptions to be able to listen for events that occur in other parts of your application. When you define a subscription resolver, you can use the `subscribe` method of the `root` value to subscribe to a set of events. `subscribe` takes a unique group name as an argument, and returns an `Observable` of all events that are sent to that group. Since the return value of `root.subscribe` is an `Observable`, you can apply any `rxpy` operations and return the result.

```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import CREATED

from your_app.models import YourModel


class YourModelType(DjangoObjectType)
class Meta:
model = YourModel

class CustomSubscription(graphene.ObjectType):
custom_subscription = graphene.String()

class Subscription(graphene.ObjectType):
your_model_created = graphene.Field(YourModelType)

def resolve_your_model_created(root, info):
return root.filter(
lambda event:
event.operation == CREATED and
isinstance(event.instance, YourModel)
).map(lambda event: event.instance)
def resolve_custom_subscription(root, info):
return root.subscribe('customSubscription')
```

### Model Updated Subscriptions

You can also filter events based on a subscription's arguments. For example, here's a subscription that fires whenever a model is updated:
You can then trigger events from other parts of your application using the `trigger_subscription` helper. `trigger_subscription` takes two arguments: the name of the group to send the event to, and the value to send. Make sure that the value you pass to `trigger_subscription` is compatible with the return type you've defined for your subscription resolver, and is either a Django model or a JSON serializable value.

```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import UPDATED
from graphene_subscriptions.events import trigger_subscription

from your_app.models import YourModel
trigger_subscription('trigger_subscription', 'hello world!')
```


class YourModelType(DjangoObjectType)
class Meta:
model = YourModel
## Model Events

Often you'll want to define subscriptions that fire when a Django model is created, updated, or deleted. `graphene-subscriptions` includes a handy model mixin that configures the triggering of these events for you. You can use it by configuring your model to inherit from `SubscriptionModelMixin`.

class Subscription(graphene.ObjectType):
your_model_updated = graphene.Field(YourModelType, id=graphene.ID())
```python
# your_app/models.py
from graphene_subscriptions.models import SubscriptionModelMixin

def resolve_your_model_updated(root, info, id):
return root.filter(
lambda event:
event.operation == UPDATED and
isinstance(event.instance, YourModel) and
event.instance.pk == int(id)
).map(lambda event: event.instance)
class YourModel(SubscriptionModelMixin, models.Model):
# ...
```

### Model Updated Subscriptions

Defining a subscription that is fired whenever a given model instance is deleted can be accomplished like so
`SubscriptionModelMixin` will create unique group names for created, updated, and deleted events based on the name of your model, and will send events to these groups automatically.

```python
import graphene
from graphene_django.types import DjangoObjectType
from graphene_subscriptions.events import DELETED

from your_app.models import YourModel
## Model Created Subscriptions

`SubscriptionModelMixin` automatically sends model created events to a unique group called `"<yourModelName>Created"`. For example, if your model is called `YourModel`, then model created events will be sent to the group `"yourModelCreated"`.

class YourModelType(DjangoObjectType)
class Meta:
model = YourModel
You can create a model created subscription that listens for events in this group and returns them to the client by using the `root.subscribe` helper, like so:

```python
class YourModelCreatedSubscription(graphene.ObjectType):
your_model_created = graphene.Field(YourModelType)

class Subscription(graphene.ObjectType):
your_model_deleted = graphene.Field(YourModelType, id=graphene.ID())

def resolve_your_model_deleted(root, info, id):
return root.filter(
lambda event:
event.operation == DELETED and
isinstance(event.instance, YourModel) and
event.instance.pk == int(id)
).map(lambda event: event.instance)
def resolve_your_model_created(root, info):
return root.subscribe('yourModelCreated')
```


## Custom Events
### Model Updated Subscriptions

Sometimes you need to create subscriptions which responds to events other than Django signals. In this case, you can use the `SubscriptionEvent` class directly. (Note: in order to maintain compatibility with Django channels, all `instance` values must be json serializable)
Much like model created events, `SubscriptionModelMixin` automatically sends model updated events to a group called `"<yourModelName>Updated.<your_model_id>"`. For example, if your model is called `YourModel` and an instance with `pk == 1` is updated, then a model updated event will be sent to the group `"yourModelUpdated.1"`.

For example, a custom event subscription might look like this:
Your subscription resolver can send model updated events from this group to the client by using the `root.subscribe` helper:

```python
import graphene
class YourModelUpdatedSubscription(graphene.ObjectType):
your_model_updated = graphene.Field(YourModelType, id=graphene.String())

CUSTOM_EVENT = 'custom_event'
def resolve_your_model_updated(root, info, id):
return root.subscribe(f'yourModelUpdated.{id}')
```

class CustomEventSubscription(graphene.ObjectType):
custom_subscription = graphene.Field(CustomType)

def resolve_custom_subscription(root, info):
return root.filter(
lambda event:
event.operation == CUSTOM_EVENT
).map(lambda event: event.instance)
### Model Deleted Subscriptions

In a similar manner, `SubscriptionModelMixin` automatically sends model deleted events to a group called `"<yourModelName>Deleted.<your_model_id>"`. For example, if your model is called `YourModel` and an instance with `pk == 1` is deleted, then a model deleted event will be sent to the group `"yourModelDeleted.1"`.

# elsewhere in your app:
from graphene_subscriptions.events import SubscriptionEvent
Your subscription resolver can send model deleted events from this group to the client by using the `root.subscribe` helper:

event = SubscriptionEvent(
operation=CUSTOM_EVENT,
instance=<any json-serializable value>
)
```python
class YourModelDeletedSubscription(graphene.ObjectType):
your_model_deleted = graphene.Field(YourModelType, id=graphene.String())

event.send()
def resolve_your_model_deleted(root, info, id):
return root.subscribe(f'yourModelDeleted.{id}')
```


## Production Readiness

This implementation was spun out of an internal implementation I developed which we've been using in production for the past 6 months at [Jetpack](https://www.tryjetpack.com/). We've had relatively few issues with it, and I am confident that it can be reliably used in production environments.

However, being a startup, our definition of production-readiness may be slightly different from your own. Also keep in mind that the scale at which we operate hasn't been taxing enough to illuminate where the scaling bottlenecks in this implementation may hide.

If you end up running this in production, please [reach out](https://twitter.com/jayden_windle) and let me know!


## Contributing

PRs and other contributions are very welcome! To set up `graphene_subscriptions` in a development envrionment, do the following:
Expand Down
74 changes: 37 additions & 37 deletions graphene_subscriptions/consumers.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,11 @@
import functools
import json

from django.utils.module_loading import import_string
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from graphene_django.settings import graphene_settings
from graphql import parse
from asgiref.sync import async_to_sync
from channels.consumer import SyncConsumer
from channels.exceptions import StopConsumer
from rx import Observable
from channels.generic.websocket import JsonWebsocketConsumer
from rx.subjects import Subject
from django.core.serializers import deserialize

from graphene_subscriptions.events import SubscriptionEvent


stream = Subject()
from graphene_subscriptions.events import deserialize_value


# GraphQL types might use info.context.user to access currently authenticated user.
Expand All @@ -34,18 +23,28 @@ def get(self, item):
return self.data.get(item)


class GraphqlSubscriptionConsumer(SyncConsumer):
def websocket_connect(self, message):
async_to_sync(self.channel_layer.group_add)("subscriptions", self.channel_name)
class GraphqlSubscriptionConsumer(JsonWebsocketConsumer):
groups = {}

def subscribe(self, name):
stream = Subject()
if name not in self.groups:
self.groups[name] = stream
async_to_sync(self.channel_layer.group_add)(name, self.channel_name)

return stream

self.send({"type": "websocket.accept", "subprotocol": "graphql-ws"})
def connect(self):
self.accept("graphql-ws")

def websocket_disconnect(self, message):
self.send({"type": "websocket.close", "code": 1000})
raise StopConsumer()
def disconnect(self, close_code):
for group in self.groups:
async_to_sync(self.channel_layer.group_discard)(
group,
self.channel_name
)

def websocket_receive(self, message):
request = json.loads(message["text"])
def receive_json(self, request):
id = request.get("id")

if request["type"] == "connection_init":
Expand All @@ -62,7 +61,7 @@ def websocket_receive(self, message):
operation_name=payload.get("operationName"),
variables=payload.get("variables"),
context=context,
root=stream,
root=self,
allow_subscriptions=True,
)

Expand All @@ -74,24 +73,25 @@ def websocket_receive(self, message):
elif request["type"] == "stop":
pass

def signal_fired(self, message):
stream.on_next(SubscriptionEvent.from_dict(message["event"]))
def subscription_triggered(self, message):
group = message['group']

if group in self.groups:
stream = self.groups[group]
value = deserialize_value(message['value'])

stream.on_next(value)

def _send_result(self, id, result):
errors = result.errors

self.send(
self.send_json(
{
"type": "websocket.send",
"text": json.dumps(
{
"id": id,
"type": "data",
"payload": {
"data": result.data,
"errors": list(map(str, errors)) if errors else None,
},
}
),
"id": id,
"type": "data",
"payload": {
"data": result.data,
"errors": list(map(str, errors)) if errors else None,
},
}
)
Loading