Skip to content

improve examples #272

@ZenenTreadwell

Description

@ZenenTreadwell

One of the next patterns I'm trying to implement is a notification system. I'm not familiar with async programming as a paradigm so it would be great to have some example of how this might be done with SSE.

I'd be happy to build out an example but I am still adjusting. This is my first attempt:

class Listener:
    def __init__(self):
        self.messages = []

    def send(self, message):
        self.messages.append(message)

    async def receive(self):
        while len(self.messages) == 0:
            await asyncio.sleep(0.1)

        return self.messages.pop()


class Notifier:
    def __init__(self):
        self.listeners = []

    async def listen(self):
        listener = Listener()
        self.listeners.append(listener)

        while True:
            notification = await listener.receive()
            yield (b'notify', notification)

    def notify(self, message):
        for l in self.listeners:
            l.send(message)

notifier = Notifier()

@app.route('/notifications$')
async def notify(sse=None, **server):
    if sse:
        await sse.send(b'listening', event="notify")

        async for ev, data in notifier.listen():
            await sse.send(data, event=ev)
        await sse.close()


@app.route('/ping$')
async def ping():
        notifier.notify(b'ping!')

I'm opening an EventSource at /notifications and successfully pinging all listening clients, but the event sends multiple times and I generally feel unhappy with how the implementation feels. I feel like there should be some way to call send() and have that resolve the awaiting receive() in Listener without using a message queue and sleep loop, but I'm in unfamiliar territory here.

Suggestions?

Metadata

Metadata

Assignees

No one assigned

    Labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions