Skip to content

Commit

Permalink
feat: Add flow_id, self.name for instance names of Flows and Nodes an…
Browse files Browse the repository at this point in the history
…d easy isolated storage for nested flows

This change introduces several improvements to BaseNode and Flow classes:

1. Added instance name tracking with get_instance_name()
2. Simplified UUID format to 8-character hex without hyphens
3. Added flow hierarchy tracking with flow.name and flow.id
4. Improved flow name lookup with explicit naming support

These changes enable:
- Better debugging with named instances and flows
- Simplified flow identification with shorter UUIDs
- Flow hierarchy awareness for nested flows
- Consistent naming across nodes and flows

The improvements include:
- Automatic instance name lookup walking up the call stack
- Simplified 8-character flow IDs for easier reference
- Flow name tracking with explicit naming support
- Parent flow tracking for nested flows

This is particularly useful for:
- Debugging complex flows with named components
- Tracking flow execution in logs
- Visualizing flow hierarchies
- Maintaining backward compatibility

feat: Add flow_storage attribute to BaseNode

feat: Add instance name tracking to BaseNode for better debugging

fix: add base case to prevent infinite recursion in `_propagate_flow_id`

fix: prevent infinite recursion in flow propagation with visited set

fix: improve instance name lookup by walking up call stack

feat: Add rework flow with file processing and LLM integration

refactor: enhance debug output in GetOpinion.prep with class name

feat: Add name parameter to GetOpinion node initialization

feat: Add debug print statement to BaseNode key discovery

refactor: improve instance name lookup and GetOpinion initialization

feat: Add debug print statement for opinion2_Node name

feat: Add flow hierarchy tracking to BaseNode

feat: add flow name and id tracking with alias support

refactor: Simplify flow UUIDs and improve naming consistency

draft: Just a example to check if it works, it needs some cleaning!

docs: Add pocketflow.txt documentation

feat: Add instance tracking, simplified UUIDs, and flow hierarchy in BaseNode
  • Loading branch information
johnr14 committed Jan 15, 2025
1 parent b7444fa commit d10844d
Show file tree
Hide file tree
Showing 4 changed files with 339 additions and 26 deletions.
24 changes: 24 additions & 0 deletions pocketflow.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
This change introduces several improvements to BaseNode and Flow classes:

1. Added instance name tracking with get_instance_name()
2. Simplified UUID format to 8-character hex without hyphens
3. Added flow hierarchy tracking with flow.name and flow.id
4. Improved flow name lookup with explicit naming support

These changes enable:
- Better debugging with named instances and flows
- Simplified flow identification with shorter UUIDs
- Flow hierarchy awareness for nested flows
- Consistent naming across nodes and flows

The improvements include:
- Automatic instance name lookup walking up the call stack
- Simplified 8-character flow IDs for easier reference
- Flow name tracking with explicit naming support
- Parent flow tracking for nested flows

This is particularly useful for:
- Debugging complex flows with named components
- Tracking flow execution in logs
- Visualizing flow hierarchies
- Maintaining backward compatibility
154 changes: 128 additions & 26 deletions pocketflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,35 @@
import asyncio, warnings, copy, time
import asyncio, warnings, copy, time, uuid, sys

class BaseNode:
def __init__(self): self.params,self.successors={},{}
def __init__(self):
self.params, self.successors = {}, {}
self.flow_id = None
self.flow_storage = {}
self.name = self.get_instance_name() or str(id(self))
self.flow = None # Will be set by Flow._propagate_flow_id
self.parent = None # Will be set by Flow._propagate_flow_id

def get_instance_name(self):
"""Find the variable name this instance is assigned to, if any"""
# First check caller's locals
try:
frame = sys._getframe(1)
while frame:
# Check both locals and globals of each frame
for scope in (frame.f_locals, frame.f_globals):
for key, value in scope.items():
if value is self:
# Skip special names and self references
if not key.startswith('_') and key != 'self':
return key
# Move up the call stack
frame = frame.f_back
except (AttributeError, ValueError):
pass
return None
def _get_id(self):
"""Get unique identifier for this node within its flow"""
return f"{self.flow_id}.{id(self)}" if self.flow_id else str(id(self))
def set_params(self,params): self.params=params
def add_successor(self,node,action="default"):
if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'")
Expand All @@ -11,8 +39,8 @@ def exec(self,prep_res): pass
def post(self,shared,prep_res,exec_res): pass
def _exec(self,prep_res): return self.exec(prep_res)
def _run(self,shared): p=self.prep(shared);e=self._exec(p);return self.post(shared,p,e)
def run(self,shared):
if self.successors: warnings.warn("Node won't run successors. Use Flow.")
def run(self,shared):
if self.successors: warnings.warn("Node won't run successors. Use Flow.")
return self._run(shared)
def __rshift__(self,other): return self.add_successor(other)
def __sub__(self,action):
Expand All @@ -37,22 +65,68 @@ class BatchNode(Node):
def _exec(self,items): return [super(BatchNode,self)._exec(i) for i in items]

class Flow(BaseNode):
def __init__(self,start): super().__init__();self.start=start
def __init__(self, start):
super().__init__()
self.start = start
# Generate simpler UUID without hyphens
self.flow_id = uuid.uuid4().hex[:8]
self.id = self.flow_id # Alias for flow_id
# Force name lookup even if flow is created inline
self.name = self.get_instance_name() or f"flow_{self.flow_id}"
self._propagate_flow_id(self.start)

def _get_id(self):
"""Get unique identifier for this flow"""
return self.flow_id

def _propagate_flow_id(self, node, visited=None):
"""Set flow_id and flow hierarchy on all nodes in the flow"""
if visited is None:
visited = set()

if node is None or id(node) in visited: # Base cases
return

visited.add(id(node))
node.flow_id = self.flow_id
node.flow = self
node.parent = self.parent if hasattr(self, 'parent') else None

for successor in node.successors.values():
self._propagate_flow_id(successor, visited)
def get_next_node(self,curr,action):
nxt=curr.successors.get(action or "default")
if not nxt and curr.successors: warnings.warn(f"Flow ends: '{action}' not found in {list(curr.successors)}")
return nxt
def _orch(self,shared,params=None):
curr,p=copy.copy(self.start),(params or {**self.params})
while curr: curr.set_params(p);c=curr._run(shared);curr=copy.copy(self.get_next_node(curr,c))
def _orch(self, shared, params=None):
# Initialize flow storage if needed
if 'storage' not in shared:
shared['storage'] = {}
if self.flow_id not in shared['storage']:
shared['storage'][self.flow_id] = {}

flow_storage = shared['storage'][self.flow_id]
curr, p = copy.copy(self.start), (params or {**self.params})
while curr:
curr.set_params(p)
c = curr._run(flow_storage)
curr = copy.copy(self.get_next_node(curr, c))
def _run(self,shared): pr=self.prep(shared);self._orch(shared);return self.post(shared,pr,None)
def exec(self,prep_res): raise RuntimeError("Flow can't exec.")

class BatchFlow(Flow):
def _run(self,shared):
pr=self.prep(shared) or []
for bp in pr: self._orch(shared,{**self.params,**bp})
return self.post(shared,pr,None)
def _run(self, shared):
# Initialize flow storage if needed
if 'storage' not in shared:
shared['storage'] = {}
if self.flow_id not in shared['storage']:
shared['storage'][self.flow_id] = {}

flow_storage = shared['storage'][self.flow_id]
pr = self.prep(flow_storage) or []
for bp in pr:
self._orch(flow_storage, {**self.params, **bp})
return self.post(flow_storage, pr, None)

class AsyncNode(Node):
def prep(self,shared): raise RuntimeError("Use prep_async.")
Expand All @@ -64,14 +138,14 @@ async def prep_async(self,shared): pass
async def exec_async(self,prep_res): pass
async def exec_fallback_async(self,prep_res,exc): raise exc
async def post_async(self,shared,prep_res,exec_res): pass
async def _exec(self,prep_res):
async def _exec(self,prep_res):
for i in range(self.max_retries):
try: return await self.exec_async(prep_res)
except Exception as e:
if i==self.max_retries-1: return await self.exec_fallback_async(prep_res,e)
if self.wait>0: await asyncio.sleep(self.wait)
async def run_async(self,shared):
if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.")
async def run_async(self,shared):
if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.")
return await self._run_async(shared)
async def _run_async(self,shared): p=await self.prep_async(shared);e=await self._exec(p);return await self.post_async(shared,p,e)

Expand All @@ -82,19 +156,47 @@ class AsyncParallelBatchNode(AsyncNode,BatchNode):
async def _exec(self,items): return await asyncio.gather(*(super(AsyncParallelBatchNode,self)._exec(i) for i in items))

class AsyncFlow(Flow,AsyncNode):
async def _orch_async(self,shared,params=None):
curr,p=copy.copy(self.start),(params or {**self.params})
while curr:curr.set_params(p);c=await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared);curr=copy.copy(self.get_next_node(curr,c))
async def _orch_async(self, shared, params=None):
# Initialize flow storage if needed
if 'storage' not in shared:
shared['storage'] = {}
if self.flow_id not in shared['storage']:
shared['storage'][self.flow_id] = {}

flow_storage = shared['storage'][self.flow_id]
curr, p = copy.copy(self.start), (params or {**self.params})
while curr:
curr.set_params(p)
if isinstance(curr, AsyncNode):
c = await curr._run_async(flow_storage)
else:
c = curr._run(flow_storage)
curr = copy.copy(self.get_next_node(curr, c))
async def _run_async(self,shared): p=await self.prep_async(shared);await self._orch_async(shared);return await self.post_async(shared,p,None)

class AsyncBatchFlow(AsyncFlow,BatchFlow):
async def _run_async(self,shared):
pr=await self.prep_async(shared) or []
for bp in pr: await self._orch_async(shared,{**self.params,**bp})
return await self.post_async(shared,pr,None)
async def _run_async(self, shared):
# Initialize flow storage if needed
if 'storage' not in shared:
shared['storage'] = {}
if self.flow_id not in shared['storage']:
shared['storage'][self.flow_id] = {}

flow_storage = shared['storage'][self.flow_id]
pr = await self.prep_async(flow_storage) or []
for bp in pr:
await self._orch_async(flow_storage, {**self.params, **bp})
return await self.post_async(flow_storage, pr, None)

class AsyncParallelBatchFlow(AsyncFlow,BatchFlow):
async def _run_async(self,shared):
pr=await self.prep_async(shared) or []
await asyncio.gather(*(self._orch_async(shared,{**self.params,**bp}) for bp in pr))
return await self.post_async(shared,pr,None)
async def _run_async(self, shared):
# Initialize flow storage if needed
if 'storage' not in shared:
shared['storage'] = {}
if self.flow_id not in shared['storage']:
shared['storage'][self.flow_id] = {}

flow_storage = shared['storage'][self.flow_id]
pr = await self.prep_async(flow_storage) or []
await asyncio.gather(*(self._orch_async(flow_storage, {**self.params, **bp}) for bp in pr))
return await self.post_async(flow_storage, pr, None)
1 change: 1 addition & 0 deletions pocketflow/example.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fasdasdasd
Loading

0 comments on commit d10844d

Please sign in to comment.