Skip to content

Commit 3aebd23

Browse files
FlyteRemote - Delay fetching task execution data until node complete (#3347)
Signed-off-by: Yee Hing Tong <[email protected]>
1 parent 545d07e commit 3aebd23

File tree

1 file changed

+17
-4
lines changed

1 file changed

+17
-4
lines changed

flytekit/remote/remote.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2747,8 +2747,11 @@ def sync_node_execution(
27472747
# This is the plain ol' task execution case
27482748
else:
27492749
execution._task_executions = [
2750+
# Sync task execution but only get inputs/outputs if the overall execution is done
27502751
self.sync_task_execution(
2751-
FlyteTaskExecution.promote_from_model(t), node_mapping[node_id].task_node.flyte_task.interface
2752+
FlyteTaskExecution.promote_from_model(t),
2753+
node_mapping[node_id].task_node.flyte_task.interface,
2754+
get_task_exec_data=execution.is_done,
27522755
)
27532756
for t in iterate_task_executions(self.client, execution.id)
27542757
]
@@ -2763,16 +2766,26 @@ def sync_node_execution(
27632766
return execution
27642767

27652768
def sync_task_execution(
2766-
self, execution: FlyteTaskExecution, entity_interface: typing.Optional[TypedInterface] = None
2769+
self,
2770+
execution: FlyteTaskExecution,
2771+
entity_interface: typing.Optional[TypedInterface] = None,
2772+
get_task_exec_data: bool = True,
27672773
) -> FlyteTaskExecution:
27682774
"""Sync a FlyteTaskExecution object with its corresponding remote state."""
2775+
27692776
execution._closure = self.client.get_task_execution(execution.id).closure
2770-
execution_data = self.client.get_task_execution_data(execution.id)
27712777
task_id = execution.id.task_id
27722778
if entity_interface is None:
27732779
entity_definition = self.fetch_task(task_id.project, task_id.domain, task_id.name, task_id.version)
27742780
entity_interface = entity_definition.interface
2775-
return self._assign_inputs_and_outputs(execution, execution_data, entity_interface)
2781+
if get_task_exec_data:
2782+
try:
2783+
execution_data = self.client.get_task_execution_data(execution.id)
2784+
return self._assign_inputs_and_outputs(execution, execution_data, entity_interface)
2785+
except Exception as e:
2786+
logger.error(f"Failed to get data for successful task execution: {execution.id}, error: {e}")
2787+
raise
2788+
return execution
27762789

27772790
#############################
27782791
# Terminate Execution State #

0 commit comments

Comments
 (0)