[CSV Exporter] Stream Optimization for CSV Export with Queue and Concurrency Control #350

Open
Juiced66 opened this issue Jul 5, 2024 · 0 comments

@Juiced66
Contributor

Juiced66 commented Jul 5, 2024

Description:

Optimize stream handling for the CSV export with a queue and concurrency control.
Improve the efficiency of streaming large datasets to CSV by implementing a queue with a concurrency limit. This lets the server page through results and write the export incrementally, without overloading memory or other resources.

Tasks:

  1. Implement a queue mechanism to manage concurrent tasks.
  2. Refactor the sendExport method to process data in chunks using the queue.
  3. Ensure that the stream writes data incrementally to avoid memory overflow.
  4. Handle errors gracefully within the concurrency control setup.
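
For step 3, Node streams already signal when their internal buffer is full: `write()` returns `false`, and the producer should pause until `"drain"` fires. A minimal sketch of that pattern (the helper name and demo are illustrative, not from the codebase):

```typescript
import { PassThrough } from "stream";
import { once } from "events";

// Respect backpressure: write() returns false when the stream's internal
// buffer is full; wait for "drain" before producing the next chunk so the
// export never accumulates the whole dataset in memory.
async function writeWithBackpressure(stream: PassThrough, chunk: string): Promise<void> {
  if (!stream.write(chunk)) {
    await once(stream, "drain");
  }
}

// Tiny demo: a small highWaterMark forces frequent buffering, yet every
// chunk still reaches the consumer.
async function demo(): Promise<number> {
  const stream = new PassThrough({ highWaterMark: 64 });
  let received = 0;
  stream.on("data", (chunk: Buffer) => {
    received += chunk.length;
  });

  for (let i = 0; i < 100; i++) {
    await writeWithBackpressure(stream, "x".repeat(32));
  }
  stream.end();
  await once(stream, "end");
  return received; // 100 writes of 32 bytes
}
```
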

Example Code:

Implement Queue and Concurrency Control

Queue Implementation:

class ConcurrencyQueue {
  private concurrencyLimit: number;
  private activePromises: number;
  private queue: (() => Promise<void>)[];

  constructor(concurrencyLimit: number) {
    this.concurrencyLimit = concurrencyLimit;
    this.activePromises = 0;
    this.queue = [];
  }

  enqueue(task: () => Promise<void>) {
    this.queue.push(task);
    this.dequeue();
  }

  private dequeue() {
    if (this.activePromises >= this.concurrencyLimit || this.queue.length === 0) {
      return;
    }

    this.activePromises++;
    // The length check above guarantees shift() returns a task.
    const task = this.queue.shift()!;
    task().finally(() => {
      this.activePromises--;
      this.dequeue();
    });
  }

  // Polls until every queued and running task has finished.
  async waitForAll() {
    while (this.activePromises > 0 || this.queue.length > 0) {
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
  }
}
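
A quick way to sanity-check the queue's two guarantees, that concurrency never exceeds the limit and that `waitForAll()` only returns once everything has finished (the class is repeated in condensed form so the snippet runs standalone):

```typescript
// Condensed copy of the ConcurrencyQueue above, so this snippet is self-contained.
class ConcurrencyQueue {
  private activePromises = 0;
  private queue: (() => Promise<void>)[] = [];

  constructor(private concurrencyLimit: number) {}

  enqueue(task: () => Promise<void>): void {
    this.queue.push(task);
    this.dequeue();
  }

  private dequeue(): void {
    if (this.activePromises >= this.concurrencyLimit || this.queue.length === 0) {
      return;
    }
    this.activePromises++;
    const task = this.queue.shift()!;
    task().finally(() => {
      this.activePromises--;
      this.dequeue();
    });
  }

  async waitForAll(): Promise<void> {
    while (this.activePromises > 0 || this.queue.length > 0) {
      await new Promise((resolve) => setTimeout(resolve, 10));
    }
  }
}

// Six tasks with a limit of 2: observed concurrency must never exceed 2,
// and waitForAll() must not return before every task has completed.
async function demo(): Promise<{ maxRunning: number; completed: number }> {
  const queue = new ConcurrencyQueue(2);
  let running = 0;
  let maxRunning = 0;
  let completed = 0;

  for (let i = 0; i < 6; i++) {
    queue.enqueue(async () => {
      running++;
      maxRunning = Math.max(maxRunning, running);
      await new Promise((resolve) => setTimeout(resolve, 20));
      running--;
      completed++;
    });
  }

  await queue.waitForAll();
  return { maxRunning, completed };
}
```
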

Usage:

async sendExport(stream: PassThrough, exportId: string) {
  try {
    const { query, sort, model, target } = await this.getExport(exportId);

    const engine = await this.getEngine();
    const modelDocument = await ask<AskModelDeviceGet | AskModelAssetGet>(
      `ask:device-manager:model:${target}:get`,
      {
        engineGroup: engine.group,
        model,
      }
    );

    const measureColumns = await this.generateMeasureColumns(
      modelDocument[target].measures
    );

    const columns: Column[] = [
      { header: "Payload Id", path: "_id" },
      { header: "Measured At", path: "_source.measuredAt", formatter: (value) => formatDate(value, dateFormat, locale) },
      { header: "Measure Type", path: "_source.type" },
      { header: "Device Id", path: "_source.origin._id" },
      { header: "Device Model", path: "_source.origin.deviceModel" },
      { header: "Asset Id", path: "_source.asset._id" },
      { header: "Asset Model", path: "_source.asset.model" },
      ...measureColumns,
    ];
  
    stream.write(stringify([columns.map((column) => column.header)]));

    const concurrencyQueue = new ConcurrencyQueue(5); // Limit concurrency to 5

    let result = await this.sdk.document.search<MeasureContent>(
      this.engineId,
      InternalCollection.MEASURES,
      { query, sort },
      { size: 200 }
    );

    const processHits = async (hits: KDocument<MeasureContent>[]) => {
      const rows = hits.map((hit) =>
        stringify([
          columns.map(({ header, isMeasure, path }) => {
            const value = _.get(hit, path, null);
            if (isMeasure && target === "asset" && hit._source.asset?.measureName !== header) {
              return null;
            }
            return value;
          }),
        ])
      );
      stream.write(rows.join(""));
    };

    while (result.hits.length > 0) {
      // Capture the current page: `result` is reassigned below, and the queued
      // task may run after the next page has already been fetched.
      const hits = result.hits;
      concurrencyQueue.enqueue(() => processHits(hits));
      if (hits.length < 200) {
        break; // No more results
      }
      result = await this.sdk.document.search<MeasureContent>(
        this.engineId,
        InternalCollection.MEASURES,
        {
          query,
          sort,
          // search_after: result.hits[result.hits.length - 1].sort, <= this should be done in issue #349
        },
        { size: 200 }
      );
    }

    // Wait for all tasks to complete
    await concurrencyQueue.waitForAll();
  } catch (error) {
    // Surface the failure to the consumer instead of ending the stream silently.
    stream.write(error.message);
  } finally {
    stream.end();
  }
}
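
One caveat with the sketch above: pages processed concurrently may finish out of order, so rows from page 3 can be written before page 2. If row order matters, the writes can be serialized while the expensive per-page work still overlaps. A hypothetical helper (not part of the codebase) sketching that idea:

```typescript
// Hypothetical order-preserving writer: producers run concurrently, but
// their output is committed in submission order by chaining each write on
// the previous write's completion.
function makeOrderedWriter(write: (text: string) => void) {
  let tail: Promise<void> = Promise.resolve();
  return (producer: () => Promise<string>): Promise<void> => {
    const pending = producer(); // start the expensive work immediately
    tail = tail.then(async () => write(await pending));
    return tail;
  };
}

// Demo: producers resolve out of order (b first, then c, then a),
// yet the output preserves submission order.
async function demo(): Promise<string> {
  const out: string[] = [];
  const submit = makeOrderedWriter((text) => {
    out.push(text);
  });

  const delayed = (value: string, ms: number) => () =>
    new Promise<string>((resolve) => setTimeout(() => resolve(value), ms));

  await Promise.all([
    submit(delayed("a", 30)),
    submit(delayed("b", 10)),
    submit(delayed("c", 20)),
  ]);
  return out.join("");
}
```
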

Pros:

  • Improves server performance by managing concurrency.
  • Prevents memory overflow by processing data in manageable chunks.
  • Ensures efficient use of server resources.

Cons:

  • Adds complexity to the data export logic.
  • Requires careful testing to ensure concurrency control works as expected.
@Juiced66 Juiced66 self-assigned this Jul 5, 2024
@Juiced66 Juiced66 added the enhancement New feature or request label Jul 5, 2024
@Juiced66 Juiced66 changed the title [CSV Export] Stream Optimization for CSV Export with Queue and Concurrency Control [CSV Exporter] Stream Optimization for CSV Export with Queue and Concurrency Control Jul 5, 2024