Closed
Bug description
Any processor that executes after a filter processor crashes the pipeline with:
ERR pipeline worker stopped error="task outbox-events:source:enrich_metadata: processor didn't return any records"
As explained by @hariso, this is because a filter record is of a special type.
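To make the expected behavior concrete, here is a minimal sketch of a processor chain that stops as soon as a record is filtered, so later processors never see the special value. It is an illustrative model only, not Conduit's implementation: `FILTERED`, `makeFilter`, `runChain` and the `operation` field are all hypothetical names chosen for this example.

```javascript
// Illustrative model only: none of these names come from Conduit.
const FILTERED = Symbol("filtered"); // stands in for the special "filter record" type

// Builds a filter stage: records matching the condition are replaced by the marker.
function makeFilter(condition) {
  return (rec) => (condition(rec) ? FILTERED : rec);
}

// Runs a record through the stages in order and stops at the first stage that
// filters it, so later stages never receive the marker.
function runChain(stages, rec) {
  let current = rec;
  for (const stage of stages) {
    current = stage(current);
    if (current === FILTERED) {
      return FILTERED;
    }
  }
  return current;
}

// Mirrors the example pipeline: keep creates only, then a pass-through stage.
const seen = [];
const insertOnly = makeFilter((rec) => rec.operation !== "create");
const enrichMetadata = (rec) => {
  seen.push(rec.id); // record which inputs reach the second stage
  return rec;
};

const stages = [insertOnly, enrichMetadata];
const kept = runChain(stages, { id: 1, operation: "create" });
const dropped = runChain(stages, { id: 2, operation: "update" });
```

In this model the second stage only ever runs for the record that passed the filter, which is the behavior one would expect from a processor placed after filter.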
Steps to reproduce
Example pipeline:
version: 2.2
pipelines:
  - id: outbox-events
    status: running
    connectors:
      - id: source
        type: source
        plugin: builtin:postgres
        settings:
          url: ${PG_SOURCE_DSN}
          tables: outbox_events
          cdcMode: logrepl
          logrepl.autoCleanup: 'false'
          logrepl.publicationName: cnd_outbox
          logrepl.slotName: cnd_outbox_events
          logrepl.withAvroSchema: 'false'
          snapshot.fetchSize: '1000'
          snapshotMode: initial
        processors:
          - id: insert_only
            plugin: 'filter'
            condition: '{{ ne .Operation 1 }}'
          - id: enrich_metadata
            plugin: 'custom.javascript'
            settings:
              script: |
                function process(rec) {
                  return rec;
                }
      - id: output-log
        type: destination
        plugin: builtin:log
        settings:
          sdk.schema.extract.key.enabled: 'false'
          sdk.schema.extract.payload.enabled: 'false'
Version
v0.13.4
Status
Done