Conversation

@satishrudderstack

@satishrudderstack satishrudderstack commented Apr 28, 2025

We are evaluating Piscina workers for User Transformations (the /customTransform path). This PR makes the following changes:

  • Added Piscina-related code in a separate service/folder. It exposes 3 functions:
    1. initialize() - Initializes the single instance of Piscina on Server startup
    2. terminate() - Cleans up the instance of Piscina on Server shutdown
    3. transform() - Actual transformation via Piscina worker
  • The code is written in TypeScript, but Piscina cannot load TypeScript files directly as workers. We therefore pass the transpiled .js filename to Piscina in production and dev; in tests we load the .ts file instead, because tests run directly against the TypeScript sources.
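The file-extension switch described in the last bullet can be sketched as follows. This is a minimal illustration, not the PR's code: the helper name `workerOptions` and its parameters are hypothetical, while the `transform` base name and the `ts-node/register` flag follow the snippet quoted later in this review.

```typescript
import * as path from 'node:path';

// Hypothetical helper: picks the worker entry file and loader flags.
// Worker threads execute plain JavaScript, so the transpiled .js file is
// used outside tests; under test the .ts source is loaded through ts-node.
function workerOptions(
  baseDir: string,
  isTest: boolean,
): { filename: string; execArgv?: string[] } {
  return {
    filename: path.resolve(baseDir, `transform${isTest ? '.ts' : '.js'}`),
    execArgv: isTest ? ['-r', 'ts-node/register'] : undefined,
  };
}
```

The returned object is shaped so that it could be spread into the Piscina constructor options alongside the pool settings.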

What is the related Linear task?

Resolves DAW-1090

Please explain the objectives of your changes below

Put down any required details on the broader aspect of your changes. If there are any dependent changes, mandatorily mention them here

Any changes to existing capabilities/behaviour, mention the reason & what are the changes ?

N/A

Any new dependencies introduced with this change?

N/A

Any new generic utility introduced or modified. Please explain the changes.

N/A

Any technical or performance related pointers to consider with the change?

N/A

@coderabbitai review


Developer checklist

  • My code follows the style guidelines of this project

  • No breaking changes are being introduced.

  • All related docs linked with the PR?

  • All changes manually tested?

  • Any documentation changes needed with this change?

  • Is the PR limited to 10 file changes?

  • Is the PR limited to one linear task?

  • Are relevant unit and component test-cases added in new readability format?

Reviewer checklist

  • Is the type of change in the PR title appropriate as per the changes?

  • Verified that there are no credentials or confidential data exposed with the changes.

@coderabbitai

coderabbitai bot commented Apr 28, 2025

Summary by CodeRabbit

  • New Features

    • Introduced support for asynchronous task processing using a worker pool, configurable via an environment variable.
    • Added a new Prometheus metric to monitor the worker pool queue size.
  • Chores

    • Integrated worker pool initialization and termination into the application lifecycle.
    • Added a test suite to ensure correct behavior of the worker pool management functions.

Summary by CodeRabbit

  • New Features

    • Introduced optional support for asynchronous event processing using a Piscina worker pool, enabled via an environment variable.
    • Added new background processing capabilities for user transformation events, improving scalability and performance when enabled.
  • Tests

    • Added comprehensive tests for the Piscina worker pool initialization, termination, and transformation task handling.
  • Chores

    • Implemented initialization and graceful shutdown routines for the worker pool during application startup and shutdown.
    • Updated process startup and shutdown routines to conditionally manage the worker pool based on configuration.

Walkthrough

This update introduces a Piscina-based worker pool for asynchronous user event transformation processing. The UserTransformController conditionally delegates transformation tasks to the Piscina worker pool when the USE_PISCINA environment variable is enabled. A new singleton PiscinaService manages the worker pool lifecycle and task delegation. Supporting changes include a worker handler module, integration into cluster startup and shutdown sequences, a Prometheus metric for queue size, and a comprehensive test suite for the Piscina wrapper. Public interfaces remain unchanged.

Changes

| File(s) | Change Summary |
| --- | --- |
| src/controllers/userTransform.ts | Modified transform method to conditionally use transformWithPiscina based on USE_PISCINA; added import for transformWithPiscina. |
| src/services/piscina/wrapper.ts | Added PiscinaService singleton class managing Piscina worker pool with static methods for initialization, termination, and transformation; exported helper functions for external use. |
| src/services/piscina/transform.ts | Added new worker handler module exporting a default async function that delegates transformation to UserTransformService.transformRoutine. |
| src/services/piscina/__tests__/wrapper.test.ts | Added test suite covering Piscina wrapper functions: initialization, termination, and transformation delegation; includes mocks and environment isolation. |
| src/util/cluster.js | Integrated Piscina initialization and termination calls into cluster worker lifecycle based on USE_PISCINA environment variable. |
| src/util/prometheus.js | Added new Prometheus gauge metric piscina_queue_size to track the current size of the Piscina worker queue. |
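
As a rough sketch of how a queue-size gauge such as piscina_queue_size could be fed: the pool is assumed to expose a numeric `queueSize` property (Piscina documents one) and the gauge a `set(value)` method (as a prom-client Gauge does). The interfaces and the `reportQueueSize` helper below are hypothetical stand-ins so the example stays self-contained; the PR's actual wiring is not shown on this page.

```typescript
// Minimal structural types standing in for the pool and the gauge.
interface QueueSizeSource {
  readonly queueSize: number;
}

interface GaugeLike {
  set(value: number): void;
}

// Hypothetical helper: copies the pool's current queue length into the gauge
// and returns the sampled value.
function reportQueueSize(pool: QueueSizeSource, gauge: GaugeLike): number {
  const size = pool.queueSize;
  gauge.set(size);
  return size;
}
```

Sampling could happen on each request or on a timer; which one fits depends on how the existing metrics in src/util/prometheus.js are collected.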

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant UserTransformController
    participant PiscinaService
    participant PiscinaWorker
    participant UserTransformService

    Client->>UserTransformController: POST /transform (events, features, requestSize)
    alt USE_PISCINA = true
        UserTransformController->>PiscinaService: transform(events, features, requestSize)
        PiscinaService->>PiscinaWorker: run({events, features, requestSize})
        PiscinaWorker->>UserTransformService: transformRoutine(events, features, requestSize)
        UserTransformService-->>PiscinaWorker: UserTransformationServiceResponse
        PiscinaWorker-->>PiscinaService: UserTransformationServiceResponse
        PiscinaService-->>UserTransformController: UserTransformationServiceResponse
    else USE_PISCINA = false
        UserTransformController->>UserTransformService: transformRoutine(events, features, requestSize)
        UserTransformService-->>UserTransformController: UserTransformationServiceResponse
    end
    UserTransformController-->>Client: Response
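
The branch in the diagram above boils down to choosing between two functions with the same signature. A minimal sketch, assuming both paths take (events, features, requestSize); the `Transform` type and `selectTransform` helper are hypothetical names, not part of the PR:

```typescript
// Shared shape of the pooled and the direct transformation call.
type Transform<E, R> = (
  events: E[],
  features: Record<string, unknown>,
  requestSize: number,
) => Promise<R>;

// Hypothetical helper: returns the pooled path only when the flag is exactly 'true'.
function selectTransform<E, R>(
  env: Record<string, string | undefined>,
  viaPool: Transform<E, R>,
  direct: Transform<E, R>,
): Transform<E, R> {
  return env.USE_PISCINA === 'true' ? viaPool : direct;
}
```

Because the comparison is against the literal string 'true', values such as 'TRUE' or '1' fall through to the direct path, which matches the strict check quoted in the review threads.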

Suggestions for Improvement

  • Enhance operational visibility by adding more detailed metrics on worker pool utilization, task latency, and error rates.
  • Implement robust error handling and fallback strategies for Piscina task failures, including retries or circuit breakers.
  • Validate environment variable configurations at application startup to prevent misconfiguration issues.
  • Document the new environment variables (USE_PISCINA and related Piscina pool settings) clearly in deployment and developer guides.
  • Expand integration tests to cover end-to-end scenarios with Piscina enabled and disabled, ensuring consistent behavior.
  • Consider adding dynamic scaling or configuration reload capabilities for the Piscina pool to adapt to workload changes without restarts.
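
One possible shape for the fallback strategy mentioned in the list above, as a sketch only: the `withFallback` helper is hypothetical, and whether re-running a failed task on the main thread is acceptable depends on the transformation being safe to repeat.

```typescript
// Hypothetical helper: try the primary path (for example the worker pool) and,
// if it rejects, report the error and run the fallback path instead.
async function withFallback<T>(
  primary: () => Promise<T>,
  fallback: () => Promise<T>,
  onError: (error: unknown) => void = () => {},
): Promise<T> {
  try {
    return await primary();
  } catch (error) {
    onError(error);
    return fallback();
  }
}
```

A retry or circuit breaker would wrap `primary` in the same way; this sketch deliberately covers only the single-fallback case.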

@devops-github-rudderstack

Allure Test reports for this run are available at:

@codecov

codecov bot commented Apr 28, 2025

Codecov Report

❌ Patch coverage is 84.72222% with 11 lines in your changes missing coverage. Please review.
✅ Project coverage is 91.11%. Comparing base (79cebfc) to head (d6f45e0).
⚠️ Report is 642 commits behind head on develop.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/controllers/userTransform.ts | 14.28% | 6 Missing ⚠️ |
| src/services/piscina/wrapper.ts | 92.30% | 5 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff             @@
##           develop    #4282      +/-   ##
===========================================
- Coverage    91.21%   91.11%   -0.11%     
===========================================
  Files          632      637       +5     
  Lines        33053    33447     +394     
  Branches      7827     7905      +78     
===========================================
+ Hits         30150    30475     +325     
- Misses        2658     2733      +75     
+ Partials       245      239       -6     


@satishrudderstack satishrudderstack marked this pull request as ready for review April 30, 2025 10:20
@satishrudderstack satishrudderstack requested review from a team and sivashanmukh as code owners April 30, 2025 10:20
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (4)
src/services/piscina/__tests__/wrapper.test.ts (1)

16-26: Before hook cleanup can be improved for performance.

The test setup properly clears mocks and resets environment variables between tests. However, using delete on environment variables can impact performance.

Consider using undefined assignment instead of delete operator for better performance:

  beforeEach(() => {
    jest.clearAllMocks();
    (Piscina as unknown as jest.Mock).mockImplementation(() => mockPiscina);
    // Reset environment variables
    process.env.USE_PISCINA = 'true';
-   delete process.env.PISCINA_MAX_THREADS;
-   delete process.env.PISCINA_MIN_THREADS;
-   delete process.env.PISCINA_IDLE_TIMEOUT;
-   delete process.env.PISCINA_MAX_QUEUE;
-   delete process.env.PISCINA_CONCURRENT_TASKS_PER_WORKER;
+   process.env.PISCINA_MAX_THREADS = undefined;
+   process.env.PISCINA_MIN_THREADS = undefined;
+   process.env.PISCINA_IDLE_TIMEOUT = undefined;
+   process.env.PISCINA_MAX_QUEUE = undefined;
+   process.env.PISCINA_CONCURRENT_TASKS_PER_WORKER = undefined;
  });
🧰 Tools
🪛 Biome (1.9.4)

[error] 21-21: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)


[error] 22-22: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)


[error] 23-23: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)


[error] 24-24: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)


[error] 25-25: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)

src/services/piscina/wrapper.ts (3)

17-29: Explicitly declare the constructor as private

The singleton pattern implementation should have an explicitly declared private constructor to prevent direct instantiation outside the class.

class PiscinaService {
  private static instance: PiscinaService;

  private piscina: Piscina | null = null;

  private isInitialized = false;

+ private constructor() {}

  private static getInstance(): PiscinaService {
    if (!PiscinaService.instance) {
      PiscinaService.instance = new PiscinaService();
    }
    return PiscinaService.instance;
  }

31-54: Add validation for configuration values

The numerical configuration values from environment variables should be validated to ensure they're within acceptable ranges for Piscina.

private getPiscinaConfig(): PiscinaConfig {
  const config: PiscinaConfig = {};

  if (process.env.PISCINA_MAX_THREADS) {
-   config.maxThreads = parseInt(process.env.PISCINA_MAX_THREADS, 10);
+   const maxThreads = parseInt(process.env.PISCINA_MAX_THREADS, 10);
+   if (maxThreads > 0) {
+     config.maxThreads = maxThreads;
+   } else {
+     logger.warn(`Invalid PISCINA_MAX_THREADS value: ${process.env.PISCINA_MAX_THREADS}, must be > 0`);
+   }
  }
  // Apply similar validation to other numerical configuration parameters
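
The "similar validation" for the remaining settings could share one helper. The sketch below is an assumption about how that might look, not code from the PR: `readPositiveInt` is a hypothetical name, and it rejects zero, which may be too strict for settings where zero is meaningful.

```typescript
// Hypothetical helper: returns a positive integer from an environment map,
// or undefined (after warning) when the value is missing or invalid.
function readPositiveInt(
  env: Record<string, string | undefined>,
  name: string,
  warn: (message: string) => void = console.warn,
): number | undefined {
  const raw = env[name];
  if (raw === undefined || raw === '') {
    return undefined; // not configured: let the pool use its default
  }
  // Require digits only, so inputs like '4abc' are rejected instead of
  // being silently truncated to 4 by parseInt.
  if (!/^\d+$/.test(raw) || Number(raw) <= 0) {
    warn(`Invalid ${name} value: ${raw}, must be > 0`);
    return undefined;
  }
  return Number(raw);
}
```

Usage would be along the lines of `config.maxThreads = readPositiveInt(process.env, 'PISCINA_MAX_THREADS')`, skipping the assignment when the result is undefined.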

94-101: Add explicit typing for Piscina's run method parameter

Creating an interface for the Piscina worker's input parameters would improve type safety and documentation.

+ interface PiscinaWorkerInput {
+   events: ProcessorTransformationRequest[];
+   features: FeatureFlags;
+   requestSize: number;
+ }

public static async transform(
  events: ProcessorTransformationRequest[],
  features: FeatureFlags,
  requestSize: number,
): Promise<UserTransformationServiceResponse> {
  const service = PiscinaService.getInstance();
- return service.getPiscinaInstance().run({ events, features, requestSize });
+ return service.getPiscinaInstance().run<UserTransformationServiceResponse, PiscinaWorkerInput>({ 
+   events, 
+   features, 
+   requestSize 
+ });
}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c58ff4a and 5259e05.

⛔ Files ignored due to path filters (2)
  • package-lock.json is excluded by !**/package-lock.json, !**/*.json
  • package.json is excluded by !**/*.json
📒 Files selected for processing (5)
  • src/controllers/userTransform.ts (2 hunks)
  • src/services/piscina/__tests__/wrapper.test.ts (1 hunks)
  • src/services/piscina/transform.ts (1 hunks)
  • src/services/piscina/wrapper.ts (1 hunks)
  • src/util/cluster.js (3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.js`: Focus on ESLint errors (max 3) and warnings (max 5).

  • src/util/cluster.js
🧬 Code Graph Analysis (3)
src/util/cluster.js (4)
src/util/worker.js (3)
  • require (2-2)
  • require (3-3)
  • require (4-4)
src/util/metricsAggregator.js (2)
  • require (4-4)
  • logger (3-3)
src/services/piscina/wrapper.ts (2)
  • terminatePiscina (105-105)
  • initializePiscina (104-104)
src/routes/metricsRouter.js (1)
  • logger (2-2)
src/services/piscina/__tests__/wrapper.test.ts (2)
src/services/piscina/wrapper.ts (3)
  • terminatePiscina (105-105)
  • initializePiscina (104-104)
  • transformWithPiscina (106-106)
src/types/destinationTransformation.ts (1)
  • ProcessorTransformationRequest (18-26)
src/services/piscina/wrapper.ts (3)
src/util/cluster.js (1)
  • logger (3-3)
src/types/destinationTransformation.ts (1)
  • ProcessorTransformationRequest (18-26)
src/types/userTransformation.ts (1)
  • UserTransformationServiceResponse (20-23)
🪛 GitHub Check: codecov/patch
src/controllers/userTransform.ts

[warning] 43-45: src/controllers/userTransform.ts#L43-L45
Added lines #L43 - L45 were not covered by tests


[warning] 52-53: src/controllers/userTransform.ts#L52-L53
Added lines #L52 - L53 were not covered by tests

🔇 Additional comments (12)
src/util/cluster.js (3)

7-7: The import statement for Piscina wrapper is correctly added.

Clean import of the Piscina functions needed for worker initialization and termination.


17-19: Good error handling in Piscina termination.

The error handling for terminating Piscina is appropriate, catching and logging errors without breaking the shutdown process.


94-94: Piscina initialization in worker process is correct.

The initialization of Piscina is appropriately placed in the worker process branch, not the master process.

src/controllers/userTransform.ts (2)

13-13: Import for Piscina transformation function looks good.

Clean import of the required Piscina transformation function.


52-53: Response handling is consistent with both implementation paths.

The code properly handles the transformation response regardless of whether Piscina or direct transformation was used. This ensures consistent API behavior.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 52-53: src/controllers/userTransform.ts#L52-L53
Added lines #L52 - L53 were not covered by tests

src/services/piscina/transform.ts (2)

1-4: Imports are properly structured.

Necessary imports for the worker function are correctly included.


6-16: Worker function implementation is clean and well-typed.

The worker function correctly delegates to the existing UserTransformService.transformRoutine. The TypeScript type definitions are thorough, ensuring type safety for the worker inputs and output.

src/services/piscina/__tests__/wrapper.test.ts (4)

1-9: Test setup with proper mocks.

Imports and mocks are correctly set up for the test suite.


34-69: Comprehensive tests for initialization logic.

The initialization tests cover all expected scenarios: enabling/disabling via environment variables and configuring through environment variables. The tests verify both the creation of the Piscina instance and logging.


72-85: Termination tests handle both initialization states.

The tests properly verify that termination functions correctly both when Piscina is initialized and when it isn't. Good error handling check.


87-140: Transform tests cover all expected scenarios.

The transform tests thoroughly verify that the Piscina worker is correctly invoked with the expected parameters and that an error is thrown when Piscina isn't initialized. The mock events are well-structured with all required properties.

src/services/piscina/wrapper.ts (1)

1-8: Add improved error handling for import failures

❓ Verification inconclusive

Consider adding try/catch around the Piscina import or having a fallback mechanism in case the module fails to load.


🏁 Script executed:

#!/bin/bash
# Check if Piscina is listed as a dependency in package.json
grep -A 5 "\"dependencies\"" package.json | grep "piscina"

# Check if there's a fallback mechanism elsewhere in the codebase
grep -r "catch.*Piscina" --include="*.ts" --include="*.js" .

Length of output: 165


🏁 Script executed:

#!/bin/bash
set -e

echo "1. Locating package.json files (excluding node_modules)..."
find . -type f -name "package.json" -not -path "*/node_modules/*"

echo
echo "2. Checking for Piscina in dependencies..."
for pkg in $(find . -type f -name "package.json" -not -path "*/node_modules/*"); do
  echo "$pkg"
  grep -R "\"piscina\"" -n "$pkg" || echo "  Piscina not found in dependencies of $pkg"
done

echo
echo "3. Scanning for import statements of Piscina in .ts/.js files..."
rg -n "import\s+.*Piscina" --glob "*.ts" --glob "*.js" || echo "No import statements for Piscina found."

echo
echo "4. Searching for try/catch blocks around Piscina usage..."
rg -n -C2 "Piscina" --glob "*.ts" --glob "*.js" | grep -E "try|catch" || echo "No try/catch around Piscina usage found."

Length of output: 1227


Verify Piscina dependency and import error handling

  • Ensure “piscina” is listed in your project’s package.json so that
    import Piscina from 'piscina' won’t fail at runtime.
  • Static ES imports cannot be wrapped in try/catch.
    If you need to guard against a missing or incompatible Piscina module, switch to a dynamic load inside a try/catch:
    let Piscina;
    try {
      Piscina = require('piscina');
    } catch (error) {
      logger.error('Failed to load Piscina module:', error);
      // fallback logic or rethrow
    }
  • If Piscina is guaranteed as a mandatory dependency, no additional import‐level error handling is required—rely on the runtime to surface any startup issues.

Please verify your dependency setup and adjust the import pattern if a fallback is needed.

Comment on lines +41 to +50
let processedResponse: UserTransformationServiceResponse;
if (process.env.USE_PISCINA === 'true') {
  processedResponse = await transformWithPiscina(events, ctx.state.features, requestSize);
} else {
  processedResponse = await UserTransformService.transformRoutine(
    events,
    ctx.state.features,
    requestSize,
  );
}

💡 Verification agent

❓ Verification inconclusive

Conditional Piscina usage is properly implemented.

The code correctly implements conditional usage of the Piscina worker pool based on the USE_PISCINA environment variable. However, the static analysis indicates these lines aren't covered by tests.


🏁 Script executed:

#!/bin/bash
# Check if there are any tests covering the conditional Piscina usage
rg -A 2 "USE_PISCINA" src/**/__tests__/*.test.ts

Length of output: 456


Add unit tests for the USE_PISCINA conditional branch

The if (process.env.USE_PISCINA === 'true') { … } else { … } block in
src/controllers/userTransform.ts (lines 41–50) isn’t exercised by any existing tests. Please add tests to cover both paths:

  • When USE_PISCINA='true', mock/set process.env.USE_PISCINA and assert that transformWithPiscina is invoked.
  • When USE_PISCINA is unset or any other value, assert that UserTransformService.transformRoutine is called.

Suggested test location:
src/controllers/__tests__/userTransform.test.ts
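
Tests for both branches need USE_PISCINA set for one case and absent for the other, without leaking state between tests. A small helper along these lines could do that; `withEnv` is a hypothetical name and this is a sketch rather than the project's test utility.

```typescript
// Hypothetical test helper: applies environment overrides for the duration of
// fn, then restores the previous values. An override of undefined removes the
// variable, so "unset" can be tested as well as specific values.
async function withEnv<T>(
  overrides: Record<string, string | undefined>,
  fn: () => Promise<T> | T,
): Promise<T> {
  const saved: Record<string, string | undefined> = {};
  for (const name of Object.keys(overrides)) {
    saved[name] = process.env[name];
    const value = overrides[name];
    if (value === undefined) delete process.env[name];
    else process.env[name] = value;
  }
  try {
    return await fn();
  } finally {
    for (const name of Object.keys(saved)) {
      const value = saved[name];
      if (value === undefined) delete process.env[name];
      else process.env[name] = value;
    }
  }
}
```

Inside a test, the controller call would go in `fn`, followed by an assertion that the expected mock (transformWithPiscina or UserTransformService.transformRoutine) was invoked.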

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 43-45: src/controllers/userTransform.ts#L43-L45
Added lines #L43 - L45 were not covered by tests

Comment on lines 63 to 81
public static initialize(): void {
  const service = PiscinaService.getInstance();

  if (process.env.USE_PISCINA !== 'true') {
    logger.info('Piscina is disabled via USE_PISCINA environment variable');
    return;
  }

  if (!service.isInitialized) {
    const config = service.getPiscinaConfig();
    service.piscina = new Piscina({
      filename: path.resolve(__dirname, `transform${isTest ? '.ts' : '.js'}`),
      execArgv: isTest ? ['-r', 'ts-node/register'] : undefined,
      ...config,
    });
    service.isInitialized = true;
    logger.info('Piscina worker pool initialized');
  }
}

🛠️ Refactor suggestion

Add error handling during initialization

The initialization process should include try/catch to handle potential errors during Piscina instance creation.

public static initialize(): void {
  const service = PiscinaService.getInstance();

  if (process.env.USE_PISCINA !== 'true') {
    logger.info('Piscina is disabled via USE_PISCINA environment variable');
    return;
  }

  if (!service.isInitialized) {
    const config = service.getPiscinaConfig();
+   try {
      service.piscina = new Piscina({
        filename: path.resolve(__dirname, `transform${isTest ? '.ts' : '.js'}`),
        execArgv: isTest ? ['-r', 'ts-node/register'] : undefined,
        ...config,
      });
      service.isInitialized = true;
      logger.info('Piscina worker pool initialized');
+   } catch (error) {
+     logger.error(`Failed to initialize Piscina worker pool: ${error.message}`);
+   }
  }
}

ControllerUtility.postProcess(ctx, processedRespone.retryStatus);

let processedResponse: UserTransformationServiceResponse;
if (process.env.USE_PISCINA === 'true') {

[question] @abhimanyubabbar @satishrudderstack does it make sense to just randomly access environment variables throughout the code? Maybe this belongs in the context? I'm not familiar with this codebase but if there is already a pattern in place for this it might make sense to follow it unless it makes things too difficult.
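
One way to act on this question, sketched under the assumption that no such pattern exists yet in the codebase: read the flag once at startup and hand the resulting object to whoever needs it (for example via the request context, as suggested). `PiscinaSettings` and `loadPiscinaSettings` are hypothetical names.

```typescript
interface PiscinaSettings {
  readonly enabled: boolean;
}

// Hypothetical helper: snapshot the flag from an environment map. Callers
// consult the returned object instead of reading process.env directly.
function loadPiscinaSettings(env: Record<string, string | undefined>): PiscinaSettings {
  return Object.freeze({ enabled: env.USE_PISCINA === 'true' });
}
```

A side effect of snapshotting is that later changes to the environment are ignored, which is usually desirable in production but means tests must build their own settings object.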

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/services/piscina/__tests__/wrapper.test.ts (1)

111-131: Consider adding error handling test cases

While the current tests cover successful transformation and initialization errors, they don't verify behavior when Piscina's run method fails.

Consider adding this test to verify error handling:

it('should propagate errors from Piscina run', async () => {
  const mockError = new Error('Worker error');
  mockPiscina.run.mockRejectedValue(mockError);

  initializePiscina();
  await expect(transformWithPiscina(mockEvents, mockFeatures, mockRequestSize))
    .rejects.toThrow('Worker error');
});
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5259e05 and b6b80b6.

⛔ Files ignored due to path filters (2)
  • package-lock.json is excluded by !**/package-lock.json, !**/*.json
  • package.json is excluded by !**/*.json
📒 Files selected for processing (3)
  • src/services/piscina/__tests__/wrapper.test.ts (1 hunks)
  • src/services/piscina/wrapper.ts (1 hunks)
  • src/util/cluster.js (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/util/cluster.js
  • src/services/piscina/wrapper.ts
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/services/piscina/__tests__/wrapper.test.ts (3)
src/services/piscina/wrapper.ts (3)
  • terminatePiscina (100-100)
  • initializePiscina (99-99)
  • transformWithPiscina (101-101)
src/util/cluster.js (1)
  • logger (3-3)
src/types/destinationTransformation.ts (1)
  • ProcessorTransformationRequest (18-26)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
🔇 Additional comments (4)
src/services/piscina/__tests__/wrapper.test.ts (4)

1-9: Good use of mocking for unit tests

The imports and mock setup are well-structured, correctly isolating the test environment from actual dependencies. Mocking both Piscina and the logger ensures tests don't have external dependencies.


10-33: Test setup handles environment state appropriately

The test setup and teardown process is well-implemented with proper mock clearing and environment variable management. The afterEach handler correctly ensures that the Piscina instance is terminated after each test.

Regarding the static analysis warnings about using delete on environment variables:

  • In test contexts, using delete is semantically correct to simulate environment variables being absent
  • Setting to undefined would not create the same behavior for testing purposes
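
The difference between the two approaches can be checked directly. The snippet below is a minimal sketch, not code from the PR: `PISCINA_DEMO_FLAG` is a made-up variable name, and the behavior shown relies on Node.js converting any value assigned to `process.env` into a string.

```typescript
// Hypothetical variable name, used only for this demonstration.
const KEY = 'PISCINA_DEMO_FLAG';

process.env[KEY] = 'true';

// Assigning undefined does not unset the variable: Node.js coerces the
// value to the string 'undefined', so the key is still present.
process.env[KEY] = undefined;
const afterAssign = { value: process.env[KEY], present: KEY in process.env };

// delete removes the key, which is what "variable not set" means.
delete process.env[KEY];
const afterDelete = { value: process.env[KEY], present: KEY in process.env };

console.log(afterAssign); // value is the string 'undefined', present is true
console.log(afterDelete); // value is undefined, present is false
```

A check such as `if (process.env.PISCINA_MAX_THREADS)` would therefore treat the string 'undefined' as a configured value, which is why `delete` is the safer choice when a test needs the variable to be absent.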


63-76: Termination tests are complete

The tests appropriately verify successful termination of initialized instances and safe handling when no instance exists. Good coverage of the terminate functionality.


78-110: Well-structured mock data for transformation tests

The mock data for the transformation tests is comprehensive and follows the expected structure for ProcessorTransformationRequest. This ensures the tests accurately represent real usage scenarios.

Comment on lines 34 to 61
describe('initialize', () => {
it('should initialize Piscina', () => {
process.env.USE_PISCINA = 'true';
initializePiscina();
expect(Piscina).toHaveBeenCalled();
expect(logger.info).toHaveBeenCalledWith('Piscina worker pool initialized');
});

it('should use environment variables for configuration', () => {
process.env.PISCINA_MAX_THREADS = '4';
process.env.PISCINA_MIN_THREADS = '2';
process.env.PISCINA_IDLE_TIMEOUT = '5000';
process.env.PISCINA_MAX_QUEUE = '100';
process.env.PISCINA_CONCURRENT_TASKS_PER_WORKER = '2';

initializePiscina();

expect(Piscina).toHaveBeenCalledWith(
expect.objectContaining({
maxThreads: 4,
minThreads: 2,
idleTimeout: 5000,
maxQueue: 100,
concurrentTasksPerWorker: 2,
}),
);
});
});

🛠️ Refactor suggestion

Add test for when Piscina is disabled

The initialization tests verify proper initialization and configuration, but there's no test verifying the behavior when Piscina is disabled.

Consider adding this test:

it('should not initialize Piscina when USE_PISCINA is not true', () => {
  process.env.USE_PISCINA = 'false';
  initializePiscina();
  expect(Piscina).not.toHaveBeenCalled();

  // Unset the variable with delete: assigning undefined would store the string 'undefined'.
  delete process.env.USE_PISCINA;
  initializePiscina();
  expect(Piscina).not.toHaveBeenCalled();
});

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/services/piscina/__tests__/wrapper.test.ts (1)

32-58: Add test for when Piscina is disabled

The initialization tests verify proper initialization and configuration, but there's no test verifying the behavior when Piscina is disabled.

Consider adding this test:

it('should not initialize Piscina when USE_PISCINA is not true', () => {
  process.env.USE_PISCINA = 'false';
  initializePiscina();
  expect(Piscina).not.toHaveBeenCalled();

  // Unset the variable with delete: assigning undefined would store the string 'undefined'.
  delete process.env.USE_PISCINA;
  initializePiscina();
  expect(Piscina).not.toHaveBeenCalled();
});
🧹 Nitpick comments (2)
src/services/piscina/__tests__/wrapper.test.ts (2)

16-24: Consider using undefined assignment instead of delete for environment variables

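Biome's noDelete rule flags the delete operator on process.env properties for its potential performance cost. That cost is negligible in test code, and the replacement changes behavior: Node.js converts any value assigned to process.env to a string, so assigning undefined stores the string 'undefined' instead of unsetting the variable. Apply the change below only if the code under test treats that string the same as an absent variable.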

- delete process.env.PISCINA_MAX_THREADS;
- delete process.env.PISCINA_MIN_THREADS;
- delete process.env.PISCINA_IDLE_TIMEOUT;
- delete process.env.PISCINA_MAX_QUEUE;
- delete process.env.PISCINA_CONCURRENT_TASKS_PER_WORKER;
+ process.env.PISCINA_MAX_THREADS = undefined;
+ process.env.PISCINA_MIN_THREADS = undefined;
+ process.env.PISCINA_IDLE_TIMEOUT = undefined;
+ process.env.PISCINA_MAX_QUEUE = undefined;
+ process.env.PISCINA_CONCURRENT_TASKS_PER_WORKER = undefined;
🧰 Tools
🪛 Biome (1.9.4)

[error] 19-19: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)


[error] 20-20: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)


[error] 21-21: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)


[error] 22-22: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)


[error] 23-23: Avoid the delete operator which can impact performance.

Unsafe fix: Use an undefined assignment instead.

(lint/performance/noDelete)


39-57: Consider testing with invalid environment variable values

The test verifies that environment variables are correctly used for configuration, but doesn't test the handling of invalid values (non-numeric strings, negative values, etc.).

Consider adding cases to test robustness against invalid input:

it('should handle invalid environment variable values', () => {
  process.env.PISCINA_MAX_THREADS = 'not-a-number';
  process.env.PISCINA_MIN_THREADS = '-1';
  
  initializePiscina();
  
  // Verify fallback to defaults or handling of invalid values
  expect(Piscina).toHaveBeenCalledWith(
    expect.objectContaining({
      // Expected fallback values
    }),
  );
});
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b6b80b6 and a44cf28.

📒 Files selected for processing (1)
  • src/services/piscina/__tests__/wrapper.test.ts (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/services/piscina/__tests__/wrapper.test.ts (3)
src/services/piscina/wrapper.ts (3)
  • terminatePiscina (100-100)
  • initializePiscina (99-99)
  • transformWithPiscina (101-101)
src/util/cluster.js (1)
  • logger (3-3)
src/types/destinationTransformation.ts (1)
  • ProcessorTransformationRequest (18-26)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Build Transformer Docker Image - PR / Build Transformer Docker Image AMD64
  • GitHub Check: Build User Transformer Docker Image - PR / Build Transformer Docker Image ARM64
  • GitHub Check: Build User Transformer Docker Image - PR / Build Transformer Docker Image AMD64
  • GitHub Check: Code Coverage
  • GitHub Check: UT Tests
  • GitHub Check: test_and_publish
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
🔇 Additional comments (3)
src/services/piscina/__tests__/wrapper.test.ts (3)

74-105: LGTM! Comprehensive test data setup

The mock data is well-structured and provides a good test case for the transformation functionality. The test data follows the expected type structure and includes all relevant fields.


107-120: LGTM! Thorough verification of transform behavior

The test properly verifies that the Piscina worker is called with the correct parameters and that the result is correctly returned.


122-126: LGTM! Proper error handling test

Good job testing the error case when transformation is attempted without initialization. This verifies the defensive programming approach in the service.

@devops-github-rudderstack

Allure Test reports for this run are available at:

const mockFeatures = { enabled: true };
const mockRequestSize = 1;

it('should call Piscina run with correct parameters', async () => {

Can we have a more complete test where we instantiate the PiscinaService and run an actual transformation?

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/services/piscina/wrapper.ts (1)

66-82: ⚠️ Potential issue

Add USE_PISCINA environment check and error handling.

According to the PR description, initialization should be controlled by the USE_PISCINA environment variable. Also, there's no error handling for Piscina instantiation as suggested in a previous review.

public static initialize(): void {
  const service = PiscinaService.getInstance();

+  if (process.env.USE_PISCINA !== 'true') {
+    logger.info('Piscina is disabled via USE_PISCINA environment variable');
+    return;
+  }

  if (!service.isInitialized) {
    const config = service.getPiscinaConfig();
+    try {
      service.piscina = new Piscina({
        filename: path.resolve(__dirname, `transform${isTest ? '.ts' : '.js'}`),
        execArgv: isTest ? ['-r', 'ts-node/register'] : undefined,
        ...config,
      });
      service.isInitialized = true;
      logger.info('Piscina worker pool initialized');

      // Start collecting Piscina metrics
      service.startMetricsCollection();
+    } catch (error) {
+      logger.error(`Failed to initialize Piscina worker pool: ${error.message}`);
+    }
  }
}
🧹 Nitpick comments (2)
src/services/piscina/wrapper.ts (2)

34-57: Add input validation for environment variables.

The code parses environment variables without validation. Consider adding validation to ensure the parsed values are positive numbers and handle potential parsing errors.

private getPiscinaConfig(): PiscinaConfig {
  const config: PiscinaConfig = {};

  if (process.env.PISCINA_MAX_THREADS) {
-    config.maxThreads = parseInt(process.env.PISCINA_MAX_THREADS, 10);
+    const maxThreads = parseInt(process.env.PISCINA_MAX_THREADS, 10);
+    if (!isNaN(maxThreads) && maxThreads > 0) {
+      config.maxThreads = maxThreads;
+    } else {
+      logger.warn(`Invalid PISCINA_MAX_THREADS value: ${process.env.PISCINA_MAX_THREADS}, using default`);
+    }
  }
  
  // Similar validation for other configuration options
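
The per-variable validation above could be factored into one helper so every option is checked the same way. The following is a sketch under stated assumptions, not the PR's implementation: `parsePositiveInt` and `readPoolConfig` are hypothetical names, the environment variable and option names are taken from the tests in this PR, and it assumes every option must be a positive integer (whether zero should be allowed for options such as `idleTimeout` is a separate decision).

```typescript
// Hypothetical helper; not part of the PR.
// Returns the parsed value, or undefined when the input is missing or invalid.
function parsePositiveInt(raw: string | undefined): number | undefined {
  if (raw === undefined) return undefined;
  const trimmed = raw.trim();
  // Require digits only: parseInt alone would accept '4abc' as 4.
  if (!/^\d+$/.test(trimmed)) return undefined;
  const value = Number.parseInt(trimmed, 10);
  return value > 0 ? value : undefined;
}

// Environment variable names mapped to the pool option they configure.
const ENV_TO_OPTION: Record<string, string> = {
  PISCINA_MAX_THREADS: 'maxThreads',
  PISCINA_MIN_THREADS: 'minThreads',
  PISCINA_IDLE_TIMEOUT: 'idleTimeout',
  PISCINA_MAX_QUEUE: 'maxQueue',
  PISCINA_CONCURRENT_TASKS_PER_WORKER: 'concurrentTasksPerWorker',
};

// Builds a config object containing only the options with valid values,
// so invalid input falls back to the pool's own defaults.
function readPoolConfig(env: Record<string, string | undefined>): Record<string, number> {
  const config: Record<string, number> = {};
  for (const envName of Object.keys(ENV_TO_OPTION)) {
    const value = parsePositiveInt(env[envName]);
    if (value !== undefined) {
      config[ENV_TO_OPTION[envName]] = value;
    }
  }
  return config;
}

console.log(readPoolConfig({ PISCINA_MAX_THREADS: '4', PISCINA_MIN_THREADS: '-1' }));
```

Passing the environment in as an argument, rather than reading `process.env` inside the helper, keeps the function easy to test without mutating global state.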

122-130: Consider adding error handling for transformation failures.

The transform method could benefit from try-catch error handling to gracefully handle failures during worker execution.

public static async transform(
  events: ProcessorTransformationRequest[],
  features: FeatureFlags,
  requestSize: number,
): Promise<UserTransformationServiceResponse> {
  const service = PiscinaService.getInstance();
-  return service.getPiscinaInstance().run({ events, features, requestSize });
+  try {
+    return await service.getPiscinaInstance().run({ events, features, requestSize });
+  } catch (error) {
+    logger.error(`Error in Piscina worker transformation: ${error.message}`);
+    throw error;
+  }
}
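
One detail the suggestion above glosses over: when TypeScript types catch variables as `unknown`, reading `error.message` directly does not compile. The sketch below shows the same log-and-rethrow pattern with that narrowing added. It is illustrative only: `TaskRunner`, `errorMessage`, and `runLogged` are hypothetical names, and the pool is represented by a minimal interface rather than a real Piscina instance.

```typescript
// Minimal shape of a worker pool for this sketch; Piscina's run() is assumed
// to fit it, but nothing here imports Piscina.
interface TaskRunner<TInput, TOutput> {
  run(task: TInput): Promise<TOutput>;
}

// Catch variables may be typed as unknown, so narrow before reading .message.
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// Logs the failure, then rethrows so the caller still sees the rejection.
async function runLogged<TInput, TOutput>(
  runner: TaskRunner<TInput, TOutput>,
  task: TInput,
  log: (message: string) => void,
): Promise<TOutput> {
  try {
    return await runner.run(task);
  } catch (error) {
    log(`Error in Piscina worker transformation: ${errorMessage(error)}`);
    throw error;
  }
}
```

Rethrowing the original error, rather than wrapping it, keeps the behavior that the "should propagate errors from Piscina run" test expects.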
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between a44cf28 and d6f45e0.

📒 Files selected for processing (2)
  • src/services/piscina/wrapper.ts (1 hunks)
  • src/util/prometheus.js (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/util/prometheus.js
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/services/piscina/wrapper.ts (4)
src/util/cluster.js (1)
  • logger (3-3)
src/util/prometheus.js (1)
  • logger (2-2)
src/types/destinationTransformation.ts (1)
  • ProcessorTransformationRequest (18-26)
src/types/userTransformation.ts (1)
  • UserTransformationServiceResponse (20-23)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Build Transformer Docker Image - PR / Build Transformer Docker Image AMD64
  • GitHub Check: Build Transformer Docker Image - PR / Build Transformer Docker Image ARM64
  • GitHub Check: Build User Transformer Docker Image - PR / Build Transformer Docker Image AMD64
  • GitHub Check: Build User Transformer Docker Image - PR / Build Transformer Docker Image ARM64
  • GitHub Check: Code Coverage
  • GitHub Check: UT Tests
  • GitHub Check: test_and_publish
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
🔇 Additional comments (7)
src/services/piscina/wrapper.ts (7)

1-7: The imports look appropriate for the functionality.

The necessary dependencies are properly imported, including Piscina, path utilities, type definitions, feature flags, logging, and stats collection.


10-16: Well-structured type definition for PiscinaConfig.

The type definition clearly specifies all the configurable parameters for the Piscina worker pool, making the code more maintainable and type-safe.


18-32: Singleton pattern is correctly implemented.

The class properly implements the singleton pattern with a private static instance and a getInstance method, ensuring only one PiscinaService instance exists throughout the application lifecycle.


59-64: Good error handling for uninitialized Piscina instance.

The getPiscinaInstance method correctly checks if Piscina is initialized before returning the instance, preventing potential null reference errors.


84-104: Well-implemented metrics collection with proper error handling.

The metrics collection logic properly handles the case when there's no Piscina instance, clears existing intervals, and uses try-catch for error handling. The collection interval of 5 seconds seems reasonable.
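
For readers without the diff at hand, the pattern described here can be sketched roughly as follows. This is an assumption-laden illustration, not the code under review: `MetricsTicker` is a hypothetical class, the collection callback stands in for whatever stats the service reports, and only the 5-second default comes from the comment above.

```typescript
// Hypothetical class for illustration; the PR's PiscinaService is not shown here.
class MetricsTicker {
  private timer: ReturnType<typeof setInterval> | null = null;
  private readonly collect: () => void;
  private readonly intervalMs: number;

  constructor(collect: () => void, intervalMs = 5000) {
    this.collect = collect;
    this.intervalMs = intervalMs;
  }

  start(): void {
    // Clear any previous interval so repeated start() calls never stack timers.
    this.stop();
    this.timer = setInterval(() => {
      try {
        this.collect();
      } catch (error) {
        // An exception escaping a timer callback would be uncaught, so log it instead.
        console.error('Metrics collection failed:', error);
      }
    }, this.intervalMs);
  }

  stop(): void {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  isRunning(): boolean {
    return this.timer !== null;
  }
}
```

Calling `stop()` during termination matters as much as the error handling: a live interval keeps the Node.js event loop active and can prevent a clean shutdown.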


106-120: Comprehensive termination logic.

The terminate method correctly cleans up all resources: clears the metrics interval, destroys the Piscina instance, and resets state variables. The logging helps with observability.


132-134: Clean API exports.

Exporting named functions that delegate to the static methods provides a clean API for external modules to use while maintaining encapsulation of the singleton instance.

@devops-github-rudderstack

Allure Test reports for this run are available at:

@devops-github-rudderstack

This PR is considered stale. It has been open for 20 days with no further activity and will be closed in 7 days. To avoid this, remove the stale label manually or add a comment to the PR.
