Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5f3ac9e
make parent_flow as job router
xiaoyachong Apr 29, 2025
46658c6
make parent_flow as job router
xiaoyachong Apr 29, 2025
a7b4884
separate parent and child workers
xiaoyachong May 4, 2025
f39f941
Fix parent flow failure propagation
xiaoyachong May 7, 2025
e825b4f
update job params list
xiaoyachong Jun 29, 2025
f2c75ea
address pr comments
xiaoyachong Aug 30, 2025
3e889ad
upgrade prefect version
xiaoyachong Sep 13, 2025
46b5271
add mlflow
xiaoyachong Nov 3, 2025
8683939
change type from process to docker for docker_pool
xiaoyachong Nov 4, 2025
64fceca
update env and readme
xiaoyachong Nov 4, 2025
8d27648
use run_process for docker
xiaoyachong Nov 4, 2025
f1d83d8
add conda env for segmentation
xiaoyachong Nov 12, 2025
7f96d71
update env
xiaoyachong Nov 12, 2025
e842660
read credentials from env
xiaoyachong Nov 13, 2025
4e47fb0
refactor parent flow
xiaoyachong Nov 13, 2025
b944dbb
load credentials in child flow
xiaoyachong Nov 13, 2025
132f171
remove unified child worker
xiaoyachong Nov 13, 2025
9624877
update scripts
xiaoyachong Nov 16, 2025
5633e33
update conda flow
xiaoyachong Nov 17, 2025
1c5ba4f
change docker_pool type back to process
xiaoyachong Nov 19, 2025
6b2a4e3
update config
xiaoyachong Nov 19, 2025
f51ff59
fix conda
xiaoyachong Nov 19, 2025
556b15f
clean up
xiaoyachong Nov 19, 2025
50d21c7
update readme
xiaoyachong Nov 19, 2025
2a06c3c
add child status check
xiaoyachong Nov 20, 2025
bd0522b
update typer version
xiaoyachong Nov 21, 2025
25a496b
add version for conda env
xiaoyachong Nov 21, 2025
5c00dfb
update volumes
xiaoyachong Nov 23, 2025
bb33da0
update conda env
xiaoyachong Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ PREFECT_WORK_DIR=$PWD

# Path to conda
CONDA_PATH=/opt/miniconda3

# MLFlow
MLFLOW_TRACKING_URI=http://localhost:5000
MLFLOW_TRACKING_USERNAME=admin
MLFLOW_TRACKING_PASSWORD=<secure password>
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,6 @@ cython_debug/
#.idea/


.vscode/
.vscode/
.DS_Store
logs/
384 changes: 352 additions & 32 deletions flows/parent_flow.py

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions prefect.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ deployments:
entrypoint: flows/podman/podman_flows.py:launch_podman
parameters: {}
work_pool:
name: mlex_pool
name: podman_pool
work_queue_name: default-queue
job_variables: {}
schedule:
is_schedule_active: true
- name: launch_conda
version: 0.1.0
tags: []
description: Launch podman container
description: Launch conda environment
entrypoint: flows/conda/conda_flows.py:launch_conda
parameters: {}
work_pool:
name: mlex_pool
name: conda_pool
work_queue_name: default-queue
job_variables: {}
schedule:
Expand All @@ -41,7 +41,7 @@ deployments:
entrypoint: flows/slurm/slurm_flows.py:launch_slurm
parameters: {}
work_pool:
name: mlex_pool
name: slurm_pool
work_queue_name: default-queue
job_variables: {}
schedule:
Expand All @@ -53,7 +53,7 @@ deployments:
entrypoint: flows/docker/docker_flows.py:launch_docker
parameters: {}
work_pool:
name: mlex_pool
name: docker_pool
work_queue_name: default-queue
job_variables: {}
schedule:
Expand All @@ -69,4 +69,4 @@ deployments:
work_queue_name: default-queue
job_variables: {}
schedule:
is_schedule_active: true
is_schedule_active: true
28 changes: 28 additions & 0 deletions start_child_worker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
#
# Start one Prefect worker per child job type (docker, podman, conda, slurm)
# and keep this script in the foreground until all of them exit.
#
# Required variables (normally provided by ./.env):
#   PREFECT_API_URL                 passed to `prefect config set`
#   PREFECT_WORK_POOL_CONCURRENCY   concurrency limit applied to every pool
#   PREFECT_WORKER_LIMIT            value of `--limit` for every worker
# Optional:
#   PREFECT_WORK_DIR                re-exported for the workers
#
# Every worker is started with its own PREFECT_WORKER_WEBSERVER_PORT
# (8081-8084) -- presumably so the --with-healthcheck endpoints do not collide.

# Load settings. A missing .env is only a warning so that variables already
# present in the environment keep working.
if [[ -f .env ]]; then
  # shellcheck disable=SC1091
  source ./.env
else
  echo "Warning: .env not found in ${PWD}; using variables from the environment" >&2
fi

# Fail early with a clear message instead of running malformed prefect
# commands (an empty --limit would swallow the next flag as its value).
: "${PREFECT_API_URL:?PREFECT_API_URL must be set (see .env.example)}"
: "${PREFECT_WORK_POOL_CONCURRENCY:?PREFECT_WORK_POOL_CONCURRENCY must be set}"
: "${PREFECT_WORKER_LIMIT:?PREFECT_WORKER_LIMIT must be set}"

export PREFECT_WORK_DIR="$PREFECT_WORK_DIR"
prefect config set PREFECT_API_URL="$PREFECT_API_URL"

# PIDs of the workers started below; used to stop them on INT/TERM.
worker_pids=()

# Send TERM to every started worker and wait for them to finish.
stop_workers() {
  trap - INT TERM
  if (( ${#worker_pids[@]} > 0 )); then
    kill -TERM "${worker_pids[@]}" 2>/dev/null
  fi
  wait
}

# Without this, stopping the script could leave the background workers
# running as orphans.
trap 'stop_workers; exit 1' INT TERM

# Create a "process" work pool, set its concurrency limit and start a worker
# for it in the background.
#   $1 - work pool name
#   $2 - value for PREFECT_WORKER_WEBSERVER_PORT
start_pool_worker() {
  local pool=$1
  local port=$2

  # Creation may fail (e.g. if the pool already exists); that must not prevent
  # the worker from starting.
  prefect work-pool create "$pool" --type "process" \
    || echo "Note: could not create work pool '${pool}' (it may already exist); continuing" >&2
  prefect work-pool set-concurrency-limit "$pool" "$PREFECT_WORK_POOL_CONCURRENCY"

  PREFECT_WORKER_WEBSERVER_PORT="$port" prefect worker start \
    --pool "$pool" --limit "$PREFECT_WORKER_LIMIT" --with-healthcheck &
  worker_pids+=("$!")
}

# 1. Work pool and worker for job type docker
start_pool_worker docker_pool 8081

# 2. Work pool and worker for job type podman
start_pool_worker podman_pool 8082

# 3. Work pool and worker for job type conda
start_pool_worker conda_pool 8083

# 4. Work pool and worker for job type slurm
start_pool_worker slurm_pool 8084

echo "All workers started"
# Block until every worker has exited (or a trapped signal arrives).
wait
84 changes: 84 additions & 0 deletions start_child_worker_background.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/bin/bash
#
# Launch the four child Prefect workers (docker, podman, conda, slurm) in the
# background, each logging to logs/<type>_worker_<PID>.log, then print a
# summary and return.
#
# Required variables (normally provided by ./.env):
#   PREFECT_API_URL                 passed to `prefect config set`
#   PREFECT_WORK_POOL_CONCURRENCY   concurrency limit applied to every pool
#   PREFECT_WORKER_LIMIT            value of `--limit` for every worker
# Optional:
#   PREFECT_WORK_DIR                re-exported for the workers
#   CONDA_PATH                      conda install prefix used to source conda.sh
#
# Files written: logs/<type>_worker_<PID>.log and logs/child_workers_pids.txt.

# Load environment variables from .env file. A missing file is only a warning
# so that variables already present in the environment keep working.
if [[ -f .env ]]; then
  # shellcheck disable=SC1091
  source ./.env
else
  echo "Warning: .env not found in ${PWD}; using variables from the environment" >&2
fi

echo "Executing Folder: ${PWD}"

# Initialize conda
if [[ -n "${CONDA_PATH:-}" && -f "${CONDA_PATH}/etc/profile.d/conda.sh" ]]; then
  # shellcheck disable=SC1091
  source "${CONDA_PATH}/etc/profile.d/conda.sh"
else
  echo "Warning: conda init script not found under CONDA_PATH='${CONDA_PATH:-}'; continuing without it" >&2
fi

# Fail early with a clear message instead of running malformed prefect
# commands (an empty --limit would swallow the next flag as its value).
: "${PREFECT_API_URL:?PREFECT_API_URL must be set (see .env.example)}"
: "${PREFECT_WORK_POOL_CONCURRENCY:?PREFECT_WORK_POOL_CONCURRENCY must be set}"
: "${PREFECT_WORKER_LIMIT:?PREFECT_WORKER_LIMIT must be set}"

# Bookkeeping filled in by launch_worker; the four arrays are index-aligned.
worker_labels=()
worker_ports=()
worker_pids=()
worker_logs=()

# Create the "<name>_pool" work pool, set its concurrency limit and start a
# background worker whose output goes to logs/<name>_worker_<PID>.log.
#   $1 - job type in lower case (used for the pool and log file names)
#   $2 - display label used in messages
#   $3 - value for PREFECT_WORKER_WEBSERVER_PORT
# Returns non-zero if the log file could not be opened.
launch_worker() {
  local name=$1
  local label=$2
  local port=$3
  local pool="${name}_pool"
  # Staged inside logs/ so the later rename never crosses a filesystem; $$
  # keeps concurrent runs of this script from sharing a temp name.
  local tmp_log="logs/.${name}_worker_starting_$$.log"
  local pid=""
  local log

  # Creation may fail (e.g. if the pool already exists); ignore that on purpose.
  prefect work-pool create "$pool" --type "process" || true
  prefect work-pool set-concurrency-limit "$pool" "$PREFECT_WORK_POOL_CONCURRENCY"

  # The redirection is on the group, so THIS shell opens the log before the
  # worker is forked. The worker inherits the open descriptor, which makes the
  # rename below safe: it cannot run before the file exists.
  {
    PREFECT_WORKER_WEBSERVER_PORT="$port" prefect worker start \
      --pool "$pool" --limit "$PREFECT_WORKER_LIMIT" --with-healthcheck &
    pid=$!
  } > "$tmp_log" 2>&1

  if [[ -z "$pid" ]]; then
    echo "Error: could not open ${tmp_log}; ${label} worker not started" >&2
    return 1
  fi

  # Rename the log file to include the actual PID of the worker process
  log="logs/${name}_worker_${pid}.log"
  mv -- "$tmp_log" "$log"
  echo "Started ${label} worker with PID: ${pid} and logging to ${log}"

  worker_labels+=("$label")
  worker_ports+=("$port")
  worker_pids+=("$pid")
  worker_logs+=("$log")
}

# Run in a subshell so the exports below do not leak into the calling shell.
(
  export PREFECT_WORK_DIR="$PREFECT_WORK_DIR"
  # Append the old PYTHONPATH only when it is non-empty, so no trailing ':'
  # (an empty search-path entry) is produced.
  export PYTHONPATH="${PWD}${PYTHONPATH:+:${PYTHONPATH}}"
  prefect config set PREFECT_API_URL="$PREFECT_API_URL"

  # Create log directory if it doesn't exist
  mkdir -p logs

  # 1. docker   2. podman   3. conda   4. slurm
  launch_worker docker Docker 8081
  launch_worker podman Podman 8082
  launch_worker conda Conda 8083
  launch_worker slurm Slurm 8084

  if (( ${#worker_pids[@]} == 0 )); then
    echo "Error: no child worker could be started" >&2
    exit 1
  fi

  echo "All child workers started in background"

  # Create a pid file with all workers for easy termination later
  echo "${worker_pids[*]}" > logs/child_workers_pids.txt
  echo "PIDs saved to logs/child_workers_pids.txt for future reference"

  # Output summary
  echo ""
  echo "===== Child Workers Summary ====="
  for i in "${!worker_pids[@]}"; do
    echo "${worker_labels[i]} worker: PID ${worker_pids[i]}, Port ${worker_ports[i]}, Log: ${worker_logs[i]}"
  done
  echo "=============================="
  echo ""
  echo "To view logs, use: tail -f ${worker_logs[0]}"
  echo "To stop workers, run: kill \$(cat logs/child_workers_pids.txt)"
)
3 changes: 3 additions & 0 deletions start_worker.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
source .env
export MLFLOW_TRACKING_URI
export MLFLOW_TRACKING_USERNAME
export MLFLOW_TRACKING_PASSWORD

export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
prefect config set PREFECT_API_URL=$PREFECT_API_URL
Expand Down
28 changes: 18 additions & 10 deletions start_worker_background.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,30 @@ source "$CONDA_PATH/etc/profile.d/conda.sh"
# Start the worker command in the background, capture its PID, and assign the log file
(
export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
export PYTHONPATH=$PWD:$PYTHONPATH
prefect config set PREFECT_API_URL=$PREFECT_API_URL

prefect work-pool create mlex_pool --type "process"
# Create logs directory if it doesn't exist
mkdir -p logs

# Create mlex pool for parent worker
prefect work-pool create mlex_pool --type "process" || true
prefect work-pool set-concurrency-limit mlex_pool $PREFECT_WORK_POOL_CONCURRENCY

# Deploy all flows
prefect deploy --all

# Create a log file
log_file="process$$.log"

# Start the worker process, redirecting stdout and stderr to a temporary log file
prefect worker start --pool mlex_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck &> "process_temp.log" &
# Start the parent worker process, redirecting stdout and stderr to a temporary log file
PREFECT_WORKER_WEBSERVER_PORT=8080 prefect worker start --pool mlex_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck &> "process_temp_parent.log" &
pid_worker=$!

# Rename the log file to include the actual PID of the worker process
log_file="process${pid_worker}.log"
mv "process_temp.log" "$log_file"
log_file="logs/parent_worker_${pid_worker}.log"
mv "process_temp_parent.log" "$log_file"

# Save the PID for easy reference
echo "$pid_worker" > logs/parent_worker_pid.txt

echo "Started Prefect worker with PID: $pid_worker and logging to $log_file"
)
echo "Started parent Prefect worker with PID: $pid_worker and logging to $log_file"
echo "To stop this worker, run: kill \$(cat logs/parent_worker_pid.txt)"
)