Batching with delay and filtering is very slow #1986

Closed
@hariso

Bug description

We noticed that when a pipeline has records that need to be filtered and batching is enabled, its throughput is much lower than expected.

After eliminating other possible causes, we found that the combination of a filtered record and batching triggers this behavior. Whenever a record is filtered out in a pipeline, Conduit stops reading and simply waits. It reads the next record only after the batch delay has elapsed.

Steps to reproduce

The pipeline config file below reads a source file. Lines that contain dontwrite need to be filtered out. Example source file:

write 1
write 2
write 3
dontwrite 1
write 4
write 5
write 6
write 7
write 8
dontwrite 2

Pipeline config file:

version: "2.2"
pipelines:
  - id: pipeline1
    status: running
    name: pipeline1
    description: A pipeline with a template record formatter
    connectors:
      - id: source1
        type: source
        plugin: builtin:file
        name: file-source
        settings:
          "sdk.schema.extract.payload.enabled": "false"
          "sdk.schema.extract.key.enabled": "false"
          path: /tmp/file-source.txt
        processors:
          - id: add-filter-metadata
            plugin: "custom.javascript"
            settings:
              script: |
                function process(rec) {
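                  // Decode the raw payload bytes into a string.
                  let afterStr = String.fromCharCode.apply(String, rec.Payload.After);

                  // Flag lines containing "dontwrite" so the filter processor can drop them.
                  let filter = afterStr.includes("dontwrite");

                  rec.Metadata["filter"] = filter.toString();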
                  return rec;
                }
          - id: filter-records
            plugin: "filter"
            condition: '{{ eq .Metadata.filter "true" }}'
      - id: destination1
        type: destination
        plugin: file
        name: file-dst
        settings:
          path: /tmp/file-destination.txt
          sdk.batch.size: 10
          sdk.batch.delay: "5s"
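
To rule out the processor script itself, its logic can be checked outside Conduit with Node.js. The sketch below assumes the payload is exposed to the script as an array of bytes, as the script in the config implies. The function is renamed to processRecord here only to avoid shadowing Node's global process object, and makeRecord and isFiltered are hypothetical helpers that stand in for a Conduit record and for the filter condition.

```javascript
// Same logic as the add-filter-metadata processor in the config above.
function processRecord(rec) {
  const afterStr = String.fromCharCode.apply(String, rec.Payload.After);
  rec.Metadata["filter"] = afterStr.includes("dontwrite").toString();
  return rec;
}

// Hypothetical stand-in for a Conduit record: only the two fields the
// script touches, with the payload held as raw bytes.
function makeRecord(line) {
  return { Metadata: {}, Payload: { After: Buffer.from(line, "utf8") } };
}

// Mirrors the filter condition '{{ eq .Metadata.filter "true" }}'.
function isFiltered(rec) {
  return rec.Metadata.filter === "true";
}

for (const line of ["write 1", "dontwrite 1"]) {
  const rec = processRecord(makeRecord(line));
  console.log(line, "->", isFiltered(rec) ? "filtered" : "kept");
}
```

If the script behaves as intended, only the dontwrite line is reported as filtered, which points at the interaction with batching rather than at the processor.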

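With the above setup, Conduit writes the first 3 records after 5s, then the next 5 records after another 5s, and so on. Since only 8 records pass the filter and the batch size is 10, we would expect all of them to be written in a single batch after one 5s delay.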
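
The timing can be illustrated with a toy model in Node.js. This is not Conduit's implementation, and the root cause is not established in this report. The simulate function and its stallOnFiltered option are hypothetical and simply encode the observation that reading stops at a filtered record until the batch delay elapses. All lines are assumed to be available at time 0.

```javascript
const sourceLines = [
  "write 1", "write 2", "write 3", "dontwrite 1",
  "write 4", "write 5", "write 6", "write 7", "write 8", "dontwrite 2",
];

// Toy model (hypothetical): returns the list of batch flushes as
// { atSec, records }, given a batch size and a batch delay in seconds.
function simulate(lines, { batchSize, delaySec, stallOnFiltered }) {
  const flushes = [];
  let now = 0;        // simulated clock, in seconds
  let batch = [];     // records collected for the current batch
  let batchStart = 0; // time the first record entered the current batch

  const flush = (atSec) => {
    if (batch.length > 0) flushes.push({ atSec, records: batch });
    batch = [];
  };

  for (const line of lines) {
    if (line.includes("dontwrite")) {
      if (stallOnFiltered && batch.length > 0) {
        // Observed behavior: nothing is read until the delay expires.
        now = batchStart + delaySec;
        flush(now);
      }
      continue; // filtered records never reach the destination
    }
    if (batch.length === 0) batchStart = now;
    batch.push(line);
    if (batch.length === batchSize) flush(now); // size-triggered flush
  }
  // Any leftover records are flushed once the delay expires.
  flush(batchStart + delaySec);
  return flushes;
}

const observed = simulate(sourceLines, { batchSize: 10, delaySec: 5, stallOnFiltered: true });
const expected = simulate(sourceLines, { batchSize: 10, delaySec: 5, stallOnFiltered: false });

for (const [label, flushes] of [["observed", observed], ["expected", expected]]) {
  for (const f of flushes) {
    console.log(`${label}: ${f.records.length} records written at ${f.atSec}s`);
  }
}
```

Under these assumptions the stalling model flushes 3 records at 5s and 5 records at 10s, while the non-stalling model flushes all 8 records at 5s.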

Version

v0.12.2

Metadata

Assignees: No one assigned
Labels: bug (Something isn't working)
Type: No type
Project status: Done
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests