
Feature Request: Chainable pipelines #2274

@nickchomey

Description

Problem

Currently Conduit is mostly focused on just pure ETL, and does a great job with it!

But it isn't really possible to tie Conduit into a broader real-time, event-driven architecture - specifically with respect to reacting to destination connector success/failure.

One common use case that I have in mind is CQRS-like caching - after there's a write to the destination DB, trigger some process that updates a cache.

Another use could be publishing some sort of message - be it HTTP, NATS, Kafka, etc. - to indicate to something else that a write has succeeded or failed. This might be further used as part of a Saga orchestration pattern.

Proposed Solution

@hariso and I discussed this previously on Discord, and somewhat settled on the following idea (which I've since refined a bit more).

It would be great to be able to continue a pipeline AFTER the destination connector's Write method returns.

Currently we can only use destination connectors in parallel. But if a destination connector could effectively serve as the source/parent for another one, then we could write our own destination connector that handles the success/failure of particular messages (the status of which could be added to the record's metadata).
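As a rough sketch of the metadata idea - the metadata key and the markWriteStatus helper are made up for illustration, and the record type is a trimmed-down stand-in for an OpenCDC record rather than the actual SDK type - the chained destination could stamp each record with the outcome of its Write call before handing it downstream:

```go
// Hypothetical sketch: stamp records with the outcome of the upstream
// destination's Write call so downstream stages can react to it.
package chain

// Record is a stand-in for an OpenCDC record; only the metadata map matters here.
type Record struct {
	Metadata map[string]string
}

// Assumed metadata key - not an existing Conduit convention.
const writeStatusKey = "conduit.destination.write.status"

// markWriteStatus records whether the upstream Write succeeded, so downstream
// processors/destinations can filter on it or react to it.
func markWriteStatus(recs []Record, writeErr error) []Record {
	status := "success"
	if writeErr != nil {
		status = "failure"
	}
	for i := range recs {
		if recs[i].Metadata == nil {
			recs[i].Metadata = map[string]string{}
		}
		recs[i].Metadata[writeStatusKey] = status
	}
	return recs
}
```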

Moreover, it would be even better if we could also use processors after destination connectors, so that we could easily filter, use JavaScript, or even send the record for embedding (e.g. only wanting to do so AFTER the write to the DB has succeeded).

In that sense, it would essentially just be chaining entire pipelines together, which might even make this an easier thing to think about and implement? Source -> Processors -> Destination -> Processors -> Destination -> etc.

I suppose that, in theory, they could even be infinitely chainable and probably (as with existing pipeline functionality) also branchable - multiple destination connectors "subscribed" to the output of a previous destination connector (now acting as a source).

I assume most people would just add some minimal functionality after the destination connector to trigger something elsewhere in the architecture. I'd probably just filter the records again and then publish a NATS message with the result, which another service could subscribe to and carry on with.
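Continuing the sketch above, the minimal "notify" destination at the end of the chain could read that hypothetical metadata key and publish a small event per record. Again, this is just an illustration of the shape, not actual Conduit SDK code - the record type, the Open/Write signatures, and the subject naming are all assumptions:

```go
// Hypothetical "notify" destination that could sit at the end of a chained
// pipeline and publish a success/failure event per record to NATS.
package notify

import (
	"context"
	"encoding/json"

	"github.com/nats-io/nats.go"
)

// Record is a stand-in for an OpenCDC record; only the fields used here.
type Record struct {
	Position []byte
	Metadata map[string]string
}

type Destination struct {
	conn    *nats.Conn
	subject string
}

func Open(ctx context.Context, natsURL, subject string) (*Destination, error) {
	conn, err := nats.Connect(natsURL)
	if err != nil {
		return nil, err
	}
	return &Destination{conn: conn, subject: subject}, nil
}

// Write forwards a small notification for each record it receives, using the
// (assumed) status key stamped by the upstream destination.
func (d *Destination) Write(ctx context.Context, records []Record) (int, error) {
	for i, rec := range records {
		event := map[string]string{
			"status":   rec.Metadata["conduit.destination.write.status"],
			"position": string(rec.Position),
		}
		data, err := json.Marshal(event)
		if err != nil {
			return i, err
		}
		if err := d.conn.Publish(d.subject, data); err != nil {
			return i, err
		}
	}
	return len(records), nil
}
```

Another service could then subscribe to that subject and carry on with whatever cache update or Saga step it needs.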

Yet, I could imagine using Conduit not just as... a conduit in a broader architecture... but as more of a Rosetta Stone-like application orchestration/coordination hub, allowing various sources and services to read from and write to each other in a real-time, event-driven fashion. It would also make a slick UI on top of it, like Meroxa, even more valuable. Perhaps something like n8n, but with robust, high-performance, single-binary deployments thanks to Go.

If there's interest in this, it seems to me that now would be a fortuitous time to give it some thought, given that you are looking to polish off the v2 architecture.

Alternatives Considered

Do nothing

Just write destination connectors that include such actions before the Write method ends. But that's not all that practical when it comes to a complex connector, such as MySQL, and maintaining a fork of those connectors with modifications would also not be ideal.

Use multiple pipelines

We could also chain various distinct pipelines together - e.g. after one pipeline terminates in a MySQL destination, start a second pipeline with a MySQL source connector that listens for specific changes and then triggers some action.

How does this differ from the proposed solution? You would have the overhead of a separate pipeline, regenerating the schema and re-creating the OpenCDC record, and you wouldn't have whatever metadata and other data was added in the original pipeline - all when you just wanted to write a bit of JavaScript or send the record to NATS or OpenAI.

Hooks

These could be something that lets us register code - be it built-in or WASM - to run on success/failure of the destination's Write method.

This was my initial proposal, which also suggested hooks before/after each connector and processor. But I don't think that is appropriate/necessary, since there are already processors/destination connectors downstream of each of those that can handle this sort of thing. We only really need to deal with destination connectors, which are currently the end of the line. And we don't need anything before the destination's Write, because that's what processors are for. So we just need something after destination connectors, which might as well be more destination connectors (or processors + destination connectors).

Dagster has something it calls hooks, which might be somewhat similar:
https://docs.dagster.io/guides/build/ops/op-hooks
https://www.getorchestra.io/guides/dagster-orchestration-principles-hooks-guide
