Skip to content

Commit

Permalink
feat: support a mode to drain pulse queues rather than listen
Browse files Browse the repository at this point in the history
  • Loading branch information
ahal committed Jun 10, 2024
1 parent dfc1e14 commit d62c1cd
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 50 deletions.
25 changes: 21 additions & 4 deletions src/fxci_etl/console.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import sys
from dataclasses import asdict
from pathlib import Path
Expand All @@ -10,7 +9,7 @@

from fxci_etl.config import Config
from fxci_etl.metric.export import export_metrics
from fxci_etl.pulse.listen import listen
from fxci_etl.pulse.consume import drain, listen

APP_NAME = "fxci-etl"

Expand Down Expand Up @@ -42,8 +41,26 @@ def handle(self):
)
return 1

loop = asyncio.get_event_loop()
loop.run_until_complete(listen(config, queue))
listen(config, queue)
return 0


class PulseDrainCommand(ConfigCommand):
name = "pulse drain"
description = "Process events in the specified pulse queue and exit."
arguments = [argument("queue", description="Pulse queue to drain.")]

def handle(self):
config = self.parse_config(self.option("config"))
queue = self.argument("queue")

if queue not in config.pulse.queues:
self.line_error(
f"Queue '{queue}' is not configured! Choose from: {', '.join(config.pulse.queues)}"
)
return 1

drain(config, queue)
return 0


Expand Down
59 changes: 59 additions & 0 deletions src/fxci_etl/pulse/consume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from kombu import Connection, Exchange, Queue

from fxci_etl.config import Config
from fxci_etl.pulse.handlers.base import handlers


def get_connection(config: Config):
pulse = config.pulse
return Connection(
hostname=pulse.host,
port=pulse.port,
userid=pulse.user,
password=pulse.password,
ssl=True,
)


def get_consumer(config: Config, connection: Connection, name: str):
pulse = config.pulse
qconf = pulse.queues[name]
exchange = Exchange(qconf.exchange, type="topic")
exchange(connection).declare(
passive=True
) # raise an error if exchange doesn't exist

queue = Queue(
name=f"queue/{pulse.user}/{name}",
exchange=exchange,
routing_key=qconf.routing_key,
durable=qconf.durable,
exclusive=False,
auto_delete=qconf.auto_delete,
)

callbacks = [
cls(config)
for name, cls in handlers.items()
if config.etl.handlers is None or name in config.etl.handlers
]
consumer = connection.Consumer(queue, auto_declare=False, callbacks=callbacks)
consumer.queues[0].queue_declare()
consumer.queues[0].queue_bind()
return consumer


def listen(config: Config, name: str):
with get_connection(config) as connection:
with get_consumer(config, connection, name):
while True:
try:
connection.drain_events(timeout=None)
except TimeoutError:
pass


def drain_events(config: Config, name: str):
with get_connection(config) as connection:
with get_consumer(config, connection, name):
connection.drain_events(timeout=0)
46 changes: 0 additions & 46 deletions src/fxci_etl/pulse/listen.py

This file was deleted.

0 comments on commit d62c1cd

Please sign in to comment.