Conversation

@Jim-Encord
Contributor

An endless amount of effort could be put into this. Ideally there would be separate fetch and execute queues. Ideally we could also mark our dependencies as async, so that while one is being fetched we can switch to other work.

This is just one approach; a rough sketch of the fetch/execute-queue idea follows below.
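
For concreteness, a minimal hypothetical sketch of that idea: a pool of fetcher threads fills a bounded queue while a single consumer executes sequentially. fetch_dependencies and execute_agent are placeholder names, not functions from this PR.

from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import threading

def run_with_fetch_queue(tasks, fetch_dependencies, execute_agent, prefetch=8):
    # Bounded queue: fetching can run at most `prefetch` items ahead of execution.
    results: Queue = Queue(maxsize=prefetch)
    SENTINEL = object()

    def producer():
        with ThreadPoolExecutor() as executor:
            # Fetch dependencies concurrently, hand them over in submission order.
            for deps in executor.map(fetch_dependencies, tasks):
                results.put(deps)
        results.put(SENTINEL)

    threading.Thread(target=producer, daemon=True).start()

    # Execution stays strictly sequential on the consumer side.
    while (deps := results.get()) is not SENTINEL:
        execute_agent(deps)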

@github-actions

Encord Agents test report

93 tests   93 ✅  1m 58s ⏱️
 1 suites   0 💤
 1 files     0 ❌

Results for commit 25f99c4.

Comment on lines +275 to +283
with ThreadPoolExecutor() as executor:
    dependency_list = list(
        executor.map(
            lambda context: solve_dependencies(
                context=context, dependant=runner_agent.dependant, stack=stack
            ),
            batch,
        )
    )
Collaborator

I don't see why we'd want to wait for all tasks to be fetched before starting the compute. Can't we just call the agent while iterating the output of the executor.map call, rather than wrapping it in a list?
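
For illustration, the streaming variant being suggested could look like the following. executor.map yields results lazily in submission order, so each batch item can be executed as soon as its dependencies arrive; run_agent stands in for the execution step and is not a name from this PR.

with ThreadPoolExecutor() as executor:
    for dependencies in executor.map(
        lambda context: solve_dependencies(
            context=context, dependant=runner_agent.dependant, stack=stack
        ),
        batch,
    ):
        run_agent(dependencies)  # hypothetical execution step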

Contributor Author

One could do. I was just replicating the customer's original behaviour, where they do all of the fetching ahead of task execution.
Notably, the customer explicitly wanted sequential inference, and if the map included the agent execution, we would lose that guarantee.
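
To make the distinction concrete, a self-contained toy sketch; fetch and infer are stand-ins for solve_dependencies and the agent call, not names from this PR. Concurrent fetching with sequential inference keeps the customer's guarantee; folding inference into the map would not.

from concurrent.futures import ThreadPoolExecutor

def fetch(task):   # stand-in for solve_dependencies
    return task

def infer(deps):   # stand-in for the agent call
    print("inferring on", deps)

tasks = [1, 2, 3]
with ThreadPoolExecutor() as executor:
    # Fetching happens concurrently, inference stays strictly sequential:
    for deps in executor.map(fetch, tasks):
        infer(deps)
    # Whereas this would run infer() on several threads at once,
    # losing the sequential-inference property:
    # list(executor.map(lambda t: infer(fetch(t)), tasks))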

help="Max number of tasks to try to process per stage on a given run. If `None`, will attempt all",
),
] = None,
pre_fetch_factor: Annotated[
Collaborator

Not sure the name and the functionality match here. A factor is something you multiply by; this seems to be an absolute number, at least based on the docstring.

Contributor Author

It could be pre_fetch_batch_size. My view on "factor" was: if it is set to x, we perform a (grouped) dependency fetch N/x times rather than N times.
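
A quick illustration of that reading, with a hand-rolled chunking helper; none of these names come from the PR.

from itertools import islice

def batches(iterable, size):
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk

tasks = list(range(10))
pre_fetch_factor = 4
for group in batches(tasks, pre_fetch_factor):
    print(f"grouped dependency fetch for {group}")
# 3 grouped fetches (ceil(10 / 4)) instead of 10 individual ones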

except Exception:
    print(f"[attempt {attempt+1}/{num_retries+1}] Agent failed with error: ")
    traceback.print_exc()
with ThreadPoolExecutor() as executor:
Collaborator

Should we not specify how many threads to use (and perhaps even expose it as an argument)?

Contributor Author

We could do. By default, ThreadPoolExecutor picks the worker count from the number of CPUs (min(32, os.cpu_count() + 4) on recent CPython). I didn't necessarily want to expand the interface too much, but I definitely see your point.
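
If we did expose it, it would just thread through to ThreadPoolExecutor's real max_workers parameter. A sketch, where process and its signature are hypothetical:

from concurrent.futures import ThreadPoolExecutor

def process(batch, max_workers=None):
    # max_workers=None keeps the library default; a CLI option could feed
    # this parameter without changing behaviour for existing callers.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(str, batch))

print(process([1, 2, 3], max_workers=2))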
