diff --git a/.env.example b/.env.example
index ede5eb5..3072327 100644
--- a/.env.example
+++ b/.env.example
@@ -1,3 +1,6 @@
+
+# Prefect Configuration
+# =====================
 # The maximum number of flow runs to run simultaneously in the work pool.
 # Replace with the desired concurrency level (integer).
 PREFECT_WORK_POOL_CONCURRENCY=4
@@ -14,5 +17,32 @@ PREFECT_API_URL=http://localhost:4200/api
 # Replace with the path to your desired working directory.
 PREFECT_WORK_DIR=$PWD
+
+# Conda Configuration
+# =====================
 # Path to conda
 CONDA_PATH=/opt/miniconda3
+# Base directory where algorithm repositories are stored
+# Each algorithm should be in its own subdirectory (e.g., folder_name=mlex_dlsia_segmentation_prototype)
+# The worker will cd into ALGORITHMS_DIR/folder_name before running the algorithm
+ALGORITHMS_DIR=
+
+# Docker Container Configuration
+# ==============================
+CONTAINER_NETWORK=mle_net
+TILED_STORAGE_DIR=  # Data Clinic/LSE only: reads from the local filesystem, not Tiled
+
+# MLflow Configuration
+# =====================
+MLFLOW_TRACKING_URI=http://localhost:5000
+MLFLOW_TRACKING_USERNAME=admin
+MLFLOW_TRACKING_PASSWORD=
+
+# Tiled API Keys
+# ==============
+# These will be automatically injected into child flows based on which URIs are present
+# Key for data input Tiled instance
+DATA_TILED_API_KEY=your_data_tiled_api_key
+MASK_TILED_API_KEY=your_mask_tiled_api_key
+SEG_TILED_API_KEY=your_seg_tiled_api_key
+RESULTS_TILED_API_KEY=your_results_tiled_api_key
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 75e5d3a..fa4252f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -160,4 +160,8 @@ cython_debug/
 #.idea/
-.vscode/
\ No newline at end of file
+.vscode/
+.DS_Store
+logs/
+
+tmp/
\ No newline at end of file
diff --git a/README.md b/README.md
index 7abaf2e..a24b5d1 100644
--- a/README.md
+++ b/README.md
@@ -1,39 +1,83 @@
 # mlex_prefect_worker
 
-This repository contains the necessary scripts and configuration files to start a Prefect process worker with conda, Podman and Slurm flows.
+This repository contains the necessary scripts and configuration files to start a Prefect process worker with conda, Docker, Podman and Slurm flows.
 
 ## Getting Started
 
-1. Create a conda environment with the required packages:
+### 1. Create and activate conda environment
 
-   ```bash
-   conda create --name myenv python=3.11
-   ```
+Create a conda environment with the required packages:
+```bash
+conda create --name myenv python=3.11
+conda activate myenv
+```
 
-2. Activate the conda environment:
+### 2. Install the package
 
-   ```bash
-   conda activate myenv
-   ```
+This will install dependencies:
+```bash
+python -m pip install .
+```
 
-3. Install the package. This will install dependencies.
+### 3. Configure environment variables
 
-   ```bash
-   python -m pip install .
-   ```
+Copy the example environment file and update it with your settings:
+```bash
+cp .env.example .env
+# Edit .env with your configuration
+```
 
-4. Change permissions for the shell scripts to make them executable:
+### 4. Make shell scripts executable
 
-   ```bash
-   chmod +x start_worker.sh
-   chmod +x flows/podman/bash_run_podman.sh
-   ```
+Change permissions for the shell scripts:
+```bash
+chmod +x start_parent_worker.sh
+chmod +x start_docker_child_worker.sh
+```
 
-5. Run the `start_worker.sh` script to start the Prefect worker:
+### 5. Start workers
 
-   ```bash
-   ./start_worker.sh
-   ```
+#### Parent Worker (required)
+The parent worker orchestrates job routing and execution:
+```bash
+./start_parent_worker.sh
+```
+
+#### Docker Child Worker (optional)
+For Docker-based job execution:
+```bash
+./start_docker_child_worker.sh
+```
+
+## Worker Types
+
+This repository supports multiple execution environments:
+
+- **Conda**: Local execution in conda environments
+- **Docker**: Containerized execution using Docker
+- **Podman**: Containerized execution using Podman
+- **Slurm**: HPC cluster execution via the Slurm scheduler
+
+The parent worker automatically routes jobs to the appropriate execution environment based on the `worker.name` setting in `config.yml`.
+
+## Configuration
+
+Edit `config.yml` to configure:
+- Worker type selection: facility names map to execution types (als→docker, nersc→slurm, nsls-ii→podman, conda→conda)
+- Conda environment mappings
+- Container volume mounts and networks
+- Slurm job parameters
+
+## Monitoring
+
+Worker logs are stored in the `logs/` directory with the process ID in the filename:
+```bash
+# View Docker worker logs
+tail -f logs/docker_worker_<PID>.log
+
+# Stop Docker worker
+kill $(cat logs/docker_worker_pid.txt)
+```
 
 ## Copyright
 MLExchange Copyright (c) 2024, The Regents of the University of California,
diff --git a/config.yml b/config.yml
new file mode 100644
index 0000000..93b962c
--- /dev/null
+++ b/config.yml
@@ -0,0 +1,33 @@
+# MLExchange Job Configuration
+# Default worker configuration
+worker:
+  name: "als" # Environment name ("als"->docker, "nersc"->slurm, "nsls-ii"->podman, "conda"->conda)
+  type: "docker" # Execution type (should match the name)
+# Configuration for different job types
+
+# Conda environment settings
+conda:
+  conda_env_name:
+    DLSIA MSDNet_0.0.1: "dlsia"
+    DLSIA TUNet_0.0.1: "dlsia"
+    DLSIA TUNet3+_0.0.1: "dlsia"
+    DLSIA MSDNet_0.1.0: "dlsia_0.1.0"
+    DLSIA TUNet_0.1.0: "dlsia_0.1.0"
+    DLSIA TUNet3+_0.1.0: "dlsia_0.1.0"
+
+# Docker/Podman container settings
+container:
+  # Use environment variables for dynamic paths
+  # These will be expanded at runtime from .env file
+  volumes:
+    - "${TILED_STORAGE_DIR}:/tiled_storage"
+  network: "${CONTAINER_NETWORK}"
+
+# Slurm job scheduler settings
+slurm:
+  num_nodes: 1
+  partitions: '["p_cpu1", "p_cpu2"]'
+  reservations: '["r_cpu1", "r_cpu2"]'
+  max_time: "1:00:00" # Format: HH:MM:SS
+  forward_ports: '["8888:8888"]'
+  submission_ssh_key: "~/.ssh/id_rsa"
\ No newline at end of file
diff --git a/flows/conda/conda_flows.py b/flows/conda/conda_flows.py
index c84d6b5..6c925bb 100644
--- a/flows/conda/conda_flows.py
+++ b/flows/conda/conda_flows.py
@@ -7,6 +7,7 @@
 from flows.conda.schema import CondaParams
 from flows.logger import setup_logger
+from flows.credentials import add_credentials_to_io_parameters
 
 
 @flow(name="launch_conda")
@@ -28,6 +29,9 @@ async def launch_conda(
     # Append current flow run id
     conda_params.params["io_parameters"]["uid_save"] = current_flow_run_id
 
+    # Add credentials to io_parameters at the child flow level
+    conda_params.params = add_credentials_to_io_parameters(conda_params.params)
+
     # Create temporary file for parameters
     with tempfile.NamedTemporaryFile(mode="w+t") as temp_file:
         yaml.dump(conda_params.params, temp_file)
@@ -37,6 +41,7 @@
             conda_params.conda_env_name,
             conda_params.python_file_name,
             temp_file.name,
+            conda_params.folder_name,  # Pass folder name to script
         ]
         logger.info(f"Launching with command: {cmd}")
         process = await run_process(cmd, stream_output=True)
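As a point of reference, the temporary YAML that `launch_conda` hands to `run_conda.sh` might look roughly like the sketch below once the child flow has injected credentials. The key names follow `flows/credentials.py`; the URI, the UID values, and the `model_parameters` block are illustrative assumptions only:

```yaml
# Hypothetical contents of the temp params file passed to run_conda.sh (illustrative only)
io_parameters:
  data_tiled_uri: http://localhost:8000/api/v1/metadata/example   # assumed input URI
  data_tiled_api_key: your_data_tiled_api_key                     # injected because data_tiled_uri is present
  mlflow_tracking_username: admin
  mlflow_tracking_password: ""
  uid_retrieve: ""                    # previous child flow run id, set by the parent flow
  uid_save: 1b2c3d4e-0000-0000-0000-000000000000   # current flow run id, appended by launch_conda
model_parameters: {}                  # algorithm-specific settings; structure depends on the algorithm
```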
diff --git a/flows/conda/run_conda.sh b/flows/conda/run_conda.sh
index 0934fdc..8cb6f2c 100755
--- a/flows/conda/run_conda.sh
+++ b/flows/conda/run_conda.sh
@@ -2,18 +2,26 @@
 source .env
 
 # Check if all arguments are provided
-if [ $# -ne 3 ]; then
-    echo "Usage: $0 python "
+if [ $# -ne 4 ]; then
+    echo "Usage: $0 <conda_environment> <python_file> <yaml_file> <folder_name>"
     exit 1
 fi
 
 source "$CONDA_PATH/etc/profile.d/conda.sh"
+
 # Assign arguments to variables
 conda_environment=$1
 python_file=$2
 yaml_file=$3
+folder_name=$4
+
+# Change to algorithm directory if ALGORITHMS_DIR is set
+if [ -n "$ALGORITHMS_DIR" ] && [ -n "$folder_name" ]; then
+    cd "$ALGORITHMS_DIR/$folder_name" || exit 1
+    echo "Working directory: $(pwd)"
+fi
 
-echo Calling model with: conda run --no-capture-output -n $conda_environment python $python_file $yaml_file
+echo "Calling model with: conda run --no-capture-output -n $conda_environment python $python_file $yaml_file"
 
 # Call python with the python file and yaml file as arguments
 conda run --no-capture-output -n $conda_environment python $python_file $yaml_file
diff --git a/flows/conda/schema.py b/flows/conda/schema.py
index 5fddfdc..1af98a1 100644
--- a/flows/conda/schema.py
+++ b/flows/conda/schema.py
@@ -8,6 +8,9 @@ class CondaParams(BaseModel):
     python_file_name: str = Field(
         description="Python file to run", default="src/train.py"
     )
+    folder_name: str = Field(
+        description="Folder name extracted from image_name", default=""
+    )
     params: Optional[dict] = {}
 
     class Config:
diff --git a/flows/credentials.py b/flows/credentials.py
new file mode 100644
index 0000000..df0a02d
--- /dev/null
+++ b/flows/credentials.py
@@ -0,0 +1,58 @@
+"""
+Credential management utilities for child flows.
+This module has minimal dependencies and can be imported by all flow types.
+"""
+import os
+from dotenv import load_dotenv
+
+# Load .env file at module import
+load_dotenv()
+
+# MLflow connection parameters - load from environment variables
+MLFLOW_TRACKING_USERNAME = os.getenv("MLFLOW_TRACKING_USERNAME", "")
+MLFLOW_TRACKING_PASSWORD = os.getenv("MLFLOW_TRACKING_PASSWORD", "")
+
+# Tiled API keys - load from environment variables
+DATA_TILED_API_KEY = os.getenv("DATA_TILED_API_KEY", "")
+MASK_TILED_API_KEY = os.getenv("MASK_TILED_API_KEY", "")
+SEG_TILED_API_KEY = os.getenv("SEG_TILED_API_KEY", "")
+RESULTS_TILED_API_KEY = os.getenv("RESULTS_TILED_API_KEY", "")
+
+
+def add_credentials_to_io_parameters(params: dict) -> dict:
+    """
+    Add credentials to io_parameters that were removed from the application.
+    This function intelligently adds only the credentials that are needed based on
+    which URIs are present in the io_parameters.
+
+    Args:
+        params: Parameters dictionary containing io_parameters
+
+    Returns:
+        Updated parameters dictionary with credentials added
+    """
+    if "io_parameters" not in params:
+        params["io_parameters"] = {}
+
+    io_params = params["io_parameters"]
+
+    # Mapping of URI keys to their corresponding API key environment variables
+    tiled_uri_to_key_mapping = {
+        "data_tiled_uri": ("data_tiled_api_key", DATA_TILED_API_KEY),
+        "mask_tiled_uri": ("mask_tiled_api_key", MASK_TILED_API_KEY),
+        "seg_tiled_uri": ("seg_tiled_api_key", SEG_TILED_API_KEY),
+        "results_tiled_uri": ("results_tiled_api_key", RESULTS_TILED_API_KEY),
+    }
+
+    # Add Tiled API keys only if corresponding URI exists and key is not already set
+    for uri_key, (api_key_name, api_key_value) in tiled_uri_to_key_mapping.items():
+        if uri_key in io_params and api_key_name not in io_params:
+            io_params[api_key_name] = api_key_value
+
+    # Add MLflow credentials only if not already present
+    if "mlflow_tracking_username" not in io_params:
+        io_params["mlflow_tracking_username"] = MLFLOW_TRACKING_USERNAME
+    if "mlflow_tracking_password" not in io_params:
+        io_params["mlflow_tracking_password"] = MLFLOW_TRACKING_PASSWORD
+
+    return params
\ No newline at end of file
diff --git a/flows/docker/bash_run_docker.sh b/flows/docker/bash_run_docker.sh
index 948e2ff..5a158fe 100755
--- a/flows/docker/bash_run_docker.sh
+++ b/flows/docker/bash_run_docker.sh
@@ -49,4 +49,4 @@ if [ "$?" -ne 0 ]; then
 fi
 
 echo "Successfully launched Docker container"
-exit 0
+exit 0
\ No newline at end of file
diff --git a/flows/docker/docker_flows.py b/flows/docker/docker_flows.py
index 147f201..735a941 100644
--- a/flows/docker/docker_flows.py
+++ b/flows/docker/docker_flows.py
@@ -1,5 +1,5 @@
 import tempfile
-
+import os
 import yaml
 from prefect import context, flow
 from prefect.states import Failed
@@ -7,6 +7,7 @@
 from flows.docker.schema import DockerParams
 from flows.logger import setup_logger
+from flows.credentials import add_credentials_to_io_parameters
 
 
 @flow(name="Docker flow")
@@ -28,6 +29,8 @@ async def launch_docker(
     # Append current flow run id
     docker_params.params["io_parameters"]["uid_save"] = current_flow_run_id
 
+    # Add credentials to io_parameters at the child flow level
+    docker_params.params = add_credentials_to_io_parameters(docker_params.params)
     # Create temporary file for parameters
     with tempfile.NamedTemporaryFile(mode="w+t") as temp_file:
         yaml.dump(docker_params.params, temp_file)
@@ -54,4 +57,4 @@
     if process.returncode != 0:
         return Failed(message="Docker command failed")
 
-    return current_flow_run_id
+    return current_flow_run_id
\ No newline at end of file
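To make the credential-injection contract concrete, here is a small usage sketch of `add_credentials_to_io_parameters`; the URI value is a placeholder, and only the API keys whose URIs actually appear in `io_parameters` are filled in:

```python
# Illustrative usage of flows.credentials.add_credentials_to_io_parameters
from flows.credentials import add_credentials_to_io_parameters

params = {
    "io_parameters": {
        # Only a data URI is present, so only data_tiled_api_key will be injected
        "data_tiled_uri": "http://localhost:8000/api/v1/metadata/example",  # assumed URI
    }
}

params = add_credentials_to_io_parameters(params)

# io_parameters now also carries data_tiled_api_key (from DATA_TILED_API_KEY in .env),
# mlflow_tracking_username and mlflow_tracking_password; the mask/seg/results keys are
# left out because their URIs were not provided.
print(sorted(params["io_parameters"].keys()))
```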
diff --git a/flows/parent_flow.py b/flows/parent_flow.py
index 0213a52..8c83d1e 100644
--- a/flows/parent_flow.py
+++ b/flows/parent_flow.py
@@ -1,47 +1,284 @@
-from enum import Enum
+import logging
+import os
+import json
 
-from prefect import flow, get_run_logger
+from prefect import flow, task, get_run_logger
+from prefect.deployments import run_deployment
 
-from flows.conda.conda_flows import launch_conda
-from flows.docker.docker_flows import launch_docker
-from flows.podman.podman_flows import launch_podman
-from flows.slurm.slurm_flows import launch_slurm
+# Import the Prefect client to check flow run states
+from prefect.client import get_client
 
+# Import schema classes for validation
+from flows.conda.schema import CondaParams
+from flows.docker.schema import DockerParams
+from flows.podman.schema import PodmanParams
+from flows.slurm.schema import SlurmParams
 
-class FlowType(str, Enum):
-    podman = "podman"
-    conda = "conda"
-    slurm = "slurm"
-    docker = "docker"
+# Import utility functions and constants
+from flows.utils import (
+    FlowType,
+    load_config,
+    determine_best_environment,
+    get_algorithm_details_from_mlflow,
+    extract_folder_name_from_image,
+)
+
+logger = logging.getLogger(__name__)
+
+
+@task
+def determine_best_environment_task(hpc_type: str) -> FlowType:
+    """
+    Determine the best execution environment based on hpc_type.
+
+    Args:
+        hpc_type: Type of HPC to execute on
+
+    Returns:
+        Best flow type to use
+    """
+    return determine_best_environment(hpc_type)
+
+
+@task
+def get_algorithm_details_from_mlflow_task(model_name: str, config: dict):
+    """
+    Retrieve algorithm details from MLflow using the model name.
+
+    Args:
+        model_name: The name of the model in MLflow
+        config: Configuration dictionary from config.yml
+
+    Returns:
+        Tuple containing (algorithm_details, job_details)
+    """
+    return get_algorithm_details_from_mlflow(model_name, config)
 
 
 @flow(name="Parent flow")
-async def launch_parent_flow(
-    flow_type: FlowType,
-    params_list: list[dict],
-):
+async def launch_parent_flow(params_list: list[dict]):
+    """
+    Smart job router that automatically selects the best execution environment
+    based on the worker configuration and loads algorithm details from MLflow.
+
+    Args:
+        params_list: List of parameters for the job, each containing model_name and task_name
+    """
     prefect_logger = get_run_logger()
-
+    client = get_client()
+
+    # Load configuration from file (with env vars expanded)
+    config = load_config()
+
+    # Get worker configuration from config, default to "als" if not specified
+    worker_config = config.get("worker", {})
+    worker_name = worker_config.get("name", "als")
+    prefect_logger.info(f"Starting job router (parent flow) for worker: {worker_name}")
+
+    # Auto-select environment based on worker_type if specified, otherwise use worker_name
+    target_env = determine_best_environment_task(worker_name)
+    prefect_logger.info(f"Selected target environment: {target_env}")
+
+    # Execute each step in sequence based on the selected environment
     flow_run_id = ""
-    for params in params_list:
-        if flow_type == FlowType.podman:
-            flow_run_id = await launch_podman(
-                podman_params=params, prev_flow_run_id=flow_run_id
-            )
-        elif flow_type == FlowType.conda:
-            flow_run_id = await launch_conda(
-                conda_params=params, prev_flow_run_id=flow_run_id
-            )
-        elif flow_type == FlowType.slurm:
-            flow_run_id = await launch_slurm(
-                slurm_params=params, prev_flow_run_id=flow_run_id
-            )
-        elif flow_type == FlowType.docker:
-            flow_run_id = await launch_docker(
-                docker_params=params, prev_flow_run_id=flow_run_id
-            )
-        else:
-            prefect_logger.error("Flow type not supported")
-            raise ValueError("Flow type not supported")
-
-    pass
+
+    for i, child_job_params in enumerate(params_list):
+        prefect_logger.info(f"Running step {i+1} of {len(params_list)}")
+
+        try:
+            # Get model name and task
+            model_name = child_job_params.get("model_name", "")
+            task_name = child_job_params.get("task_name", "")
+            params = child_job_params.get("params", {})
+
+            # NOTE: Credentials are NO LONGER added here - they will be added in child flows
+
+            # Get algorithm details and job details from MLflow
+            algorithm_details, job_details = get_algorithm_details_from_mlflow_task(model_name, config)
+
+            # Extract folder name from image_name
+            folder_name = extract_folder_name_from_image(algorithm_details.get("image_name", ""))
+
+            # Get the appropriate python file name based on the task name
+            if task_name == "execute":
+                python_file = algorithm_details.get("python_file", "")
+            elif task_name == "train":
+                python_file = algorithm_details.get("python_file_train", "")
+            elif task_name == "inference":
+                python_file = algorithm_details.get("python_file_inference", "")
+            elif task_name == "tune":
+                python_file = algorithm_details.get("python_file_tune", "")
+            else:
+                # For any other task, default to python_file
+                python_file = algorithm_details.get("python_file", "")
+
+            if not python_file:
+                prefect_logger.error(f"No Python file found for task {task_name}")
+                raise ValueError(f"No Python file found for task {task_name}")
+
+            if target_env == FlowType.conda:
+                # Prepare conda parameters - use job_details for conda_env
+                conda_relevant_params = {
+                    "conda_env_name": job_details["conda_env"],
+                    "python_file_name": python_file,
+                    "folder_name": folder_name,
+                    "params": params
+                }
+                # Validate parameters with the schema
+                conda_params = CondaParams(**conda_relevant_params)
+                # If there's a previous flow run ID, set it in the parameters
+                if flow_run_id:
+                    if "io_parameters" not in conda_params.params:
+                        conda_params.params["io_parameters"] = {}
+                    conda_params.params["io_parameters"]["uid_retrieve"] = flow_run_id
+
+                # Run the conda deployment with parameters
+                deployment_data = {
+                    "conda_params": conda_params.dict(),
+                    "prev_flow_run_id": flow_run_id
+                }
+                flow_run = await run_deployment(
+                    name="launch_conda/launch_conda",
+                    parameters=deployment_data,
+                    poll_interval=60
+                )
+
+                if flow_run.state.is_failed():
+                    raise RuntimeError(f"Child flow failed at step {i+1}")
+
+                flow_run_id = str(flow_run.id)
+
+            elif target_env == FlowType.docker:
+                # Prepare docker parameters - use algorithm_details for image info and job_details for environment
+                docker_relevant_params = {
+                    "image_name": algorithm_details["image_name"],
+                    "image_tag": algorithm_details["image_tag"],
+                    "command": f"python {python_file}",
+                    "volumes": job_details["volumes"],
+                    "network": job_details["network"],
+                    "env_vars": {},
+                    "params": params
+                }
+                # Validate parameters with the schema
+                docker_params = DockerParams(**docker_relevant_params)
+                # If there's a previous flow run ID, set it in the parameters
+                if flow_run_id:
+                    if "io_parameters" not in docker_params.params:
+                        docker_params.params["io_parameters"] = {}
+                    docker_params.params["io_parameters"]["uid_retrieve"] = flow_run_id
+
+                # Run the docker deployment with parameters
+                deployment_data = {
+                    "docker_params": docker_params.dict(),
+                    "prev_flow_run_id": flow_run_id
+                }
+                flow_run = await run_deployment(
+                    name="Docker flow/launch_docker",
+                    parameters=deployment_data,
+                    poll_interval=60
+                )
+
+                if flow_run.state.is_failed():
+                    raise RuntimeError(f"Child flow failed at step {i+1}")
+
+                flow_run_id = str(flow_run.id)
+
+            elif target_env == FlowType.podman:
+                # Prepare podman parameters - use algorithm_details for image info and job_details for environment
+                podman_relevant_params = {
+                    "image_name": algorithm_details["image_name"],
+                    "image_tag": algorithm_details["image_tag"],
+                    "command": f"python {python_file}",
+                    "volumes": job_details["volumes"],
+                    "network": job_details["network"],
+                    "env_vars": {},
+                    "params": params
+                }
+                # Validate parameters with the schema
+                podman_params = PodmanParams(**podman_relevant_params)
+                # If there's a previous flow run ID, set it in the parameters
+                if flow_run_id:
+                    if "io_parameters" not in podman_params.params:
+                        podman_params.params["io_parameters"] = {}
+                    podman_params.params["io_parameters"]["uid_retrieve"] = flow_run_id
+
+                # Run the podman deployment with parameters
+                deployment_data = {
+                    "podman_params": podman_params.dict(),
+                    "prev_flow_run_id": flow_run_id
+                }
+                flow_run = await run_deployment(
+                    name="Podman flow/launch_podman",
+                    parameters=deployment_data,
+                    poll_interval=60
+                )
+
+                if flow_run.state.is_failed():
+                    raise RuntimeError(f"Child flow failed at step {i+1}")
+
+                flow_run_id = str(flow_run.id)
+
+            elif target_env == FlowType.slurm:
+                # Parse string JSON values if needed
+                partitions = job_details["partitions"]
+                if isinstance(partitions, str):
+                    partitions = json.loads(partitions)
+
+                reservations = job_details["reservations"]
+                if isinstance(reservations, str):
+                    reservations = json.loads(reservations)
+
+                forward_ports = job_details["forward_ports"]
+                if isinstance(forward_ports, str):
+                    forward_ports = json.loads(forward_ports)
+
+                # Prepare slurm parameters - use job_details for slurm configuration
+                slurm_relevant_params = {
+                    "job_name": f"{model_name}_{task_name}",
+                    "num_nodes": job_details["num_nodes"],
+                    "partitions": partitions,
+                    "reservations": reservations,
+                    "max_time": job_details["max_time"],
+                    "conda_env_name": job_details["conda_env"],
+                    "forward_ports": forward_ports,
+                    "submission_ssh_key": job_details["submission_ssh_key"],
+                    "python_file_name": python_file,
+                    "params": params
+                }
+
+                # Validate parameters with the schema
+                slurm_params = SlurmParams(**slurm_relevant_params)
+
+                # If there's a previous flow run ID, set it in the parameters
+                if flow_run_id:
+                    if "io_parameters" not in slurm_params.params:
+                        slurm_params.params["io_parameters"] = {}
+                    slurm_params.params["io_parameters"]["uid_retrieve"] = flow_run_id
+
+                # Run the slurm deployment with parameters
+                deployment_data = {
+                    "slurm_params": slurm_params.dict(),
+                    "prev_flow_run_id": flow_run_id
+                }
+                flow_run = await run_deployment(
+                    name="launch_slurm/launch_slurm",
+                    parameters=deployment_data,
+                    poll_interval=60
+                )
+
+                if flow_run.state.is_failed():
+                    raise RuntimeError(f"Child flow failed at step {i+1}")
+
+                flow_run_id = str(flow_run.id)
+
+            else:
+                raise ValueError("Flow type not supported")
+
+            prefect_logger.info(f"Step {i+1} completed with flow run ID: {flow_run_id}")
+
+        except Exception as e:
+            prefect_logger.error(f"Error in step {i+1}: {str(e)}")
+            raise
+
+    prefect_logger.info(f"All steps completed successfully. Final flow run ID: {flow_run_id}")
+    return flow_run_id
\ No newline at end of file
diff --git a/flows/podman/podman_flows.py b/flows/podman/podman_flows.py
index 1cc8b02..829789c 100644
--- a/flows/podman/podman_flows.py
+++ b/flows/podman/podman_flows.py
@@ -7,6 +7,7 @@
 from flows.logger import setup_logger
 from flows.podman.schema import PodmanParams
+from flows.credentials import add_credentials_to_io_parameters
 
 
 @flow(name="Podman flow")
@@ -28,6 +29,9 @@ async def launch_podman(
     # Append current flow run id
     podman_params.params["io_parameters"]["uid_save"] = current_flow_run_id
 
+    # Add credentials to io_parameters at the child flow level
+    podman_params.params = add_credentials_to_io_parameters(podman_params.params)
+
     # Create temporary file for parameters
     with tempfile.NamedTemporaryFile(mode="w+t") as temp_file:
         yaml.dump(podman_params.params, temp_file)
diff --git a/flows/slurm/slurm_flows.py b/flows/slurm/slurm_flows.py
index 818272e..4cdb223 100644
--- a/flows/slurm/slurm_flows.py
+++ b/flows/slurm/slurm_flows.py
@@ -8,6 +8,7 @@
 from flows.logger import setup_logger
 from flows.slurm.schema import SlurmParams
+from flows.credentials import add_credentials_to_io_parameters
 
 
 @flow(name="launch_slurm")
@@ -26,6 +27,9 @@ async def launch_slurm(
     # Append current flow run id
     slurm_params.params["io_parameters"]["uid_save"] = current_flow_run_id
 
+    # Add credentials to io_parameters at the child flow level
+    slurm_params.params = add_credentials_to_io_parameters(slurm_params.params)
+
     # Create temporary file for parameters
     with tempfile.NamedTemporaryFile(mode="w+t", dir=".") as temp_file:
         yaml.dump(slurm_params.params, temp_file)
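For context, a client-side submission to this router might look like the sketch below. The deployment name follows `prefect.yaml` (flow "Parent flow", deployment "launch_parent_flow"); the model name, Tiled URI, and parameter contents are placeholders for illustration:

```python
# Hypothetical submission of a two-step job (train, then inference) to the parent deployment
import asyncio
from prefect.deployments import run_deployment

params_list = [
    {
        "model_name": "DLSIA MSDNet",  # assumed: a model registered in MLflow
        "task_name": "train",
        "params": {"io_parameters": {"data_tiled_uri": "http://localhost:8000/api/v1/metadata/example"}},
    },
    {
        "model_name": "DLSIA MSDNet",  # second step receives the first step's run id as uid_retrieve
        "task_name": "inference",
        "params": {"io_parameters": {"data_tiled_uri": "http://localhost:8000/api/v1/metadata/example"}},
    },
]

async def main():
    flow_run = await run_deployment(
        name="Parent flow/launch_parent_flow",
        parameters={"params_list": params_list},
    )
    print(f"Parent flow run: {flow_run.id}")

asyncio.run(main())
```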
diff --git a/flows/utils.py b/flows/utils.py
new file mode 100644
index 0000000..a592272
--- /dev/null
+++ b/flows/utils.py
@@ -0,0 +1,242 @@
+import logging
+import os
+import json
+from enum import Enum
+
+import yaml
+import mlflow
+from mlflow.tracking import MlflowClient
+from prefect import get_run_logger
+from dotenv import load_dotenv
+
+# Load .env file at module import
+load_dotenv()
+
+logger = logging.getLogger(__name__)
+
+# MLflow connection parameters - load from environment variables
+MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI", "")
+MLFLOW_TRACKING_USERNAME = os.getenv("MLFLOW_TRACKING_USERNAME", "")
+MLFLOW_TRACKING_PASSWORD = os.getenv("MLFLOW_TRACKING_PASSWORD", "")
+
+# Path to configuration file
+CONFIG_PATH = "config.yml"
+
+
+class FlowType(str, Enum):
+    podman = "podman"
+    conda = "conda"
+    slurm = "slurm"
+    docker = "docker"
+
+
+def expand_env_vars(obj):
+    """Recursively expand environment variables in nested structures"""
+    if isinstance(obj, dict):
+        return {k: expand_env_vars(v) for k, v in obj.items()}
+    elif isinstance(obj, list):
+        return [expand_env_vars(item) for item in obj]
+    elif isinstance(obj, str):
+        # Expand ${VAR} and $VAR patterns
+        return os.path.expandvars(obj)
+    else:
+        return obj
+
+
+def load_config():
+    """
+    Load the configuration from config.yml file and expand environment variables.
+
+    Returns:
+        Dictionary containing configuration with expanded env vars
+    """
+    try:
+        with open(CONFIG_PATH, 'r') as f:
+            config = yaml.safe_load(f)
+
+        # Expand all environment variables
+        config = expand_env_vars(config)
+
+        return config
+    except Exception as e:
+        logger.error(f"Error loading configuration from {CONFIG_PATH}: {str(e)}")
+        return {}
+
+
+def extract_folder_name_from_image(image_name: str) -> str:
+    """
+    Extract folder name from image_name.
+    For example: ghcr.io/mlexchange/mlex_dlsia_segmentation_prototype -> mlex_dlsia_segmentation_prototype
+
+    Args:
+        image_name: Full image name from MLflow
+
+    Returns:
+        Folder name extracted from image_name
+    """
+    if not image_name:
+        return ""
+
+    # Split by '/' and get the last part
+    parts = image_name.split('/')
+    if len(parts) > 0:
+        return parts[-1]
+
+    return ""
+
+
+def determine_best_environment(hpc_type: str):
+    """
+    Determine the best execution environment based on hpc_type.
+
+    Args:
+        hpc_type: Type of HPC to execute on (can be worker name or flow type)
+
+    Returns:
+        Best flow type to use
+    """
+    logger = get_run_logger()
+
+    # Map HPC type to flow type
+    hpc_type = hpc_type.lower()
+    if hpc_type == "nersc":
+        logger.info(f"Worker type is NERSC, selecting SLURM")
+        return FlowType.slurm
+    elif hpc_type == "nsls-ii":
+        logger.info(f"Worker type is NSLS-II, selecting PODMAN")
+        return FlowType.podman
+    elif hpc_type == "als":
+        logger.info(f"Worker type is ALS cluster-ball, selecting DOCKER")
+        return FlowType.docker
+    elif hpc_type in [ft.value for ft in FlowType]:
+        # If the hpc_type is actually a flow type, use it directly
+        return FlowType(hpc_type)
+    else:
+        # Default to conda for unknown types
+        logger.info(f"Unknown worker type: {hpc_type}, defaulting to CONDA environment")
+        return FlowType.conda
+
+
+def _get_conda_env_for_model(model_name: str, config: dict, model_version: str = "") -> str:
+    """
+    Simple helper function to determine the appropriate conda environment for a model.
+
+    Args:
+        model_name: The name of the model
+        config: The configuration dictionary from config.yml
+        model_version: The version of the model from MLflow tags (optional)
+
+    Returns:
+        The appropriate conda environment name
+    """
+    conda_envs = config.get("conda", {}).get("conda_env_name", {})
+
+    # Try direct lookup by model_name_version first if version is provided
+    if model_version:
+        versioned_key = f"{model_name}_{model_version}"
+        if versioned_key in conda_envs:
+            return conda_envs[versioned_key]
+
+    # Try direct lookup by model name
+    if model_name in conda_envs:
+        return conda_envs[model_name]
+
+    # Return empty string if no match found
+    return ""
+
+
+def get_algorithm_details_from_mlflow(model_name: str, config: dict):
+    """
+    Retrieve algorithm details from MLflow using the model name.
+
+    Args:
+        model_name: The name of the model in MLflow
+        config: Configuration dictionary from config.yml
+
+    Returns:
+        Tuple containing (algorithm_details, job_details)
+    """
+    logger = get_run_logger()
+    logger.info(f"Retrieving details for model {model_name} from MLflow")
+
+    # Log MLflow connection parameters for debugging
+    logger.info(f"MLflow Tracking URI: {MLFLOW_TRACKING_URI}")
+    logger.info(f"MLflow Username: {'Set' if MLFLOW_TRACKING_USERNAME else 'Not set'}")
+    logger.info(f"MLflow Password: {'Set' if MLFLOW_TRACKING_PASSWORD else 'Not set'}")
+
+    # Set MLflow connection
+    os.environ["MLFLOW_TRACKING_USERNAME"] = MLFLOW_TRACKING_USERNAME
+    os.environ["MLFLOW_TRACKING_PASSWORD"] = MLFLOW_TRACKING_PASSWORD
+    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
+
+    try:
+        client = MlflowClient()
+
+        # Get the latest version of the model
+        logger.info(f"Attempting to get latest versions for model: {model_name}")
+        versions = client.get_latest_versions(model_name)
+        if not versions:
+            logger.error(f"No versions found for model {model_name}")
+            raise ValueError(f"Model {model_name} not found in MLflow")
+
+        version = versions[0]
+        logger.info(f"Found version {version.version} for model {model_name}")
+
+        # Get the run to access parameters and tags
+        run = client.get_run(version.run_id)
+        logger.info(f"Retrieved run with ID: {run.info.run_id}")
+
+        # Extract the relevant parameters
+        params = run.data.params
+
+        # Extract algorithm version from tags
+        tags = run.data.tags
+        algorithm_version = tags.get("version", "")
+        logger.info(f"Algorithm version from tags: {algorithm_version}")
+
+        # Get algorithm details from MLflow - only the core information
+        algorithm_details = {
+            "model_name": model_name,
+            # Core Algorithm Information
+            "image_name": params.get("image_name", ""),
+            "image_tag": params.get("image_tag", ""),
+            "source": params.get("source", ""),
+            "is_gpu_enabled": params.get("is_gpu_enabled", "False").lower() == "true"
+        }
+
+        # Handle Python file paths
+        if "python_file_train" in params:
+            algorithm_details["python_file_train"] = params.get("python_file_train", "")
+        if "python_file_inference" in params:
+            algorithm_details["python_file_inference"] = params.get("python_file_inference", "")
+        if "python_file_tune" in params:
+            algorithm_details["python_file_tune"] = params.get("python_file_tune", "")
+        if "python_file" in params:
+            algorithm_details["python_file"] = params.get("python_file", "")
+
+        # Create job details from config.yml (already expanded by load_config)
+        job_details = {
+            # Container settings
+            "volumes": config.get("container", {}).get("volumes", []),
+            "network": config.get("container", {}).get("network", ""),
+            # Slurm settings
+            "num_nodes": config.get("slurm", {}).get("num_nodes", 1),
+            "partitions": config.get("slurm", {}).get("partitions", "[]"),
+            "reservations": config.get("slurm", {}).get("reservations", "[]"),
+            "max_time": config.get("slurm", {}).get("max_time", "1:00:00"),
+            "submission_ssh_key": config.get("slurm", {}).get("submission_ssh_key", ""),
+            "forward_ports": config.get("slurm", {}).get("forward_ports", "[]"),
+            # Get conda environment based on the model type and version from tags
+            "conda_env": _get_conda_env_for_model(model_name, config, algorithm_version)
+        }
+
+        logger.info(f"Successfully retrieved details for model {model_name}")
+        return algorithm_details, job_details
+
+    except Exception as e:
+        logger.error(f"Error retrieving algorithm details from MLflow: {str(e)}")
+
+        # Print the full exception traceback for better debugging
+        import traceback
+        logger.error(f"Traceback: {traceback.format_exc()}")
+        raise
\ No newline at end of file
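Because `load_config` runs every value in `config.yml` through `expand_env_vars`, the `${TILED_STORAGE_DIR}` and `${CONTAINER_NETWORK}` placeholders resolve against the worker's environment at load time. A minimal sketch of that behavior (the directory value is made up for the example):

```python
# Minimal illustration of expand_env_vars from flows/utils.py
import os
from flows.utils import expand_env_vars

os.environ["TILED_STORAGE_DIR"] = "/data/tiled_storage"   # assumed path for the example
os.environ["CONTAINER_NETWORK"] = "mle_net"

container_cfg = {
    "volumes": ["${TILED_STORAGE_DIR}:/tiled_storage"],
    "network": "${CONTAINER_NETWORK}",
}

# ${VAR} placeholders are replaced recursively via os.path.expandvars
print(expand_env_vars(container_cfg))
# {'volumes': ['/data/tiled_storage:/tiled_storage'], 'network': 'mle_net'}
```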
diff --git a/prefect.yaml b/prefect.yaml
index 36ed745..3c5bc8f 100644
--- a/prefect.yaml
+++ b/prefect.yaml
@@ -1,10 +1,13 @@
 name: mlex_prefect_worker
-prefect-version: 2.14.0
+prefect-version: 3.4.2
 
-build:
+# build section allows you to manage and build docker images
+build: null
 
-push:
+# push section allows you to manage if and how this project is uploaded to remote locations
+push: null
 
+# pull section allows you to provide instructions for cloning this project in remote locations
 pull:
 - prefect.deployments.steps.set_working_directory:
     directory: '{{ $PREFECT_WORK_DIR }}'
@@ -13,60 +16,65 @@
 deployments:
 - name: launch_podman
   version: 0.1.0
   tags: []
+  concurrency_limit: null
   description: Launch podman container
   entrypoint: flows/podman/podman_flows.py:launch_podman
   parameters: {}
   work_pool:
-    name: mlex_pool
+    name: podman_pool
     work_queue_name: default-queue
     job_variables: {}
-  schedule:
-    is_schedule_active: true
+  schedules: []
+
 - name: launch_conda
   version: 0.1.0
   tags: []
-  description: Launch podman container
+  concurrency_limit: null
+  description: Launch conda environment
   entrypoint: flows/conda/conda_flows.py:launch_conda
   parameters: {}
   work_pool:
-    name: mlex_pool
+    name: conda_pool
    work_queue_name: default-queue
     job_variables: {}
-  schedule:
-    is_schedule_active: true
+  schedules: []
+
 - name: launch_slurm
   version: 0.1.0
   tags: []
+  concurrency_limit: null
   description: Launch slurm job
   entrypoint: flows/slurm/slurm_flows.py:launch_slurm
   parameters: {}
   work_pool:
-    name: mlex_pool
+    name: slurm_pool
     work_queue_name: default-queue
     job_variables: {}
-  schedule:
-    is_schedule_active: true
+  schedules: []
+
 - name: launch_docker
   version: 0.1.0
   tags: []
+  concurrency_limit: null
  description: Launch docker job
   entrypoint: flows/docker/docker_flows.py:launch_docker
   parameters: {}
   work_pool:
-    name: mlex_pool
+    name: docker_pool
     work_queue_name: default-queue
     job_variables: {}
-  schedule:
-    is_schedule_active: true
+  schedules: []
+
+
 - name: launch_parent_flow
   version: 0.1.0
   tags: []
+  concurrency_limit: null
   description: Launch parent flow
   entrypoint: flows/parent_flow.py:launch_parent_flow
   parameters: {}
   work_pool:
-    name: mlex_pool
+    name: parent_pool
     work_queue_name: default-queue
     job_variables: {}
-  schedule:
-    is_schedule_active: true
+  schedules: []
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 16139aa..8ca70c7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,8 +13,11 @@ readme = { file = "README.md", content-type = "text/markdown" }
 requires-python = ">=3.11"
 
 dependencies = [
-    "prefect==2.14.21",
-    "typer<=0.9.4",
+    "prefect==3.4.2",
+    "typer==0.15.4",
+    "mlflow==2.22.0",
+    "requests>=2.31.0",
+    "python-dotenv"
 ]
 
 [project.optional-dependencies]
@@ -23,4 +26,4 @@ dev = [
     "pytest-asyncio",
     "pre-commit",
     "flake8"
-]
+]
\ No newline at end of file
diff --git a/start_conda_child_worker.sh b/start_conda_child_worker.sh
new file mode 100755
index 0000000..d2d4f35
--- /dev/null
+++ b/start_conda_child_worker.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+source .env
+
+export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+# Create work pool for job type conda
+prefect work-pool create conda_pool --type "process"
+prefect work-pool update conda_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+prefect deploy -n launch_conda --pool conda_pool
+PREFECT_WORKER_WEBSERVER_PORT=8083 prefect worker start --pool conda_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck
+
+echo "Conda worker started"
\ No newline at end of file
diff --git a/start_conda_child_worker_background.sh b/start_conda_child_worker_background.sh
new file mode 100644
index 0000000..2cf4685
--- /dev/null
+++ b/start_conda_child_worker_background.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# Load environment variables from .env file
+source .env
+
+echo "Executing Folder: ${PWD}"
+# Initialize conda
+source "$CONDA_PATH/etc/profile.d/conda.sh"
+
+# Start the worker command in the background, capture its PID, and assign the log file
+(
+    export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+    export PYTHONPATH=$PWD:$PYTHONPATH
+    prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+    # Create log directory if it doesn't exist
+    mkdir -p logs
+
+    # Create conda worker pool
+    prefect work-pool create conda_pool --type "process" || true
+    prefect work-pool update conda_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+    prefect deploy -n launch_conda --pool conda_pool
+
+    # Start the conda worker with logs that include PID
+    PREFECT_WORKER_WEBSERVER_PORT=8083 prefect worker start --pool conda_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck > "process_temp_conda.log" 2>&1 &
+    conda_pid=$!
+
+    # Rename the log file to include the actual PID of the worker process
+    conda_log="logs/conda_worker_${conda_pid}.log"
+    mv "process_temp_conda.log" "$conda_log"
+
+    # Create a pid file for easy termination later
+    echo "$conda_pid" > logs/conda_worker_pid.txt
+
+    echo "Started Conda worker with PID: $conda_pid and logging to $conda_log"
+    echo "To view logs, use: tail -f $conda_log"
+    echo "To stop worker, run: kill -9 \$(cat logs/conda_worker_pid.txt)"
+)
\ No newline at end of file
diff --git a/start_docker_child_worker.sh b/start_docker_child_worker.sh
new file mode 100755
index 0000000..15b33ef
--- /dev/null
+++ b/start_docker_child_worker.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+source .env
+
+export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+# Create work pool for job type docker
+prefect work-pool create docker_pool --type "process"
+prefect work-pool update docker_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+prefect deploy -n launch_docker --pool docker_pool
+PREFECT_WORKER_WEBSERVER_PORT=8081 prefect worker start --pool docker_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck
+
+echo "Docker worker started"
\ No newline at end of file
diff --git a/start_docker_child_worker_background.sh b/start_docker_child_worker_background.sh
new file mode 100755
index 0000000..34fedcb
--- /dev/null
+++ b/start_docker_child_worker_background.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+
+# Load environment variables from .env file
+source .env
+
+echo "Executing Folder: ${PWD}"
+
+# Start the worker command in the background, capture its PID, and assign the log file
+(
+    export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+    export PYTHONPATH=$PWD:$PYTHONPATH
+    prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+    # Create log directory if it doesn't exist
+    mkdir -p logs
+
+    # Create docker worker pool
+    prefect work-pool create docker_pool --type "process" || true
+    prefect work-pool update docker_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+    prefect deploy -n launch_docker --pool docker_pool
+
+    # Start the docker worker with logs that include PID
+    PREFECT_WORKER_WEBSERVER_PORT=8081 prefect worker start --pool docker_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck > "process_temp_docker.log" 2>&1 &
+    docker_pid=$!
+
+    # Rename the log file to include the actual PID of the worker process
+    docker_log="logs/docker_worker_${docker_pid}.log"
+    mv "process_temp_docker.log" "$docker_log"
+
+    # Create a pid file for easy termination later
+    echo "$docker_pid" > logs/docker_worker_pid.txt
+
+    echo "Started Docker worker with PID: $docker_pid and logging to $docker_log"
+    echo "To view logs, use: tail -f $docker_log"
+    echo "To stop worker, run: kill -9 \$(cat logs/docker_worker_pid.txt)"
+)
\ No newline at end of file
diff --git a/start_parent_worker.sh b/start_parent_worker.sh
new file mode 100755
index 0000000..811c8bc
--- /dev/null
+++ b/start_parent_worker.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+source .env
+
+export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+prefect work-pool create parent_pool --type "process"
+prefect work-pool update parent_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+
+prefect deploy -n launch_parent_flow --pool parent_pool
+prefect worker start --pool parent_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck
\ No newline at end of file
diff --git a/start_parent_worker_background.sh b/start_parent_worker_background.sh
new file mode 100755
index 0000000..c1ac2f4
--- /dev/null
+++ b/start_parent_worker_background.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+
+# Load environment variables from .env file
+source .env
+
+echo "Executing Folder: ${PWD}"
+
+# Start the worker command in the background, capture its PID, and assign the log file
+(
+    export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+    export PYTHONPATH=$PWD:$PYTHONPATH
+    prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+    # Create logs directory if it doesn't exist
+    mkdir -p logs
+
+    # Create parent pool for parent worker
+    prefect work-pool create parent_pool --type "process"
+    prefect work-pool update parent_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+
+    # Deploy parent flow with updated syntax for Prefect 3.4.2
+    prefect deploy -n launch_parent_flow --pool parent_pool
+
+    # Start the parent worker process, redirecting stdout and stderr to a temporary log file
+    PREFECT_WORKER_WEBSERVER_PORT=8080 prefect worker start --pool parent_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck &> "process_temp_parent.log" &
+    pid_worker=$!
+
+    # Rename the log file to include the actual PID of the worker process
+    log_file="logs/parent_worker_${pid_worker}.log"
+    mv "process_temp_parent.log" "$log_file"
+
+    # Save the PID for easy reference
+    echo "$pid_worker" > logs/parent_worker_pid.txt
+
+    echo "Started parent Prefect worker with PID: $pid_worker and logging to $log_file"
+    echo "To stop this worker, run: kill -9 \$(cat logs/parent_worker_pid.txt)"
+)
\ No newline at end of file
diff --git a/start_podman_child_worker.sh b/start_podman_child_worker.sh
new file mode 100644
index 0000000..1d413c5
--- /dev/null
+++ b/start_podman_child_worker.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+source .env
+
+export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+# Create work pool for job type podman
+prefect work-pool create podman_pool --type "process"
+prefect work-pool update podman_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+prefect deploy -n launch_podman --pool podman_pool
+PREFECT_WORKER_WEBSERVER_PORT=8082 prefect worker start --pool podman_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck
+
+echo "Podman worker started"
\ No newline at end of file
diff --git a/start_podman_child_worker_background.sh b/start_podman_child_worker_background.sh
new file mode 100644
index 0000000..1a651a7
--- /dev/null
+++ b/start_podman_child_worker_background.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# Load environment variables from .env file
+source .env
+
+echo "Executing Folder: ${PWD}"
+# Initialize conda
+source "$CONDA_PATH/etc/profile.d/conda.sh"
+
+# Start the worker command in the background, capture its PID, and assign the log file
+(
+    export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+    export PYTHONPATH=$PWD:$PYTHONPATH
+    prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+    # Create log directory if it doesn't exist
+    mkdir -p logs
+
+    # Create podman worker pool
+    prefect work-pool create podman_pool --type "process" || true
+    prefect work-pool update podman_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+    prefect deploy -n launch_podman --pool podman_pool
+
+    # Start the podman worker with logs that include PID
+    PREFECT_WORKER_WEBSERVER_PORT=8082 prefect worker start --pool podman_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck > "process_temp_podman.log" 2>&1 &
+    podman_pid=$!
+
+    # Rename the log file to include the actual PID of the worker process
+    podman_log="logs/podman_worker_${podman_pid}.log"
+    mv "process_temp_podman.log" "$podman_log"
+
+    # Create a pid file for easy termination later
+    echo "$podman_pid" > logs/podman_worker_pid.txt
+
+    echo "Started Podman worker with PID: $podman_pid and logging to $podman_log"
+    echo "To view logs, use: tail -f $podman_log"
+    echo "To stop worker, run: kill -9 \$(cat logs/podman_worker_pid.txt)"
+)
\ No newline at end of file
diff --git a/start_slurm_child_worker.sh b/start_slurm_child_worker.sh
new file mode 100644
index 0000000..8dd1619
--- /dev/null
+++ b/start_slurm_child_worker.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+source .env
+
+export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+# Create work pool for job type slurm
+prefect work-pool create slurm_pool --type "process"
+prefect work-pool update slurm_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+prefect deploy -n launch_slurm --pool slurm_pool
+PREFECT_WORKER_WEBSERVER_PORT=8084 prefect worker start --pool slurm_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck
+
+echo "Slurm worker started"
\ No newline at end of file
diff --git a/start_slurm_child_worker_background.sh b/start_slurm_child_worker_background.sh
new file mode 100644
index 0000000..4fac21b
--- /dev/null
+++ b/start_slurm_child_worker_background.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# Load environment variables from .env file
+source .env
+
+echo "Executing Folder: ${PWD}"
+# Initialize conda
+source "$CONDA_PATH/etc/profile.d/conda.sh"
+
+# Start the worker command in the background, capture its PID, and assign the log file
+(
+    export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
+    export PYTHONPATH=$PWD:$PYTHONPATH
+    prefect config set PREFECT_API_URL=$PREFECT_API_URL
+
+    # Create log directory if it doesn't exist
+    mkdir -p logs
+
+    # Create slurm worker pool
+    prefect work-pool create slurm_pool --type "process" || true
+    prefect work-pool update slurm_pool --concurrency-limit $PREFECT_WORK_POOL_CONCURRENCY
+    prefect deploy -n launch_slurm --pool slurm_pool
+
+    # Start the slurm worker with logs that include PID
+    PREFECT_WORKER_WEBSERVER_PORT=8084 prefect worker start --pool slurm_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck > "process_temp_slurm.log" 2>&1 &
+    slurm_pid=$!
+
+    # Rename the log file to include the actual PID of the worker process
+    slurm_log="logs/slurm_worker_${slurm_pid}.log"
+    mv "process_temp_slurm.log" "$slurm_log"
+
+    # Create a pid file for easy termination later
+    echo "$slurm_pid" > logs/slurm_worker_pid.txt
+
+    echo "Started Slurm worker with PID: $slurm_pid and logging to $slurm_log"
+    echo "To view logs, use: tail -f $slurm_log"
+    echo "To stop worker, run: kill -9 \$(cat logs/slurm_worker_pid.txt)"
+)
\ No newline at end of file
diff --git a/start_worker.sh b/start_worker.sh
deleted file mode 100755
index 1432470..0000000
--- a/start_worker.sh
+++ /dev/null
@@ -1,10 +0,0 @@
-source .env
-
-export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
-prefect config set PREFECT_API_URL=$PREFECT_API_URL
-
-prefect work-pool create mlex_pool --type "process"
-prefect work-pool set-concurrency-limit mlex_pool $PREFECT_WORK_POOL_CONCURRENCY
-prefect deploy --all
-
-prefect worker start --pool mlex_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck
\ No newline at end of file
diff --git a/start_worker_background.sh b/start_worker_background.sh
deleted file mode 100644
index a195055..0000000
--- a/start_worker_background.sh
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/bin/bash
-
-# Load environment variables from .env file
-source .env
-
-echo "Executing Folder: ${PWD}"
-# Initialize conda
-source "$CONDA_PATH/etc/profile.d/conda.sh"
-
-# Start the worker command in the background, capture its PID, and assign the log file
-(
-    export PREFECT_WORK_DIR=$PREFECT_WORK_DIR
-    prefect config set PREFECT_API_URL=$PREFECT_API_URL
-
-    prefect work-pool create mlex_pool --type "process"
-    prefect work-pool set-concurrency-limit mlex_pool $PREFECT_WORK_POOL_CONCURRENCY
-    prefect deploy --all
-
-    # Create a log file
-    log_file="process$$.log"
-
-    # Start the worker process, redirecting stdout and stderr to a temporary log file
-    prefect worker start --pool mlex_pool --limit $PREFECT_WORKER_LIMIT --with-healthcheck &> "process_temp.log" &
-    pid_worker=$!
-
-    # Rename the log file to include the actual PID of the worker process
-    log_file="process${pid_worker}.log"
-    mv "process_temp.log" "$log_file"
-
-    echo "Started Prefect worker with PID: $pid_worker and logging to $log_file"
-)
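Since each background script writes its PID to `logs/<type>_worker_pid.txt`, a small cleanup helper along these lines (not part of the repository, shown only as a sketch) can stop all background workers at once:

```bash
# Stop every background worker started by the *_background.sh scripts,
# relying only on the logs/*_worker_pid.txt files those scripts create
for pid_file in logs/*_worker_pid.txt; do
    [ -f "$pid_file" ] || continue
    kill "$(cat "$pid_file")" && rm "$pid_file"
done
```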