-
I tried to use workflow checkpoints for storing and restoring workflow state in case of errors. I used checkpoints and serialized them to a file with Pydantic's `model_dump`. Buggy example:

```python
import asyncio

from llama_index.core.workflow import Checkpoint, Event, StartEvent, StopEvent, Workflow, WorkflowCheckpointer, step

class MiddleEvent(Event):
    content: str

class TestWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> MiddleEvent:
        return MiddleEvent(content="foo")

    @step
    async def finish(self, ev: MiddleEvent) -> StopEvent:
        return StopEvent(content=ev.content + " bar")

async def main():
    w = TestWorkflow(verbose=True)
    w = WorkflowCheckpointer(workflow=w)
    print("First run:")
    await w.run()
    checkpoints = list(w.checkpoints.items())[0][1]
    first_check = checkpoints[0]
    first_check = first_check.model_dump()
    first_check = Checkpoint.model_validate(first_check)
    print("From checkpoint:")
    await w.run_from(checkpoint=first_check)

asyncio.run(main())
```

I expected I could rerun the workflow from the deserialized checkpoint, but it fails with an error (sorry, it's big).
Then... why would anyone use checkpoints? If I remove the serialization and deserialization, the code works. I tried to search for similar issues and found this:
I guess one should serialize and deserialize the `Context` instead?
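For reference, the round trip being attempted here (dump a checkpoint to a file, load and validate it later) looks like this in the abstract; the file name and `dump_checkpoint`/`restore_checkpoint` helpers below are hypothetical stand-ins for illustration, not LlamaIndex APIs:

```python
import json

def dump_checkpoint(ckpt: dict, path: str) -> None:
    # Persist a checkpoint snapshot so a later process can resume from it.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(ckpt, f, ensure_ascii=False)

def restore_checkpoint(path: str) -> dict:
    # Load the snapshot back; whatever survived serialization is all you get.
    with open(path, encoding="utf-8") as f:
        return json.load(f)

ckpt = {"last_completed_step": "start", "output_event": {"content": "foo"}}
dump_checkpoint(ckpt, "checkpoint.json")
assert restore_checkpoint("checkpoint.json") == ckpt
```

The pattern only works if the dump step captures everything the resume step needs, which is exactly what goes wrong in the example above.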
-
Yes, you're correct. In the LlamaIndex framework, it's recommended to serialize and deserialize the `Context` rather than individual checkpoints. To properly manage workflow state, you should focus on the `Context.to_dict()` and `Context.from_dict()` methods.
This approach ensures that the workflow state is correctly serialized and deserialized, allowing for seamless state restoration [2][3].
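As an aside, the general save/restore pattern being recommended can be sketched in plain Python; `ToyContext` below is a hypothetical stand-in, not the LlamaIndex `Context`:

```python
import json

class ToyContext:
    """Toy stand-in for a workflow context: holds a key/value state dict."""
    def __init__(self, state=None):
        self.state = dict(state or {})

    def to_dict(self):
        # Export only JSON-safe state so the snapshot survives a round trip.
        return {"state": self.state}

    @classmethod
    def from_dict(cls, data):
        return cls(state=data["state"])

ctx = ToyContext({"var": "1", "step": "process"})

# Serialize to a JSON string (e.g. to write to a file)...
payload = json.dumps(ctx.to_dict(), ensure_ascii=False)

# ...and later rebuild an equivalent context from it.
restored = ToyContext.from_dict(json.loads(payload))
assert restored.state == ctx.state
```

The point of routing everything through `to_dict`/`from_dict` is that the class itself decides what is JSON-safe, instead of relying on a generic dumper to guess.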
-
IDK, I tried to serialize/deserialize `Context`:

```python
import json
import asyncio
from pprint import pprint

from llama_index.core.workflow import Context, Event, StartEvent, StopEvent, Workflow, step

class EventA(Event):
    content: str

class EventB(Event):
    content: str

class TestWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> EventA:
        return EventA(content="first")

    @step
    async def process(self, ev: EventA) -> EventB:
        return EventB(content="second: " + ev.content)

    @step
    async def finish(self, ctx: Context, ev: EventB) -> StopEvent:
        if await ctx.get('var'):
            return StopEvent(result="end: " + ev.content)
        else:
            raise ValueError()

async def main():
    w = TestWorkflow(verbose=True)
    ctx = Context(w)
    try:
        pprint(await w.run(ctx=ctx))
    except:
        pass
    ctx = json.dumps(ctx.to_dict(), ensure_ascii=False)
    ctx = Context.from_dict(w, json.loads(ctx))
    await ctx.set('var', '1')
    print('-' * 40)
    pprint(await w.run(ctx=ctx))
    print("OK")

asyncio.run(main())
```

At first run, the `finish` step throws an error, which produces this output:
That is what I expected: it should fail on `await ctx.get('var')`. Then I serialize/deserialize the `Context` and set `var`.
I expected that only the `finish` step would run, but the whole workflow runs again from the start. My question is still wide open: how can one store the state of a `Workflow`? One can have a big workflow, and in case some step throws an exception (for various reasons: a network error, a bug), the user should be able to restore the workflow instead of rerunning it and performing the same heavy computations.
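The resumability I'm after can at least be approximated outside the framework. A minimal sketch of the idea (the helpers and file name here are hypothetical, not LlamaIndex APIs): persist each step's result as it completes, and skip already-completed steps on a rerun.

```python
import json
import os

CHECKPOINT_FILE = "step_results.json"  # hypothetical location

def load_checkpoints(path=CHECKPOINT_FILE):
    # Previously saved step results, if any run got that far.
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {}

def save_checkpoints(done, path=CHECKPOINT_FILE):
    with open(path, "w") as f:
        json.dump(done, f)

def run_with_checkpoints(steps, done):
    """Run named steps in order, skipping any whose result is already saved."""
    for name, fn in steps:
        if name in done:
            continue  # heavy work already finished in a previous (failed) run
        done[name] = fn()
        save_checkpoints(done)  # persist immediately, before the next step
    return done
```

If a later step raises, the results saved so far survive on disk, and the next invocation picks up where the failed run stopped instead of redoing the expensive steps.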
-
I'm terribly sorry. Pydantic's `model_dump` doesn't serialize as I expected. More information here: pydantic/pydantic#8213. Closing, as this problem is not relevant to LlamaIndex.

Correct way of serializing/deserializing `Context`:

```python
from llama_index.core.workflow import JsonPickleSerializer, JsonSerializer

ctx_dict = ctx.to_dict(serializer=JsonSerializer())
restored_ctx = Context.from_dict(
    workflow, ctx_dict, serializer=JsonSerializer()
)
```
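As I understand the linked Pydantic issue, `model_dump` serializes nested values according to their *declared* annotation, so fields that only exist on a runtime subclass are silently dropped. A Pydantic-free sketch of that failure mode (the classes here are illustrative only):

```python
class Event:
    # Declared field list, analogous to a model schema.
    FIELDS = ("name",)
    def __init__(self, name):
        self.name = name

class ContentEvent(Event):
    # Subclass adds a field the base schema knows nothing about.
    FIELDS = ("name", "content")
    def __init__(self, name, content):
        super().__init__(name)
        self.content = content

def dump_as(declared_cls, obj):
    """Serialize using the *declared* class's field list, the way a
    schema-driven dumper would: subclass-only fields are silently dropped."""
    return {f: getattr(obj, f) for f in declared_cls.FIELDS}

ev = ContentEvent("middle", "foo")
# Dumping through the declared base type loses the subclass field...
assert dump_as(Event, ev) == {"name": "middle"}
# ...while dumping with the runtime type keeps it.
assert dump_as(type(ev), ev) == {"name": "middle", "content": "foo"}
```

This is why round-tripping a checkpoint full of event subclasses through a schema-driven dump can produce something that no longer validates back into a usable object.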