feat: add BaileysMessageProcessor for improved message handling #1726
Conversation
…integrate rxjs for asynchronous processing
Reviewer's Guide

This PR introduces a BaileysMessageProcessor leveraging RxJS to asynchronously process incoming message batches, integrates it into the BaileysStartupService for decoupled handling, and adds the rxjs dependency.

Sequence diagram for asynchronous message processing with BaileysMessageProcessor:

```mermaid
sequenceDiagram
    participant BaileysStartupService
    participant BaileysMessageProcessor
    participant Handler as onMessageReceive (Handler)
    BaileysStartupService->>BaileysMessageProcessor: processMessage(payload, settings)
    BaileysMessageProcessor->>BaileysMessageProcessor: messageSubject.next({messages, type, requestId, settings})
    BaileysMessageProcessor-->>Handler: onMessageReceive({messages, type, requestId}, settings) (via RxJS pipeline)
    Handler-->>BaileysMessageProcessor: (async processing complete)
```

Class diagram for the MountProps type used in BaileysMessageProcessor:

```mermaid
classDiagram
    class MountProps {
        +onMessageReceive(payload: MessageUpsertPayload, settings: any) Promise~void~
    }
    BaileysMessageProcessor ..> MountProps : mount({ onMessageReceive })
```
File-Level Changes
Hey @Santosl2 - I've reviewed your changes - here's some feedback:
- Consider guarding in processMessage to log or throw if mount wasn’t called first, so you don’t emit into an uninitialized Subject.
- To prevent unbounded queue growth under load, think about adding a concurrency/backpressure strategy (e.g. mergeMap with a concurrency limit) instead of using an unbounded Subject with concatMap.
- Once BaileysMessageProcessor is fully in place, remove the commented-out direct call to messageHandle to keep the event loop code clean.
## Individual Comments
### Comment 1
Location: `src/api/integrations/channel/whatsapp/baileysMessage.processor.ts:44`

```typescript
processMessage(payload: MessageUpsertPayload, settings: any) {
  const { messages, type, requestId } = payload;
  this.messageSubject.next({ messages, type, requestId, settings });
}
```
No backpressure or queue size limit is enforced on messageSubject. This could cause memory issues if many messages are processed rapidly. Consider implementing a queue size limit or backpressure mechanism.

Suggested implementation:

```typescript
// Limit the queue to 100 messages (adjust as needed)
protected readonly MESSAGE_QUEUE_LIMIT = 100;
protected messageQueue: Array<{ messages: any; type: any; requestId: any; settings: any }> = [];
protected messageSubject = new Subject<{
```

```typescript
processMessage(payload: MessageUpsertPayload, settings: any) {
  const { messages, type, requestId } = payload;
  if (this.messageQueue.length >= this.MESSAGE_QUEUE_LIMIT) {
    // Drop the oldest message to make room for the new one (or log/drop new, as preferred)
    this.processorLogs.warn(
      `Message queue full (${this.MESSAGE_QUEUE_LIMIT}). Dropping oldest message.`
    );
    this.messageQueue.shift();
  }
  const message = { messages, type, requestId, settings };
  this.messageQueue.push(message);
  this.messageSubject.next(message);
}
```
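The drop-oldest policy in the suggestion above can be isolated into a dependency-free sketch; the class and method names here are ours, not the PR's:

```typescript
// Minimal bounded FIFO: when full, the oldest entry is evicted to admit the new one.
class BoundedQueue<T> {
  private items: T[] = [];

  constructor(private readonly limit: number) {}

  // Returns the evicted entry (if any) so the caller can log the drop,
  // as the suggested implementation does with processorLogs.warn.
  push(item: T): T | undefined {
    let dropped: T | undefined;
    if (this.items.length >= this.limit) {
      dropped = this.items.shift(); // evict the oldest entry
    }
    this.items.push(item);
    return dropped;
  }

  get size(): number {
    return this.items.length;
  }

  toArray(): T[] {
    return [...this.items];
  }
}
```

Whether to drop the oldest or the newest message is a policy choice: drop-oldest favors fresh data, while drop-newest preserves arrival order of what was already accepted.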
### Comment 2
Location: `src/api/integrations/channel/whatsapp/baileysMessage.processor.ts:48`

```typescript
onDestroy() {
  this.subscription?.unsubscribe();
  this.messageSubject.complete();
}
```

Unsubscribing and completing the Subject may cause issues if processMessage is called after onDestroy. Calling processMessage after onDestroy will throw an error due to the completed Subject. Please guard processMessage or document its usage constraints.
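The guard this comment asks for can be shown without RxJS at all; the flag and class below are a hypothetical sketch, not the processor's actual code:

```typescript
// Once destroyed, messages are rejected up front instead of being
// emitted into a completed (or unsubscribed) Subject.
class GuardedProcessor {
  private destroyed = false;
  readonly accepted: string[] = [];

  processMessage(message: string): boolean {
    if (this.destroyed) {
      // In the real processor this branch would log a warning and drop the message.
      return false;
    }
    this.accepted.push(message);
    return true;
  }

  onDestroy(): void {
    this.destroyed = true;
  }
}
```

Returning a boolean makes the drop observable to callers; logging inside the guard is the alternative the review suggests.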
Please resolve the flagged conflicts!
This implementation introduces a reactive queue for processing incoming Baileys messages using RxJS. The goal is to ensure that message batches are processed sequentially, avoiding concurrency issues and race conditions that could occur under high message volume.

By leveraging a `Subject` combined with `concatMap`, we ensure that each batch is only processed after the previous one completes. This is critical in scenarios where `onMessageReceive` performs asynchronous operations that must not run in parallel (e.g., database writes, external API calls). Error handling is also included to prevent a single failure from breaking the entire message stream.

Overall, this approach improves the reliability, safety, and predictability of message processing, especially under heavy load.
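The two guarantees described above, sequential execution (what `concatMap` provides) and error isolation (what per-item error handling provides), can be illustrated with a dependency-free promise chain; this sketch is an analogy, not the PR's implementation:

```typescript
// Each task starts only after the previous one settles; a rejected task
// is caught so one failure cannot break the rest of the chain.
class SequentialQueue {
  private tail: Promise<void> = Promise.resolve();

  enqueue(task: () => Promise<void>): Promise<void> {
    this.tail = this.tail.then(task).catch(() => undefined);
    return this.tail;
  }
}
```

A slow first task still finishes before a fast later one starts, and a task that throws in the middle does not prevent subsequent tasks from running.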
Summary by Sourcery

Introduce BaileysMessageProcessor leveraging RxJS to asynchronously batch and process incoming WhatsApp messages, and integrate it into BaileysStartupService for message handling and cleanup.

New Features:
- Add BaileysMessageProcessor, which uses an RxJS pipeline to process incoming message batches.

Enhancements:
- Integrate the processor into BaileysStartupService for decoupled message handling and cleanup.

Build:
- Add the rxjs dependency.