Commit 3a6bcb0

vijayvammi and claude authored
feat: Refactor branch logging (#246)
* docs: add telemetry design for pipeline observability
  Design document for Phase 1 telemetry implementation:
  - Pipeline and task-level spans via logfire-api
  - StreamingSpanProcessor for FastAPI SSE real-time streaming
  - Self-hosted OpenTelemetry backend support
  - Zero-dependency shim (no-ops without logfire installed)
  🤖 Generated with [Claude Code](https://claude.com/claude-code)
  Co-Authored-By: Claude Opus 4.5 <[email protected]>

* docs: add detailed telemetry implementation plan
  12 bite-sized tasks with TDD approach:
  - Add logfire-api dependency
  - Create telemetry module with helpers
  - Add StreamingSpanProcessor
  - Instrument PipelineContext, JobContext
  - Instrument all task types (Python, Notebook, Shell)
  - FastAPI streaming example

* docs: add local telemetry test, mark FastAPI as placeholder
  - Add Task 11: local telemetry test with console output
  - Mark Task 12 (FastAPI) as a placeholder pending logfire integration docs
  - Add task order table to summary

* docs: update FastAPI example to use logfire.instrument_fastapi()
  - Use logfire.instrument_fastapi(app) for HTTP request instrumentation
  - Pipeline/task spans become children of the HTTP request via OTEL context
  - Keep StreamingSpanProcessor for real-time SSE streaming to the UI

* feat: add logfire-api dependency for telemetry support
  - Add logfire-api>=2.0.0 to core dependencies
  - Add telemetry optional extras with logfire and opentelemetry

* feat: add telemetry module with helpers
  - truncate_value() for serializing params with a size limit
  - set_stream_queue()/get_stream_queue() for SSE streaming
  - Re-export logfire_api as logfire for convenience

* feat: add StreamingSpanProcessor for SSE streaming
  - OTEL_AVAILABLE flag detects opentelemetry availability
  - StreamingSpanProcessor pushes span_start/span_end to a queue
  - Dual output: forwards to the base processor AND streams to the UI
  - Gracefully no-ops when OTEL is not installed

* feat: add telemetry spans to PipelineContext.execute
  - Pipeline span with name, run_id, executor attributes
  - logfire.info for started/completed events
  - logfire.error for failure events

* feat: add telemetry spans to JobContext.execute
  - Job span with job_name, run_id, executor attributes
  - logfire.info for started/submitted events
  - logfire.error for failure events

* feat: add telemetry spans to PythonTaskType
  - Add logfire import and truncate_value from the telemetry module
  - Add _safe_serialize_params() for defensive serialization
  - Wrap execute_command in logfire.span with the command as the task name
  - Log task started/completed with truncated inputs/outputs
  - Log task failures with the error message

* feat: add telemetry spans to NotebookTaskType and ShellTaskType
  - Move _safe_serialize_params to BaseTaskType for reuse
  - Wrap NotebookTaskType.execute_command in logfire.span
  - Wrap ShellTaskType.execute_command in logfire.span
  - Log task started/completed with truncated inputs/outputs
  - Log task failures with error messages

* feat: export telemetry helpers from the runnable package
  - Export set_stream_queue, get_stream_queue, truncate_value
  - Export the OTEL_AVAILABLE flag
  - Conditionally export StreamingSpanProcessor when OTEL is available

* feat: add local telemetry test example
  Example shows console telemetry output with:
  - Pipeline span wrapping execution
  - Task spans with truncated inputs/outputs
  - Timestamps and durations

* feat: add FastAPI telemetry streaming example
  Example demonstrates:
  - logfire.instrument_fastapi for HTTP span instrumentation
  - SSE streaming of telemetry spans via set_stream_queue
  - Pipeline execution in a thread pool
  - User parameters via RUNNABLE_PRM_* env vars

* fix: use functools.partial for run_in_executor args

* feat: add dual telemetry output with SSE streaming and OTEL support
  - Add _emit_event() to task types for real-time SSE streaming
  - Include inputs/outputs in task events for observability
  - Auto-configure logfire from env vars at import time (RUNNABLE_TELEMETRY_CONSOLE, OTEL_EXPORTER_OTLP_ENDPOINT, LOGFIRE_TOKEN)
  - Update the FastAPI example with a functools.partial pipeline registry
  - Add comprehensive telemetry documentation

* feat: add multiple demo pipelines and improve param serialization
  - Add 6 pipelines: example, hello, parameters, parallel, map, shell
  - Add a GET /pipelines endpoint to list available pipelines
  - Improve _safe_serialize_params to truncate per value, not the whole dict
  - Handle ObjectParameter explicitly with an "<object>" placeholder
  - Update the README with pipeline examples and curl commands

* docs: add async execution design document
  Design for native async support in runnable:
  - AsyncPythonTaskType for async functions
  - AsyncLocalExecutor for async graph traversal
  - AsyncPipelineContext with async execute()
  - AsyncPythonTask/AsyncPipeline SDK classes

* docs: add async execution implementation plan
  18 bite-sized tasks with TDD approach:
  - AsyncPythonTaskType task plugin
  - AsyncLocalExecutor executor plugin
  - AsyncPipelineContext context class
  - AsyncPythonTask/AsyncPipeline SDK classes
  - Tests and example

* docs: update async execution plan with AsyncGenerator streaming
  - Update AsyncPythonTaskType to support AsyncGenerator functions
  - Add event_callback parameter for streaming task_chunk events
  - Add execute_stream() method to AsyncPipeline for SSE
  - Add TaskChunkEvent model for streaming chunks
  - Add streaming LLM example with FastAPI SSE
  - Update tests to cover streaming behavior
  - Add architecture diagram showing the dual flow pattern

* feat: Adding agentic capabilities

* docs: update async execution design with hybrid sync/async pattern
  - Add explicit sync/async method signatures to base classes
  - BasePipelineExecutor async methods raise NotImplementedError by default
  - BaseTaskType.execute_command_async raises NotImplementedError by default
  - Only LocalExecutor and AsyncPythonTaskType implement async methods
  - Shared helpers minimize code duplication between sync/async paths
  - Clear error messages when async is called on unsupported components

* docs: update async execution plan with hybrid sync/async pattern
  - Rewrite the plan with explicit method separation
  - Base classes raise NotImplementedError for async by default
  - LocalExecutor implements both sync and async paths
  - PipelineContext gets an execute_async() method
  - Add shared helpers to minimize code duplication
  - 19 tasks covering the full implementation

* feat: add async method stubs to base classes
  - Add pytest-asyncio>=0.23.0 to dev dependencies
  - Add async method stubs to BasePipelineExecutor: execute_graph_async(), execute_from_graph_async(), trigger_node_execution_async(), _execute_node_async()
  - Add execute_command_async() stub to BaseTaskType
  - All stubs raise NotImplementedError by default

* feat: add shared helpers and async methods to executor and nodes
  - Add _prepare_node_for_execution() shared helper
  - Add _finalize_graph_execution() shared helper
  - Refactor execute_from_graph() to use the shared helper
  - Add async methods to LocalExecutor: execute_graph_async(), execute_from_graph_async(), trigger_node_execution_async(), _execute_node_async()
  - Add execute_async() to BaseNode (delegates to sync by default)

* feat: add async methods to node classes
  - Add execute_as_graph_async() stub to CompositeNode
  - Add execute_async() to TaskNode with fallback to sync
  - Add execute_as_graph_async() to ParallelNode

* feat: add async methods to composite nodes and PipelineContext
  - Add execute_as_graph_async() to MapNode
  - Add execute_as_graph_async() to ConditionalNode
  - Add _handle_completion() helper to PipelineContext
  - Add execute_async() to PipelineContext

* feat: add async pipeline examples and tests
  - Add AsyncPipelineContext for simplified async execution
  - Add AsyncPythonTask and AsyncPythonTaskType for async functions
  - Add async examples (async_tasks.py, async_sequential.py)
  - Add test_async_examples in test_pipeline_examples.py
  - Update the _context property to accept both context types
  - Export async classes from runnable/__init__.py

* docs: add FastAPI LLM streaming design
  Design document for native async streaming from pipeline functions to SSE clients, demonstrating token-by-token LLM response streaming. Key decisions:
  - Separate telemetry (infrastructure) from LLM streaming (domain)
  - AsyncGenerator pass-through pattern for universal consumption
  - Minimal implementation via callback → queue → AsyncGenerator wrapper

* docs: add FastAPI LLM streaming implementation plan
  Implementation plan for execute_streaming() on AsyncPipeline, with event_callback stored on the executor (not threaded through params).

* feat: add _event_callback attribute to BasePipelineExecutor

* feat: TaskNode.execute_async uses executor's event_callback

* feat: add execute_streaming method to AsyncPipeline
  Add execute_streaming(), which returns an AsyncGenerator yielding events from pipeline execution. This enables streaming patterns like SSE by using asyncio.Queue to bridge executor callbacks to the AsyncGenerator interface.

* feat: add mock LLM streaming functions

* feat: add AsyncPipeline definition for LLM streaming

* feat: add FastAPI app with SSE streaming endpoint

* docs: add README for FastAPI LLM streaming example

* feat: add agentic dependency group with fastapi and uvicorn

* chore: remove fastapi-telemetry dependency group

* refactor: remove unused methods from AsyncPipeline

* feat: Adding streaming capability

* fix: properly extract return values from AsyncGenerator done event

* feat: make AsyncGenerator return value extraction configurable
  Add a stream_end_type field to AsyncPythonTask with default 'done'. This lets users customize which event type marks the end of the stream and carries return values, making the framework more flexible beyond LLM-specific patterns.

* refactor: remove unnecessary else block in AsyncGenerator handling
  AsyncGenerator functions should always yield event dicts with 'type'. The stream_end_type event contains the return values. Removed the backwards-compatibility code for direct value yields to clarify the pattern.

* fix: generate unique run_id for each FastAPI request
  - Import datetime and the names module
  - Generate a unique run_id per request using get_random_name()
  - Pass run_id to execute_streaming()
  - Remove the unused chat-and-summarize endpoint

* feat: Adding streaming capability

* docs: added docs on FastAPI LLM integration

* docs: added docs on FastAPI LLM integration

* refactor: progress on map_variable to iter_variable refactoring
  Work in progress on refactoring map_variable usage to the iter_variable.map pattern, in preparation for IterableParameterModel.
  Co-Authored-By: Claude Sonnet 4 <[email protected]>

* refactor: replace map_variable with iter_variable.map_variable
  This comprehensive refactoring replaces direct `map_variable` usage with the structured `iter_variable.map_variable` pattern using `IterableParameterModel` from defaults.py. Changes include:
  - Updated all function signatures to use `iter_variable: IterableParameterModel` instead of `map_variable`
  - Modified function bodies to access map variables via `iter_variable.map_variable.items()`
  - Fixed value access patterns to use `MapVariableModel.value` for actual data
  - Updated all function callers to pass `IterableParameterModel` instances
  - Fixed test files to use proper JSON-encoded values for `MapVariableModel`
  - Maintained backward compatibility in CLI entry points
  Files modified: 25 across the core framework, pipeline executors, and extensions. Tests: all passing, including specific map variable functionality.

* fix: update parameter tests to use proper MapVariableModel JSON encoding
  Fixed remaining test failures in test_parmeters.py by updating MapVariableModel instances to use JSON-encoded values, as required by the Json[Any] field type.

* fix: address diagnostic issues in map.py
  - Replace deprecated .copy() calls with .model_copy()
  - Fix the OrderedDict type annotation for converted_map
  - Ensure a consistent pattern between sync and async versions

* feat: Refactored map variable, getting ready for loop

* feat: Refactored map variable, getting ready for loop

* feat: add parameters field to BranchLog
  Add parameters (Dict[str, Parameter]) to BranchLog to support branch-scoped parameter storage. This is the foundation for simplifying parameter naming conventions in map/parallel nodes.

* feat: add internal_branch_name parameter to get_parameters
  Allow get_parameters to retrieve branch-scoped parameters by passing an optional internal_branch_name. When not provided, it returns root-level parameters for backward compatibility.

* feat: add internal_branch_name parameter to set_parameters
  Allow set_parameters to store branch-scoped parameters by passing an optional internal_branch_name. When not provided, it sets root-level parameters for backward compatibility.
  Co-Authored-By: Claude Sonnet 4.5 <[email protected]>

* feat: allow create_branch_log to accept initial parameters
  Enable branches to be created with pre-populated parameters, supporting fan_out scenarios where parent parameters need to be copied to child branches.

* refactor: simplify ChunkedRunLogStore to only RunLog and BranchLog
  Remove the STEP_LOG and PARAMETER log types. Steps and parameters are now stored within their parent (RunLog or BranchLog), eliminating the need for separate file management.

* refactor: add_step_log stores within parent RunLog/BranchLog
  Steps are now stored within their parent entity rather than as separate files. This simplifies file management and aligns with the branch-scoped model. Also cleaned up lingering references to STEP_LOG and PARAMETER in naming_pattern and the retrieve methods.

* refactor: get/set_parameters use RunLog/BranchLog directly
  Parameters are now stored within RunLog.parameters or BranchLog.parameters, eliminating separate parameter files.

* refactor: simplify ChunkedRunLogStore reconstruction
  Remove obsolete methods and simplify _prepare_full_run_log. Branches are now self-contained with their own steps and params. Changes:
  - Removed the orderly_retrieve method (no longer needed)
  - Simplified _prepare_full_run_log to just attach branches to parent steps
  - Fixed get_step_log to retrieve from the parent RunLog/BranchLog
  - Fixed the branch attachment logic to correctly identify parent steps

* fix: remove timestamp-based naming from BranchLog files
  - BranchLog files now use simple naming: BranchLog-{internal_name}
  - Removed Template substitution for creation_time
  - Removed unused imports (time, Template, string)
  - Branch files are now stable and predictable

* fix: parallel map iteration failure tracking
  - Keep a separate list of iteration variables for failure reporting
  - Fix a TypeError when checking failed iterations (JSON string vs dict)
  - Simplify the failed_iterations list comprehension

* fix: fan_in to use branch-scoped parameters instead of old naming
  - Get parameters from each branch using internal_branch_name
  - Removed the old {iter_variable}_{param_name} naming convention
  - Removed setting the reduced flag (the field was removed)
  - Removed a duplicate set_parameters call
  - Fixed error messages to reference branch parameters

* fix: tasks set parameters in branch scope, not root
  - Pass internal_branch_name to set_parameters
  - Tasks now update parameters in their branch scope
  - Fixes the old {iter_value}_{param_name} naming in branches

* fix: add missing internal_branch_name field to BaseTaskType
  - The field was missing from BaseTaskType, causing an AttributeError
  - Added with a default empty string and excluded from serialization
  - Required for branch-scoped parameter updates

* fix: handle missing parameters in map fan_in gracefully
  - When branches take the failure path, they may not set all return params
  - Changed the KeyError exception to skip missing parameters
  - Only reduce parameters that were actually set by branches
  - Fixes the map examples with on_failure handlers

* fix: remove iteration value prefix from task return parameters
  - Task returns use clean names (processed_python, not 1_processed_python)
  - Parameters are scoped to branches, so there is no need for prefixing
  - Removed all {iter_value}_{param_name} generation logic
  - Fixed for the Python, Notebook, and Shell task types

* fix: create return parameters in parent scope during fan_in
  - Return parameters don't exist in the parent initially (they are created by tasks)
  - Check whether a parameter exists before updating its value
  - Create a new JsonParameter if missing in the parent scope
  - Fixes a KeyError when setting reduced parameters

* fix: set reduced parameters to parent scope in fan_in
  - Renamed the variable to parent_internal_branch_name for clarity
  - For a root-level map, this is the empty string (RunLog parameters)
  - For a nested map, this is the branch containing the map node
  - Ensures reduced parameters are set to the correct parent scope

* fix: use self.internal_branch_name directly for parent scope
  - Removed the unnecessary _resolve_map_placeholders call
  - Reduced parameters go to where the map node lives (the parent of the branches)
  - For a root-level map: empty string (RunLog)
  - For a nested map: the branch containing the map node

* fix: set executable internal_branch_name in TaskNode execute methods
  Tasks need to know which branch they belong to so they can set parameters in the correct branch scope. The TaskNode now sets the resolved branch name on the executable before execution.

* fix: pass internal_branch_name to task at creation time
  The task executable should receive internal_branch_name during construction so it knows which branch it belongs to from the start. This gets resolved to the actual branch name at execution time.

* fix: update branch log in run log after parameter updates
  When updating parameters in a branch, we need to call add_branch_log to write the branch back into the run log structure. This is essential for file-based stores, where branches are saved as separate files.

* fix: address user fixes for branch parameter persistence

* refactor: remove print debug statements and reduced concept
  - Removed print debug statements from tasks.py and map.py
  - Removed the 'reduced' field from all Parameter classes
  - Simplified parameter resolution to a direct get_parameters call
  - Removed unreduced parameter filtering from notebook execution
  - Removed placeholder parameter creation in map fan_out
  - Updated comments and variable names

* refactor: simplify ChunkedRunLogStore with direct file naming
  With only 2 log types (RunLog and BranchLog), we can construct file names directly instead of using pattern matching:
  - RunLog: always named 'RunLog'
  - BranchLog: always named 'BranchLog-{internal_branch_name}'
  Changes:
  - Replaced naming_pattern() with get_file_name()
  - Replaced the get_matches() abstract method with _exists() and _list_branch_logs()
  - Simplified store() and retrieve() to use direct file name construction
  - Updated _prepare_full_run_log() to use _list_branch_logs()
  - Changed model(**contents) to model.model_validate(contents)
  - Implemented the new methods in ChunkedFileSystemRunLogStore
  - Removed the unused Template import

* refactor: update ChunkedMinioRunLogStore with new abstract methods
  Implemented _exists() and _list_branch_logs() to replace get_matches() pattern matching, making the Minio implementation consistent with the simplified ChunkedRunLogStore base class.
  - Removed the unused Template import
  - Simplified file existence checking with direct path construction
  - Branch log listing uses an exact glob pattern instead of template substitution

* fix: correct file path construction in _store and _retrieve
  Both ChunkedFileSystemRunLogStore and ChunkedMinioRunLogStore now consistently construct full file paths in both _store and _retrieve by combining the run_id folder with the file name. Previously, the path construction logic was tied to the 'insert' flag, which caused a FileNotFoundError when retrieving files. Now the file name parameter is always treated as a relative name to be prefixed with the run_id folder path. All tests in test_chunked_simplified.py now pass.

* test: remove reduced attribute assertion from test
  Removed the assertion for the 'reduced' attribute from test_object_parameter_init, since the 'reduced' field was removed from all Parameter classes as part of the refactoring. All 53 tests in test_datastore.py now pass.

* feat: reduced complexity in handling composite nodes

* feat: reduced complexity in handling composite nodes

* docs: add branch parameter rollback plan and update argo pipeline config

* feat: reduced complexity in handling composite nodes

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 31b1d57 commit 3a6bcb0


43 files changed: +2340 additions, -1006 deletions

argo-pipeline.yaml

Lines changed: 83 additions & 105 deletions

docs/plans/branch-parameter-rollback.md

Lines changed: 440 additions & 0 deletions
examples/10-branch-parameters/conditional_rollback.py

Lines changed: 66 additions & 0 deletions
"""
Demonstrate parameter rollback from conditional branches.

When a conditional branch executes and succeeds, parameters set within
that branch roll back to the parent scope.

Execute with:
python examples/10-branch-parameters/conditional_rollback.py
"""

from examples.common.functions import (
    set_conditional_heads_param,
    set_conditional_tails_param,
    verify_conditional_rollback,
)
from runnable import Conditional, Pipeline, PythonTask


def decide_heads():
    """Return 'heads' to select heads branch."""
    return "heads"


def main():
    # Create branch pipelines that set parameters
    heads_pipeline = PythonTask(
        name="heads_task",
        function=set_conditional_heads_param,
        returns=["branch_param"],
    ).as_pipeline()

    tails_pipeline = PythonTask(
        name="tails_task",
        function=set_conditional_tails_param,
        returns=["branch_param"],
    ).as_pipeline()

    # Conditional node selects branch based on 'choice' parameter
    conditional = Conditional(
        name="conditional",
        branches={"heads": heads_pipeline, "tails": tails_pipeline},
        parameter="choice",
    )

    # Task to set the choice parameter
    decide_task = PythonTask(
        name="decide",
        function=decide_heads,
        returns=["choice"],
    )

    # Task to verify the parameter rolled back from branch
    verify_task = PythonTask(
        name="verify",
        function=verify_conditional_rollback,
        terminate_with_success=True,
    )

    pipeline = Pipeline(steps=[decide_task, conditional, verify_task])
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
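The branch-scoped get/set behavior this example relies on (an optional internal_branch_name selecting BranchLog.parameters over RunLog.parameters, root-level when omitted) can be modeled with a toy store. The classes below are illustrative stand-ins, not runnable's actual datastore API.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class BranchLog:
    internal_name: str
    parameters: Dict[str, str] = field(default_factory=dict)


@dataclass
class RunLog:
    run_id: str
    parameters: Dict[str, str] = field(default_factory=dict)
    branches: Dict[str, BranchLog] = field(default_factory=dict)


class RunLogStore:
    """Toy store: parameters live inside RunLog or BranchLog directly."""

    def __init__(self, run_log: RunLog):
        self.run_log = run_log

    def _scope(self, internal_branch_name: Optional[str]) -> Dict[str, str]:
        if not internal_branch_name:
            return self.run_log.parameters  # root scope, backward compatible
        branch = self.run_log.branches.setdefault(
            internal_branch_name, BranchLog(internal_branch_name)
        )
        return branch.parameters

    def get_parameters(self, internal_branch_name: Optional[str] = None):
        return dict(self._scope(internal_branch_name))

    def set_parameters(self, params, internal_branch_name: Optional[str] = None):
        self._scope(internal_branch_name).update(params)


store = RunLogStore(RunLog(run_id="demo"))
store.set_parameters({"choice": "heads"})  # root scope
store.set_parameters(
    {"branch_param": "heads_value"},
    internal_branch_name="conditional.heads",  # branch scope
)
print(store.get_parameters())
print(store.get_parameters(internal_branch_name="conditional.heads"))
```

Rollback in this model is the fan_in step copying the succeeding branch's parameters back into the parent scope, which is what verify_conditional_rollback observes.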
examples/10-branch-parameters/parallel_conflict.py

Lines changed: 41 additions & 0 deletions
"""
Demonstrate parameter conflict resolution in parallel branches.

When multiple branches set the same parameter, last write wins
based on dictionary iteration order.

Execute with:
python examples/10-branch-parameters/parallel_conflict.py
"""

from examples.common.functions import set_shared_param_a, set_shared_param_b
from runnable import Parallel, Pipeline, PythonTask


def main():
    # Both branches set the same parameter name
    branch1_pipeline = PythonTask(
        name="branch1_task",
        function=set_shared_param_a,
        returns=["shared"],
    ).as_pipeline()

    branch2_pipeline = PythonTask(
        name="branch2_task",
        function=set_shared_param_b,
        returns=["shared"],
    ).as_pipeline()

    parallel = Parallel(
        name="parallel",
        branches={"branch1": branch1_pipeline, "branch2": branch2_pipeline},
    )

    pipeline = Pipeline(steps=[parallel])
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
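The last-write-wins behavior this example demonstrates follows directly from merging branch parameter dicts in iteration order during fan_in. A minimal sketch (the merge helper is illustrative, not runnable's actual fan_in code):

```python
def fan_in_merge(branches: dict) -> dict:
    """Merge branch parameters into the parent scope.

    Python dicts preserve insertion order, so when two branches set the
    same key, the branch iterated last wins.
    """
    parent: dict = {}
    for branch_name, params in branches.items():
        parent.update(params)
    return parent


merged = fan_in_merge(
    {
        "branch1": {"shared": "value_a"},
        "branch2": {"shared": "value_b"},
    }
)
print(merged["shared"])  # branch2 is iterated last, so its value wins
```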
examples/10-branch-parameters/parallel_rollback.py

Lines changed: 64 additions & 0 deletions
"""
Demonstrate parameter rollback from parallel branches.

When parallel branches execute and succeed, parameters from all branches
roll back to the parent scope.

Execute with:
python examples/10-branch-parameters/parallel_rollback.py
"""

from examples.common.functions import (
    set_parallel_branch1,
    set_parallel_branch2,
    set_parallel_branch3,
    verify_parallel_rollback,
)
from runnable import Parallel, Pipeline, PythonTask


def main():
    # Create branch pipelines that set different parameters
    branch1_pipeline = PythonTask(
        name="branch1_task",
        function=set_parallel_branch1,
        returns=["result1"],
    ).as_pipeline()

    branch2_pipeline = PythonTask(
        name="branch2_task",
        function=set_parallel_branch2,
        returns=["result2"],
    ).as_pipeline()

    branch3_pipeline = PythonTask(
        name="branch3_task",
        function=set_parallel_branch3,
        returns=["result3"],
    ).as_pipeline()

    # Parallel node executes all branches
    parallel = Parallel(
        name="parallel",
        branches={
            "branch1": branch1_pipeline,
            "branch2": branch2_pipeline,
            "branch3": branch3_pipeline,
        },
    )

    # Task to verify all parameters rolled back from branches
    verify_task = PythonTask(
        name="verify",
        function=verify_parallel_rollback,
        terminate_with_success=True,
    )

    pipeline = Pipeline(steps=[parallel, verify_task])
    pipeline.execute()

    return pipeline


if __name__ == "__main__":
    main()
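Per the fan_in fixes in the commit history, reducing branch parameters skips any return a branch never set (for example, when the branch took its on_failure path) instead of raising a KeyError. A hypothetical sketch of that reduction for map branches:

```python
from typing import Dict, List


def reduce_map_parameters(
    branch_params: Dict[str, Dict[str, str]],
    returns: List[str],
) -> Dict[str, List[str]]:
    """Collect each declared return across map branches.

    Branches that failed over to an on_failure handler may not have set
    every return parameter; those are skipped rather than raising KeyError.
    """
    reduced: Dict[str, List[str]] = {name: [] for name in returns}
    for params in branch_params.values():
        for name in returns:
            if name in params:  # skip parameters the branch never set
                reduced[name].append(params[name])
    return reduced


reduced = reduce_map_parameters(
    {
        "map.1": {"result": "r1"},
        "map.2": {},  # this branch took its failure path
        "map.3": {"result": "r3"},
    },
    returns=["result"],
)
print(reduced["result"])
```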

examples/common/functions.py

Lines changed: 56 additions & 0 deletions
@@ -225,3 +225,59 @@ async def async_process(result: str):
    processed = f"processed_{result}"
    print(f"Processed: {processed}")
    return processed


# Helper functions for conditional parameter rollback examples
def set_conditional_heads_param():
    """Set parameter when heads branch executes."""
    return "heads_value"


def set_conditional_tails_param():
    """Set parameter when tails branch executes."""
    return "tails_value"


def set_conditional_multiple():
    """Set multiple parameters in conditional branch."""
    return "param1_value", "param2_value", "param3_value"


def verify_conditional_rollback(branch_param: str):
    """Verify rolled back parameter from conditional branch."""
    assert branch_param in ["heads_value", "tails_value"]
    return branch_param


# Helper functions for parallel parameter rollback examples
def set_parallel_branch1():
    """Set parameter in parallel branch 1."""
    return "branch1_value"


def set_parallel_branch2():
    """Set parameter in parallel branch 2."""
    return "branch2_value"


def set_parallel_branch3():
    """Set parameter in parallel branch 3."""
    return "branch3_value"


def verify_parallel_rollback(result1: str, result2: str, result3: str):
    """Verify all parallel branch parameters rolled back."""
    assert result1 == "branch1_value"
    assert result2 == "branch2_value"
    assert result3 == "branch3_value"
    return "verified"


def set_shared_param_a():
    """Set shared parameter to value A."""
    return "value_a"


def set_shared_param_b():
    """Set shared parameter to value B."""
    return "value_b"

extensions/job_executor/__init__.py

Lines changed: 3 additions & 2 deletions
@@ -12,6 +12,7 @@
     utils,
 )
 from runnable.datastore import DataCatalog, JobLog, JsonParameter, StepAttempt
+from runnable.defaults import IterableParameterModel
 from runnable.executor import BaseJobExecutor
 from runnable.tasks import BaseTaskType

@@ -174,11 +175,11 @@ def _sync_catalog(
         return data_catalogs

     def add_task_log_to_catalog(
-        self, name: str, map_variable: Dict[str, str | int | float] | None = None
+        self, name: str, iter_variable: Optional[IterableParameterModel] = None
     ):
         log_file_name = utils.make_log_file_name(
             name=name,
-            map_variable=map_variable,
+            iter_variable=iter_variable,
         )
         task_console.save_text(log_file_name)
         # Put the log file in the catalog