make parent_flow as job router #26
base: main
Conversation
Changes look very promising! I’ve added a couple of questions below. The main suggestion is to decouple the model metadata so it focuses purely on the algorithm itself, rather than environment or runtime-specific configuration (e.g. conda_env, network, volumes, etc.).
In addition, a few things to consider further:
- Would it be worthwhile to use Prefect 3.x? I understand this would require changing mlex_utils, but are there other repos that would require major changes as well?
- What mlflow version is compatible with these changes? Do we need to update the dependencies?
- I'd also recommend moving sensitive information to Prefect Secrets.
- As @dylanmcreynolds suggested, we should consider adding an SFapi worker for NERSC.
- Updating README and creating documentation
- Creating unit tests for this repo
flows/parent_flow.py (outdated)

    return FlowType.conda


@task
def get_algorithm_details_from_mlflow(model_name: str):
Should we add model version? If not defined, should we go for the latest?
Suggested change:
- def get_algorithm_details_from_mlflow(model_name: str):
+ def get_algorithm_details_from_mlflow(model_name: str, model_version: Optional[int] = None):
The model_name sent from the application side follows the same naming convention as in default_model.json:
"model_name": "pytorch_autoencoder_v0.0.5"
where the version is included. It is also consistent with the original algorithm display:

Alternatively, I could split it into two fields—model_name: "pytorch_autoencoder" and model_version: "1" (instead of "v0.0.5")—and have the application send them as separate variables. Note that MLflow assigns numeric versions (1, 2, …, n, with n being the latest) by default. If we want a custom version name (e.g., "v0.0.5"), we need to add it manually as a tag (which I have already done). Let me know which approach you think is better.
Splitting it into 2 fields and following mlflow version tracking convention sounds like a good plan to me
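For reference, here is a minimal sketch of how the optional model_version could be resolved with the MLflow client, defaulting to the latest registered (integer) version when none is given. The helper name follows the discussion above, but the exact return shape is an assumption:

from typing import Optional

from mlflow.tracking import MlflowClient
from prefect import task


@task
def get_algorithm_details_from_mlflow(model_name: str, model_version: Optional[int] = None):
    client = MlflowClient()
    if model_version is None:
        # MLflow assigns integer versions 1..n; treat the highest number as "latest"
        versions = client.search_model_versions(f"name='{model_name}'")
        model_version = max(int(v.version) for v in versions)
    model = client.get_model_version(model_name, str(model_version))
    run = client.get_run(model.run_id)
    return run.data.params, run.data.tags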
flows/parent_flow.py (outdated)

    tags = run.data.tags

    # Get relevant fields from MLflow params
    algorithm_details = {
I’d recommend simplifying this section to focus on metadata that’s directly relevant to the algorithm itself, rather than environment or runtime configuration (e.g. conda_env, network, volumes, etc.):
Suggested change:
    algorithm_details = {
        "model_name": model_name,
        "image_name": params.get("image_name", ""),
        "image_tag": params.get("image_tag", ""),
        "source_code": params.get("source_code", ""),  # e.g. repo link
    }
Thanks for your suggestion. I changed algorithm_details to:

algorithm_details = {
    "model_name": model_name,
    # Core Algorithm Information
    "image_name": params.get("image_name", ""),
    "image_tag": params.get("image_tag", ""),
    "source": params.get("source", ""),
    "is_gpu_enabled": params.get("is_gpu_enabled", "False").lower() == "true",
}
I also created another job_details to load information from the newly created config.yml file:

job_details = {
    # Container settings
    "volumes": config.get("container", {}).get("volumes", []),
    "network": config.get("container", {}).get("network", ""),
    # Slurm settings
    "num_nodes": config.get("slurm", {}).get("num_nodes", 1),
    "partitions": config.get("slurm", {}).get("partitions", "[]"),
    "reservations": config.get("slurm", {}).get("reservations", "[]"),
    "max_time": config.get("slurm", {}).get("max_time", "1:00:00"),
    "submission_ssh_key": config.get("slurm", {}).get("submission_ssh_key", ""),
    "forward_ports": config.get("slurm", {}).get("forward_ports", "[]"),
    # Get conda environment based on the model type
    "conda_env": _get_conda_env_for_model(model_name, config),
}
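For context, a minimal sketch of how the new config.yml could be loaded; the key layout is inferred from the lookups above and may not match the actual file exactly:

import yaml

# Inferred layout: container.volumes, container.network, slurm.num_nodes,
# slurm.partitions, slurm.reservations, slurm.max_time, slurm.submission_ssh_key,
# slurm.forward_ports, plus a conda section for environment names.
with open("config.yml") as f:
    config = yaml.safe_load(f)

network = config.get("container", {}).get("network", "")
num_nodes = config.get("slurm", {}).get("num_nodes", 1)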
For invoking a container from a script, I would think that the mechanisms are exactly the same except for …

With the current implementation, the setup differs slightly because the … To elaborate, the Docker flow follows a Docker-out-of-Docker (sibling container pattern) sequence: … I attempted to use the …
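To illustrate the sibling-container pattern mentioned above: the worker talks to the host Docker daemon, so child containers are started next to it rather than nested inside it. A minimal sketch using the Docker SDK for Python; the image name, command, and volume mapping are placeholders, not the repo's actual values:

import docker

# Connects via the Docker socket made available to the worker
# (e.g. -v /var/run/docker.sock:/var/run/docker.sock), so the child container
# is created by the host daemon as a sibling of the worker.
client = docker.from_env()
client.containers.run(
    image="ghcr.io/mlexchange/example-algorithm:latest",  # placeholder image
    command="python src/train.py",                        # placeholder command
    volumes={"/data": {"bind": "/data", "mode": "rw"}},   # placeholder mount
    detach=True,
)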
prefect work-pool create docker_pool --type "docker"
prefect work-pool update docker_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
prefect deploy -n launch_docker --pool docker_pool --prefect-file prefect-docker.yaml
PREFECT_WORKER_WEBSERVER_PORT=8081 prefect worker start --pool docker_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck
When we run on the ALS infrastructure, prefect server will be reached at something like https://prefect.computing.als.lbl.gov. I'm a little concerned to see the PREFECT_WORKER_WEBSERVER_PORT setting, which would throw that off. But I may not understand everything here.
I added the PREFECT_WORKER_WEBSERVER_PORT setting because we now run multiple workers concurrently (parent_pool and docker_pool). The prefect worker start command exposes a health check webserver on port 8080 by default. When starting multiple workers on the same machine, each needs a unique port to avoid conflicts - hence PREFECT_WORKER_WEBSERVER_PORT=8081 for the docker_pool worker.
This port setting only affects the local health check endpoint and does not impact how workers communicate with the Prefect server at https://prefect.computing.als.lbl.gov (which is controlled by PREFECT_API_URL). However, we should verify this works as expected in the real ALS deployment.
I guess I was looking at this script thinking that …

I’m not entirely sure about this, but here’s the explanation from …
taxe10 left a comment:
Thanks for addressing the previous comments! These are a few new suggestions to consider:
- We should avoid spawning docker containers from other docker containers, as this will likely be restricted in deployment environments
- The conda workers are currently not working due to a missing path to their source code - I added a suggestion on how we could manage this in the config file
- Error tracking between parent and child flows - e.g. when a child flow fails, the error is not passed to the parent flow, which keeps the Completed status (one possible pattern is sketched below)
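On that last point, one possible pattern is to inspect the state returned by run_deployment() and re-raise on failure so the parent run is marked Failed. This is only a sketch: the deployment name and parameters are placeholders, and it assumes the default behaviour of waiting for the child run to finish:

from prefect.deployments import run_deployment


def launch_child(deployment_name: str, parameters: dict):
    # run_deployment polls until the child flow run reaches a final state
    flow_run = run_deployment(name=deployment_name, parameters=parameters)
    if flow_run.state is None or not flow_run.state.is_completed():
        # Surface the child failure so the parent flow does not stay "Completed"
        raise RuntimeError(f"Child flow run {flow_run.id} ended in state {flow_run.state_name}")
    return flow_run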
config.yml (outdated)

  pca: "mlex_dimension_reduction_pca"
  umap: "mlex_dimension_reduction_umap"
  clustering: "mlex_clustering"
  dlsia: "dlsia"
I'm wondering if here we should add:

algorithm_path:
  pytorch_autoencoder: /path/to/autoencoders
  ...
  dlsia: /path/to/dlsia  # /src/train.py
I came up with another method to determine the algorithm_path. Since we register the image_name in MLflow (e.g., ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype), we can easily extract the algorithm folder name (mlex_dlsia_segmentation_prototype) from it. With ALGORITHMS_DIR defined in the .env file, the full path becomes ALGORITHMS_DIR/folder_name. I’ve updated my code accordingly, so we no longer need to define algorithm_path in config.yml.
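As an illustration of that derivation (the helper name is hypothetical, not the repo's actual code):

import os


def algorithm_path_from_image(image_name: str) -> str:
    # e.g. "ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype" -> "mlex_dlsia_segmentation_prototype"
    folder_name = image_name.rstrip("/").split("/")[-1]
    # ALGORITHMS_DIR is defined in the .env file
    return os.path.join(os.getenv("ALGORITHMS_DIR", ""), folder_name)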
I also found another limitation in the segmentation app when switching the worker type. There is a model_dir variable in io_parameter (loaded from WRITE_DIR in .env) that determines where DVC results are saved. For the conda flow, this should be a local machine path; for the Docker flow, it needs to be a container path. This means that whenever we switch worker types, we must also update that value on the application side, which is not ideal.
I’m considering saving the DVC results as artifacts directly to the MLflow server instead. This would avoid path differences but would require changes in both the DLSIA and segmentation repositories.
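A minimal sketch of what logging the DVC results to MLflow could look like (the local results directory and run name are placeholders):

import mlflow

# Upload the local DVC results directory as run artifacts on the MLflow server,
# so downstream code reads them from MLflow instead of a worker-specific model_dir.
with mlflow.start_run(run_name="dvc_results_upload"):
    mlflow.log_artifacts("/path/to/dvc_results", artifact_path="dvc")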
config.yml (outdated)

# MLExchange Job Configuration
# Default HPC type to use
hpc_type: "als"
I'm also wondering if we should rename this to:

worker:
  name: "als"
  type: "podman"
Sure. I just updated it.
This could be addressed in the next PR - should we add a --run-background flag? That way we could maintain a single file for each worker.
That sounds good. I will address it in the next PR.
Thanks for your reviews! I’ve addressed all the comments and closed the issue for the third one. Both the …
flows/utils.py (outdated)

    return FlowType.conda


def _get_conda_env_for_model(model_name: str, config: dict) -> str:
I believe this function would need to be modified as new algorithms are registered. I wonder if we could use {model_name}_{version} as the key to find the conda environment in the config file?
I am trying to register dlsia 2.0 (refactored)
Sounds good. I’m using {model_name}_{version} to name the Conda environments in config.yml:
# Conda environment settings
conda:
  conda_env_name:
    DLSIA MSDNet_0.0.1: "dlsia"
    DLSIA TUNet_0.0.1: "dlsia"
    DLSIA TUNet3+_0.0.1: "dlsia"
Regarding versioning, there are two types to consider. You may have noticed the custom version defined in models.json (e.g., 0.0.1), and the version number used by MLflow (e.g., Version 1, Version 2, which are integer-based). In config.yml, I’m using our custom-defined version.
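A minimal sketch of the keyed lookup against that config section (the fallback value and exact signature are assumptions):

def _get_conda_env_for_model(model_name: str, model_version: str, config: dict) -> str:
    env_names = config.get("conda", {}).get("conda_env_name", {})
    # e.g. "DLSIA MSDNet" + "0.0.1" -> "DLSIA MSDNet_0.0.1" -> "dlsia"
    return env_names.get(f"{model_name}_{model_version}", "")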
Not directly related—but for full algorithm version support, there will need to be substantial changes in highres_segmentation (front-end UI, params_list, and algorithm registration), as well as on the Prefect worker side. These updates are not implemented yet and can be addressed in a future PR.
Refactored the Prefect worker to use a job-routing architecture:

- parent_flow.py acts as a job router that:
  - takes params_list as inputs
  - routes jobs based on worker.name (hard-coded for now; it may later check the workload of HPCs to decide)
  - queries the MLflow algorithm registry for the target environment
  - reads job configuration from config.yml instead of io_parameters
  - launches child flows via run_deployment() instead of directly calling launch_XX()
- start_xx_child_worker.sh for foreground worker execution
- start_xx_child_worker_background.sh for background execution with PID tracking
- worker settings read from .env instead of io_parameters
- Work pools:
  - docker_pool (type: process) for Docker-based jobs
  - podman_pool (type: process) for Podman-based jobs
  - conda_pool (type: process) for Conda environment jobs
  - slurm_pool (type: process) for Slurm jobs
  - parent_pool (type: process) for the parent job router
- PIDs for easy tracking
- PIDs saved for simplified worker management

The determine_best_environment() function in utils.py still needs improvement in a future PR. Ideally, it should evaluate the workload across all available resources (e.g., ALS cluster Ball, NSLS-II, NERSC, ALCF) and automatically select the optimal environment to run different job types (e.g., Docker, Podman, Slurm) on the corresponding resources. For now, we've fixed worker.name to als in config.yml, which will start a Docker job. This environment-selection mechanism is the key feature we plan to implement in the next phase.
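For clarity, a rough sketch of the routing behaviour described above, under the assumption that each worker type maps to a child deployment created by the start scripts; the deployment names and config layout are illustrative only:

from prefect.deployments import run_deployment

# Hypothetical mapping from worker type to child deployment name
CHILD_DEPLOYMENTS = {
    "docker": "launch_docker/launch_docker",
    "podman": "launch_podman/launch_podman",
    "conda": "launch_conda/launch_conda",
    "slurm": "launch_slurm/launch_slurm",
}


def determine_best_environment(config: dict) -> str:
    # Placeholder: return the worker configured in config.yml (currently "als")
    # until workload-aware selection across resources is implemented.
    return config.get("worker", {}).get("name", "als")


def route_job(config: dict, parameters: dict):
    worker_type = config.get("worker", {}).get("type", "docker")
    return run_deployment(name=CHILD_DEPLOYMENTS[worker_type], parameters=parameters)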
Companion PRs: Data Clinic #17 and LSE #43