-
Notifications
You must be signed in to change notification settings - Fork 202
Open
Description
I have a big sync process running from event received by AMQ,
It finished the processing successfuly but when trying to send next request it getting :
"file": "***event_manager/rabbitmq_emitter.py",
"line": 50,
"function_name": "emit",
"status_code": 500,
"detail": "Server connection reset: ConnectionResetError(104, 'Connection reset by peer')"
The issue is that other services that uses the same pattern and are async processing are working fine.
i can put here example to my code :
`
main:
await config_manager.collect()
event_factory=EventFactory(config_manager)
event_handler=PGSEventHandler()
connection = await event_factory.connect_emitter()
dispatcher=PGSEventDispatcher(emitter=event_factory.emitter)
dispatcher.set_handler(event_global_name="genetic_analysis", handler=event_handler)
listener_id=event_factory.create_listener(dispatcher=dispatcher, queue_name=QueueName.PGS_WORKER)
# await mock_emit(event_factory)
listen_task = asyncio.create_task(event_factory.connect_listener(queue_name=QueueName.PGS_WORKER))
await listen_task
# await event_factory.connect_listener(queue_name=QueueName.PGS_WORKER)
if __name__=="__main__":
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
asyncio.run(main())`
Listener :
import json
import asyncio
import aio_pika
from common_ai.errors.event_exceptions import ListenerInvocationError
from common_ai.errors.rabbit_exceptions import RabbitMQConnectionError, RabbitMQChannelError
from common_ai.events.models.base_event_dispatcher import BaseEventDispatcher
from common_ai.events.models.base_event_listener import BaseEventListener
from common_ai.events.models.base_event_model import BaseEventModel
class RabbitMQListener(BaseEventListener):
def __init__(self, queue_name, dispatcher: BaseEventDispatcher, host, prefetch_count: int):
super().__init__(dispatcher=dispatcher, queue_name=queue_name, host=host)
self.connection = None
self.channel = None
self.prefetch_count = prefetch_count
self.heartbeat_task = None
self.stop_event = asyncio.Event()
self.message_queue = asyncio.Queue() # 🔄 Thread-safe async queue
async def connect(self):
"""Establish a connection to RabbitMQ."""
try:
self.logger.debug(f"🔄 (Take 4) Connecting to RabbitMQ at {self.host}...")
self.connection = await aio_pika.connect_robust(self.host, timeout=100000)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=self.prefetch_count)
self.logger.info("✅ RabbitMQ connection established.")
# Start async heartbeat task
if not self.heartbeat_task:
self.heartbeat_task = asyncio.create_task(self.keep_rabbitmq_alive())
# Start message processor task
asyncio.create_task(self.process_messages())
except Exception as error:
raise RabbitMQConnectionError(queue_host=self.host, extra_details=str(error))
async def keep_rabbitmq_alive(self):
"""Async heartbeat task to prevent RabbitMQ disconnections."""
while not self.stop_event.is_set():
try:
await asyncio.sleep(30)
if self.connection and not self.connection.is_closed:
await self.connection.ready()
self.logger.debug("✅ Sent RabbitMQ heartbeat.")
else:
self.logger.warning("⚠️ RabbitMQ connection lost. Reconnecting...")
await self.connect()
except Exception as e:
self.logger.error(f"❌ Heartbeat failed: {e}")
await self.connect()
async def callback(self, message: aio_pika.IncomingMessage):
"""Receives messages from RabbitMQ and adds them to the processing queue."""
try:
async with message.process(requeue=False, ignore_processed=True):
self.logger.info(f"📩 Received message: {message.body.decode()}")
event_data = json.loads(message.body)
event_model = BaseEventModel(**event_data)
# 🔄 Add the message to the async processing queue
await self.message_queue.put((message, event_model))
except Exception as error:
raise ListenerInvocationError(extra_details=str(error))
async def process_messages(self):
"""Processes messages from the queue on the same event loop."""
while not self.stop_event.is_set():
try:
message, event_model = await self.message_queue.get()
# ✅ Ensure the dispatcher is awaited properly
dispatcher_results = await self.dispatcher.dispatch(event_model)
if dispatcher_results:
self.logger.info("✅ Message processed successfully.")
else:
self.logger.warning("⚠️ Message processing failed.")
except Exception as error:
self.logger.error(f"❌ Error processing message: {error}")
async def start_listening(self):
"""Start listening for messages in the queue."""
try:
queue = await self.channel.get_queue(self.queue_name.value, ensure=True)
await queue.consume(self.callback)
self.logger.info("🎧 Listening for messages...")
await self.stop_event.wait() # Keeps the event loop running
except Exception as error:
raise RabbitMQChannelError(extra_details=str(error))
async def close(self):
"""Gracefully close the connection and stop background tasks."""
try:
self.stop_event.set()
if self.heartbeat_task:
self.heartbeat_task.cancel()
if self.connection:
await self.connection.close()
self.logger.info("🔴 RabbitMQ connection closed.")
except Exception as error:
raise RabbitMQConnectionError(extra_details=str(error))
Emitter :
import aio_pika
import json
from common_ai.events.models.base_event_emitter import BaseEmitter
from common_ai.events.models.base_event_model import BaseEventModel
from common_ai.errors.event_exceptions import EventEmitterNotConfiguredError
from common_ai.errors.rabbit_exceptions import RabbitMQConnectionError, RabbitMQPublishError
from common_ai.utils.e_logger import ELogger
from common_ai.events.models.base_event_model import EventExchangeType
ELogger.initialize_logger()
class RabbitMQEmitter(BaseEmitter):
"""Simple RabbitMQ Emitter for emitting events without tiers."""
def __init__(self, connection_url):
self.config = None
super().__init__(connection_url=connection_url)
self.connection = None
self.channel = None
async def connect(self) -> None:
try:
"""Establish a connection to RabbitMQ and create a channel."""
self.connection = await aio_pika.connect_robust(self.connection_url,timeout=1000000)
self.channel = await self.connection.channel()
except Exception as error:
raise RabbitMQConnectionError(extra_details=str(error))
async def emit(self, event:BaseEventModel, exchange: EventExchangeType =EventExchangeType.DIRECT) -> None:
f"""
Emit an event to RabbitMQ.
Args:
:param event: {BaseEventModel} Event to emit
:param exchange: str Exchange to emit
"""
try:
if not self.channel:
raise EventEmitterNotConfiguredError(detail="Call 'connect' before emitting events.")
exchange = str(exchange)
exchange = await self.channel.get_exchange(exchange)
message = aio_pika.Message(body=json.dumps(event.dict(),default=str).encode())
routing_key = event.event_type.event_global_name
await exchange.publish(
message, routing_key=routing_key
)
except Exception as error:
raise RabbitMQPublishError(detail=str(error))
async def close(self) -> None:
"""Close the channel and connection."""
try:
if not self.connection or not self.channel:
raise RuntimeError("Emitter is not connected. Call 'connect' before attempting to close.")
if self.channel:
await self.channel.close()
if self.connection:
await self.connection.close()
except Exception as error:
raise EventEmitterNotConfiguredError(detail=str(error))
Handler :
class GeneProcessingEventHandler(BaseEventHandler):
def __init__(self,emitter:BaseEmitter):
super().__init__(event_types_acceptable=[EventType(
event_global_id=str(uuid5(HOST_NAMESPACE, "gene_processing")),
event_global_name="gene_processing",
event_priority=3,
)],event_emitter=emitter)
self.config_manager = Config()
self.handler_id = uuid5(HOST_NAMESPACE, "gene_processing")
async def handle_event(self, event: BaseEventModel) -> Any:
if not any(event_type.event_global_id == event.event_type.event_global_id
for event_type in self.event_types_acceptable):
raise EventTypeNotSupportedError
test_id:str = event.payload.get('test_id', None)
if not test_id:
raise ApplicationException(detail="Event without test_id", status_code=500)
genetic_object = self.run_in_thread(test_id)
return genetic_object
def run_in_thread(self, test_id: str):
"""Runs async function in a separate thread with a dedicated event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(self._get_and_generate(test_id))
loop.close()
return result
async def _get_and_generate(self, test_id: str) -> Dict[str, Any]:
"""Handles async data retrieval and synchronous processing inside a thread."""
try:
print(f"_get_and_generate!!!")
# 🔹 Get genetic data (async call)
pgs_data, snps_data, ancestry_data, traits_config, meta_config = await self.__get_genetic_data(test_id)
# 🔹 Process genetic data (sync call)
with self.lock: # 🔒 Ensure thread safety if needed
genetic_object: Dict[str, Any] = GeneRenovationService.generate_genetic_object(
test_id=test_id,
pgs_obj=pgs_data,
snps_data=snps_data,
anc_obj=ancestry_data,
traits_config_csv=traits_config,
meta_strengths_config=meta_config
)
return genetic_object
except Exception as e:
print(f"❌ Error processing genetic data: {e}")
return {}
`
Any idea why it happens ?
The process is very long (5 hours) can't be async process .
Metadata
Metadata
Assignees
Labels
No labels