Pipeline development framework: easy to experiment with and compare different pipelines, quick to deploy to workflow orchestration tools.
Most current workflow orchestrators focus on executing already-developed pipelines in production. This library focuses on the pipeline development process: it aims to make pipelines easy to develop, and once you reach a good pipeline, easy to export to production-grade workflow orchestrators. Notable features:
- Manage pipeline experiments: store pipeline run outputs, compare pipelines & visualize.
- Support pipeline as code: allow complex customization.
- Support pipeline as configuration: suitable for plug-and-play once the pipeline is stable.
- Fast pipeline execution: auto-cache & run from cache when appropriate.
- Allow version control of artifacts with git-lfs/dvc/god...
- Export pipelines to compatible workflow orchestration tools (e.g. Argo Workflows, Airflow, Kubeflow...).
```bash
pip install theflow
```
(A code walk-through of this section is stored in examples/10-minutes-quick-start.ipynb. You can run it with Google Colab (TODO - attach the link).)
A pipeline can be defined as code: you initialize all the ops in `self.initialize` and route them in `self.run`.
```python
from theflow import Function


# Define some operations used inside the pipeline
# Operation 1: normal class-based Python object
class IncrementBy(Function):
    x: int

    def run(self, y):
        return self.x + y


# Operation 2: normal Python function
def decrement_by_5(x):
    return x - 5
```
```python
# Declare the flow
class MathFlow(Function):
    increment: Function
    decrement: Function

    def run(self, x):
        # Route the operations in the flow
        y = self.increment(x)
        y = self.decrement(y)
        y = self.increment(y)
        return y


flow = MathFlow(increment=IncrementBy(x=10), decrement=decrement_by_5)
```
You run the pipeline by calling it directly. The output is the same object returned by `self.run`.
```python
output = flow(x=5)
print(f"{output=}, {type(output)=}")  # output=20, type(output)=<class 'int'>
```
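To see where that value comes from, the routing above can be traced in plain Python (framework-free, for illustration only), substituting `increment = IncrementBy(x=10)` and `decrement = decrement_by_5` by hand:

```python
# Plain-Python trace of MathFlow.run with increment = IncrementBy(x=10)
# and decrement = decrement_by_5 (no theflow involved)
def increment_by_10(y):
    return 10 + y

def decrement_by_5(x):
    return x - 5

def math_flow(x):
    y = increment_by_10(x)   # 5  -> 15
    y = decrement_by_5(y)    # 15 -> 10
    y = increment_by_10(y)   # 10 -> 20
    return y

print(math_flow(5))  # 20
```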
You can investigate the pipeline's last run through the `last_run` property.
```python
flow.last_run.id()  # id of the last run
flow.last_run.logs()  # list all information of each step
# [TODO] flow.last_run.visualize(path="vis.png")  # export the graph to `vis.png`
```
- Arguments management
- Cache
  - Cache by runs, organized by root task, to allow reproducibility
  - Allow specifying which files to cache
  - Keys work like `lru_cache`, derived from the original inputs, but the cache should be file-backed so it persists across runs
  - CLI command to manipulate the cache
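The file-backed-cache idea can be sketched in plain Python. This is not theflow's API: the decorator name, cache directory, and key scheme below are all made up, just to illustrate an `lru_cache`-style key whose values are persisted to disk so they survive run-after-run execution:

```python
import hashlib
import pickle
from pathlib import Path

CACHE_DIR = Path(".cache")  # hypothetical location; theflow may use a different layout


def file_cached(fn):
    """Memoize fn to disk: key = hash of the original inputs, value = pickled result."""
    def wrapper(*args, **kwargs):
        raw = pickle.dumps((fn.__name__, args, sorted(kwargs.items())))
        path = CACHE_DIR / hashlib.sha256(raw).hexdigest()
        if path.exists():  # run-after-run: hit the cache left by a previous process
            return pickle.loads(path.read_bytes())
        result = fn(*args, **kwargs)
        CACHE_DIR.mkdir(exist_ok=True)
        path.write_bytes(pickle.dumps(result))
        return result
    return wrapper


@file_cached
def slow_op(x):
    return x * x


print(slow_op(4))  # computed on the first run, loaded from .cache afterwards
```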
- Compare pipelines in a result folder
- Dynamically create reproducible configs
- Support pipeline branching and merging
- Support single-process or multi-process pipeline execution
- Synchronize changes in the workflow, allowing logs from different runs to be compatible with each other
- Compare different runs
  - Same cache directory
  - Compare evaluation results based on kwargs
- CLI: list runs
- CLI: delete unnecessary runs
- Add coverage, pre-commit, CI...
MIT License.