-
-
Notifications
You must be signed in to change notification settings - Fork 4k
feat(workflows): Engine consolidation to provider architecture #14547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
|
|
The latest updates on your projects. Learn more about Vercel for GitHub. 8 Skipped Deployments
|
|
@claude review this be thourough please |
PR Review: Workflow Engine Provider Architecture Consolidation (#14547)OverviewThis PR consolidates the workflow engine from separate in-memory and Redis modules into a unified provider architecture. This is a significant architectural improvement that modernizes the workflow engine to follow Medusa's provider pattern. Key Changes:
Code Quality Assessment✅ Strengths
|
|
@claude review this with the same approach as the previous review please |
Code Review: Workflow Engine Consolidation to Provider ArchitectureOverviewThis is a significant architectural refactoring that consolidates the workflow engine into a unified module with pluggable storage providers. The PR successfully aligns the workflow engine with Medusa's standard module/provider pattern used by payments, notifications, and other modules. ✅ Strengths1. Architecture & Design
2. Code Quality
3. Error Handling
4. Test Coverage
5. Backward Compatibility
🔍 Issues & RecommendationsCritical Issues1. Memory Leak Risk in Subscriber Management (workflow-orchestrator.ts:673-733)Location: The subscribe method has a warning comment about memory leaks, but the implementation could be more defensive: subscribe({ workflowId, transactionId, subscriber, subscriberId }: SubscribeOptions) {
// Subscribe to distributed notifications when first subscriber is added
if (\!WorkflowOrchestratorService.subscribers.has(workflowId)) {
this.storage_.notificationSubscriber?.subscribe(
workflowId,
this.handleDistributedNotification.bind(this)
)
}Issue: The distributed notification subscriber is never unsubscribed even when the last local subscriber is removed from a workflow (though the unsubscribe at line 782 does handle this). However, there's no automatic cleanup for workflow-scoped subscribers. Recommendation: Consider adding an automatic cleanup mechanism or making the memory leak warning more prominent in the API documentation. 2. Redis Connection Error Handling (redis-workflows-storage.ts:373-429)Location: private async ensureRedisConnection(): Promise<void> {
// ... connection checking logic
if (reconnectTasks.length > 0) {
await promiseAll(reconnectTasks)
}
}Issue: This method is called in Recommendation: Consider adding:
3. Lock Timeout Configuration (redis-workflows-storage.ts:879-894)Location: async #acquireLock(key: string, ttlSeconds: number = 5): Promise<string | null>Issue: The hardcoded 5-second lock TTL might be insufficient for long-running DB operations, especially under heavy load. Recommendation: Make lock TTL configurable via provider options: lockTtl?: number // in seconds, default: 5High Priority Issues4. Race Condition in Timer Cleanup (local-workflows-storage.ts:180-198)Location: private createManagedTimer(callback: () => void | Promise<void>, delay: number): NodeJS.Timeout {
const timer = setTimeout(async () => {
this.pendingTimers.delete(timer)
// ...
}, delay)
this.pendingTimers.add(timer)
return timer
}Issue: There's a tiny race window where the timer could fire before being added to Recommendation: Add the timer to the set before creating the setTimeout: const timer = {} as NodeJS.Timeout
this.pendingTimers.add(timer as any)
const actualTimer = setTimeout(async () => { /* ... */ }, delay)
// Update referenceOr simply document that delay should never be 0. 5. Missing Validation in Provider Loader (providers.ts:71-77)Location: if (options.providers?.length > 1) {
throw new Error(`Workflows module: Multiple providers configured: ${options.providers.map((p) => p.id).join(", ")}`)
}Issue: The error message is good, but the code doesn't validate that providers actually implement the required interfaces. Recommendation: Add interface validation before loading. Medium Priority Issues6. Inconsistent Error Logging (workflow-orchestrator.ts:812-815)Location: Some errors are logged but not thrown, while others are thrown. This inconsistency could make debugging difficult. Recommendation: Establish consistent error handling patterns:
7. Potential Data Loss on Parallel DB/Redis Operations (redis-workflows-storage.ts:652-661)Location: if (hasFinished && \!retentionTime) {
if (\!data.flow.metadata?.parentStepIdempotencyKey) {
await promiseAll([this.deleteFromDb(data), execPipeline()])
}
}Issue: If Recommendation: Consider using a saga pattern or reversing the order (Redis first, then DB). 8. Missing Type Safety in Notification Handler (redis-workflows-storage.ts:174-189)Location: const data = JSON.parse(message)
handler(workflowId, data)Issue: No validation that Recommendation: Add runtime type validation using a schema validator (zod, joi, etc.). Low Priority / Suggestions9. Performance Optimization OpportunityLocation: The 10. Documentation Improvements
11. Type Safety Improvements
🔒 Security ReviewPassed ✅
Notes
🚀 Performance ConsiderationsOptimizations ✅
Concerns
|
carlos-r-l-rodrigues
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amazing. 🔥
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.
Summary
This PR consolidates the workflow engine into a unified provider-based architecture, aligning it with the pattern used by other Medusa modules (payments, notifications, etc.). The new architecture provides a single
@medusajs/workflowsmodule with pluggable storage providers, replacing the previous separateworkflow-engine-inmemoryandworkflow-engine-redismodules.Motivation
Changes
New Packages
@medusajs/workflows@medusajs/workflows-redisArchitecture
Before:
├── @medusajs/workflow-engine-inmemory (standalone module)
└── @medusajs/workflow-engine-redis (standalone module)
After:
├── @medusajs/workflows (unified module)
│ ├── WorkflowsModuleService (implements IWorkflowEngineService)
│ ├── WorkflowOrchestratorService (workflow execution)
│ └── LocalWorkflowsStorage (default in-memory provider)
└── @medusajs/workflows-redis (optional provider)
└── RedisWorkflowsStorage (distributed storage)
Key Features
Provider-based Storage
LocalWorkflowsStorage(in-memory, single-instance)RedisWorkflowsStorage(distributed, multi-instance)Distributed Notifications (new)
Shared Utilities
saveToDb(),deleteFromDb(),clearExpiredExecutions()preventRaceConditionExecutionIfNecessary()for concurrent execution handlingEnhanced Type Definitions
IWorkflowModuleOrchestratorServiceinterfaceDistributedNotificationSubscriberfor cross-instance communicationDistributedStorageHooksfor lifecycle managementConfiguration
Using Default (In-Memory) Storage
Note
Engine consolidation to provider architecture
@medusajs/workflowsmodule and new@medusajs/workflows-redisprovider (RedisWorkflowsStorage, loaders, config options) with pub/sub notifications and lifecycle hooksIWorkflowModuleOrchestratorService, storage hooks, notification subscriber, and scheduler/timeout/retry support@medusajs/medusa/workflowsandworkflows-redis, deprecate oldworkflow-engine-*, and map legacy package namesfilters_Written by Cursor Bugbot for commit 236b6d5. This will update automatically on new commits. Configure here.