From 143593cd65e9c40562884cb61317e99ecfbff1a9 Mon Sep 17 00:00:00 2001 From: quzha Date: Sun, 5 Feb 2023 16:24:37 +0800 Subject: [PATCH 1/4] fix frameworkcontroller bugs --- .../training_service/frameworkcontroller.rst | 74 ++++++++++++++++--- nni/tools/trial_tool/trial_runner.py | 3 +- .../frameworkcontrollerTrainingService.ts | 1 + .../kubernetes/kubernetesApiClient.ts | 30 ++++++-- .../training_service/reusable/environment.ts | 2 +- .../frameworkcontrollerEnvironmentService.ts | 18 +++++ .../kubernetesEnvironmentService.ts | 2 +- .../reusable/trialDispatcher.ts | 46 ++++++++++-- ts/webui/scripts/config/webpack.config.js | 2 + 9 files changed, 152 insertions(+), 26 deletions(-) diff --git a/docs/source/experiment/training_service/frameworkcontroller.rst b/docs/source/experiment/training_service/frameworkcontroller.rst index 4bbb653809..b2f0248d06 100644 --- a/docs/source/experiment/training_service/frameworkcontroller.rst +++ b/docs/source/experiment/training_service/frameworkcontroller.rst @@ -2,13 +2,69 @@ FrameworkController Training Service ==================================== NNI supports running experiment using `FrameworkController `__, -called frameworkcontroller mode. +called *frameworkcontroller* mode. FrameworkController is built to orchestrate all kinds of applications on Kubernetes, you don't need to install Kubeflow for specific deep learning framework like tf-operator or pytorch-operator. -Now you can use FrameworkController as the training service to run NNI experiment. +The following tutorial is based on Minikube, which should be the same for on-premise Kubernetes cluster. -Prerequisite for on-premises Kubernetes Service ------------------------------------------------ +Preparation +----------- + +**Step 0**. To run NNI experiment with FrameworkController, you should have a Kubernetes cluster (or Minikube for simplicity). + +**Step 1**. you need to install FrameworkController in the Kubernetes cluster simply with the following three commands. + +.. code-block:: bash + + kubectl create serviceaccount frameworkcontroller --namespace default + kubectl create clusterrolebinding frameworkcontroller \ + --clusterrole=cluster-admin \ + --user=system:serviceaccount:default:frameworkcontroller + kubectl create -f frameworkcontroller-with-default-config.yaml + +The content of frameworkcontroller-with-default-config.yaml is: + +.. code-block:: yaml + + apiVersion: apps/v1 + kind: StatefulSet + metadata: + name: frameworkcontroller + namespace: default + spec: + serviceName: frameworkcontroller + selector: + matchLabels: + app: frameworkcontroller + replicas: 1 + template: + metadata: + labels: + app: frameworkcontroller + spec: + # Using the ServiceAccount with granted permission + # if the k8s cluster enforces authorization. + serviceAccountName: frameworkcontroller + containers: + - name: frameworkcontroller + image: frameworkcontroller/frameworkcontroller + # Using k8s inClusterConfig, so usually, no need to specify + # KUBE_APISERVER_ADDRESS or KUBECONFIG + #env: + #- name: KUBE_APISERVER_ADDRESS + # value: {http[s]://host:port} + #- name: KUBECONFIG + # value: {Pod Local KubeConfig File Path} + +You can refer to `more advanced configuration of FrameworkController here `__. + +**Step 2**. When running on Kubernetes, NNI need a shared storage to synchronize trial code and log files +between the NNI experiment runing on your dev machine and the trials running on Kubernetes. +NFS and Azure File Storage are supported for now. + +.. code-block:: bash + + apt install nfs-common 1. A **Kubernetes** cluster using Kubernetes 1.8 or later. Follow this `guideline `__ to set up Kubernetes. @@ -31,11 +87,6 @@ Prerequisite for on-premises Kubernetes Service apt install nfs-common -6. Install **NNI**: - -.. code-block:: bash - - python -m pip install nni Prerequisite for Azure Kubernetes Service ----------------------------------------- @@ -136,3 +187,8 @@ NNIManager will start a rest server and listen on a port which is your NNI web p For example, if your web portal port is ``8080``, the rest server will listen on ``8081``, to receive metrics from trial job running in Kubernetes. So you should ``enable 8081`` TCP port in your firewall rule to allow incoming traffic. + +FAQ +--- + +1. TBD \ No newline at end of file diff --git a/nni/tools/trial_tool/trial_runner.py b/nni/tools/trial_tool/trial_runner.py index ef56b47485..3918c0b98b 100644 --- a/nni/tools/trial_tool/trial_runner.py +++ b/nni/tools/trial_tool/trial_runner.py @@ -167,7 +167,8 @@ def check_version(args): args.command_channel = settings["commandChannel"] if args.trial_command is None: - args.trial_command = settings["command"] + # FIXME: deal with distributed trial which has more than one trial command + args.trial_command = settings["command"][0] if args.nnimanager_ip is None: args.nnimanager_ip = settings["nniManagerIP"] if args.nnimanager_port is None: diff --git a/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts b/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts index dcba6253d2..94f6862569 100644 --- a/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts +++ b/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts @@ -186,6 +186,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple frameworkcontrollerJobName ); } + this.log.info('zql config: ', frameworkcontrollerJobConfig); await this.kubernetesCRDClient.createKubernetesJob(frameworkcontrollerJobConfig); // Set trial job detail until create frameworkcontroller job successfully diff --git a/ts/nni_manager/training_service/kubernetes/kubernetesApiClient.ts b/ts/nni_manager/training_service/kubernetes/kubernetesApiClient.ts index 7409202745..0e17d4ec52 100644 --- a/ts/nni_manager/training_service/kubernetes/kubernetesApiClient.ts +++ b/ts/nni_manager/training_service/kubernetes/kubernetesApiClient.ts @@ -181,16 +181,30 @@ abstract class KubernetesCRDClient { } } - public async createKubernetesJob(jobManifest: any): Promise { - let result: Promise; - const response: any = await this.operator.post({body: jobManifest}); - if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) { - result = Promise.resolve(true); - } else { - result = Promise.reject(`KubernetesApiClient createKubernetesJob failed, statusCode is ${response.statusCode}`); + public async createKubernetesJob(jobManifest: any): Promise { + // await this.operator.post({body: jobManifest}) + // .then((response: any) => { + // if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) { + // return Promise.resolve(); + // } else { + // return Promise.reject(`KubernetesApiClient createKubernetesJob failed, statusCode is ${response.statusCode}`); + // } + // }) + // .catch((error: Error) => { + // return Promise.reject(`Failed in creating Kubernetes job: KubernetesApiClient post error ${error}`); + // }); + + try { + const response: any = await this.operator.post({body: jobManifest}); + if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) { + return Promise.resolve(); + } else { + return Promise.reject(`Creating Kubernetes job failed: KubernetesApiClient post statusCode is ${response.statusCode}`); + } + } catch (err: any) { + return Promise.reject(`Creating Kubernetes job failed: KubernetesApiClient post error ${err}`); } - return result; } //TODO : replace any diff --git a/ts/nni_manager/training_service/reusable/environment.ts b/ts/nni_manager/training_service/reusable/environment.ts index 2874ab2104..19cf8855a2 100644 --- a/ts/nni_manager/training_service/reusable/environment.ts +++ b/ts/nni_manager/training_service/reusable/environment.ts @@ -192,7 +192,7 @@ export class RunnerSettings { public nniManagerPort: number = 8081; public nniManagerVersion: string = ""; public logCollection: string = "none"; - public command: string = ""; + public command: string[] = []; public enableGpuCollector: boolean = true; // specify which communication channel is used by runner. diff --git a/ts/nni_manager/training_service/reusable/environments/kubernetes/frameworkcontrollerEnvironmentService.ts b/ts/nni_manager/training_service/reusable/environments/kubernetes/frameworkcontrollerEnvironmentService.ts index 1d495d6540..2e782dc59d 100644 --- a/ts/nni_manager/training_service/reusable/environments/kubernetes/frameworkcontrollerEnvironmentService.ts +++ b/ts/nni_manager/training_service/reusable/environments/kubernetes/frameworkcontrollerEnvironmentService.ts @@ -72,6 +72,8 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment } let configTaskRoles: any = undefined; configTaskRoles = this.config.taskRoles; + this.log.info('zql configTaskRoles: ', configTaskRoles); + this.log.info('zql configTaskRoles: ', JSON.stringify(configTaskRoles)); //Generate the port used for taskRole this.generateContainerPort(configTaskRoles); @@ -94,6 +96,11 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment frameworkcontrollerJobName ); // Create kubeflow job based on generated kubeflow job resource config + this.log.info('zql environment service config: ', frameworkcontrollerJobConfig); + this.log.info('zql environment service config: ', JSON.stringify(frameworkcontrollerJobConfig)); + // FIXME: dump the config for easy debuggability + // ... + // throw new Error("zql throw test"); await this.kubernetesCRDClient.createKubernetesJob(frameworkcontrollerJobConfig); } @@ -136,11 +143,15 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment private async prepareFrameworkControllerConfig(envId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string): Promise { const podResources: any = []; + this.log.info('zql this.config: ', this.config); + this.log.info('zql this.config: ', JSON.stringify(this.config)); for (const taskRole of this.config.taskRoles) { const resource: any = {}; resource.requests = this.generatePodResource(toMegaBytes(taskRole.memorySize), taskRole.cpuNumber, taskRole.gpuNumber); resource.limits = {...resource.requests}; podResources.push(resource); + this.log.info('zql resource: ', resource); + this.log.info('zql resource: ', JSON.stringify(resource)); } // Generate frameworkcontroller job resource config object const frameworkcontrollerJobConfig: any = @@ -247,10 +258,17 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment }]); } + const securityContext: any = { + fsGroup: 1013, + runAsUser: 1013, + runAsGroup: 1013 + }; + const containers: any = [ { name: 'framework', image: replicaImage, + securityContext: securityContext, command: ['sh', `${path.join(trialWorkingFolder, runScriptFile)}`], volumeMounts: [ { diff --git a/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts b/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts index 2cf8927125..bfffbeea9e 100644 --- a/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts +++ b/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts @@ -151,7 +151,7 @@ export class KubernetesEnvironmentService extends EnvironmentService { protected async createNFSStorage(nfsServer: string, nfsPath: string): Promise { await cpp.exec(`mkdir -p ${this.nfsRootDir}`); try { - await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.nfsRootDir}`); + await cpp.exec(`echo 'quzha' | sudo -S mount ${nfsServer}:${nfsPath} ${this.nfsRootDir}`); } catch (error) { const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.nfsRootDir} failed, error is ${error}`; this.log.error(mountError); diff --git a/ts/nni_manager/training_service/reusable/trialDispatcher.ts b/ts/nni_manager/training_service/reusable/trialDispatcher.ts index 65ce50a163..4b4afae161 100644 --- a/ts/nni_manager/training_service/reusable/trialDispatcher.ts +++ b/ts/nni_manager/training_service/reusable/trialDispatcher.ts @@ -13,7 +13,7 @@ import { getBasePort, getExperimentId } from 'common/experimentStartupInfo'; import { getLogger, Logger } from 'common/log'; import { TrainingService, TrialJobApplicationForm, TrialJobMetric, TrialJobStatus } from 'common/trainingService'; import { delay, getExperimentRootDir, getIPV4Address, getLogLevel, getVersion, mkDirPSync, randomSelect, uniqueString } from 'common/utils'; -import { ExperimentConfig, SharedStorageConfig } from 'common/experimentConfig'; +import { ExperimentConfig, SharedStorageConfig, TrainingServiceConfig, FrameworkControllerConfig } from 'common/experimentConfig'; import { GPU_INFO, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, REPORT_METRIC_DATA, SEND_TRIAL_JOB_PARAMETER, STDOUT, TRIAL_END, VERSION_CHECK } from 'core/commands'; import { ScheduleResultType } from 'training_service/common/gpuData'; import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData'; @@ -30,6 +30,7 @@ import { SharedStorageService } from './sharedStorage'; import { NFSSharedStorageService } from './shared_storages/nfsStorageService' import { AzureBlobSharedStorageService } from './shared_storages/azureblobStorageService' import { TrialDetail } from './trial'; +import { assert } from 'console'; /** * It uses to manage jobs on training platforms @@ -228,9 +229,6 @@ class TrialDispatcher implements TrainingService { public async run(): Promise { await Promise.all(this.environmentServiceList.map(env => env.init())); for(const environmentService of this.environmentServiceList) { - - - await environmentService.getCommandChannel.start(); this.log.info(`TrialDispatcher: started channel: ${environmentService.getCommandChannel.constructor.name}`); @@ -495,6 +493,7 @@ class TrialDispatcher implements TrainingService { for (const environment of this.environments.values()) { if (environment.isAlive === true) { liveEnvironmentsCount++; + // FIXME: post warning message that environment/pod takes too long to be ready!!! if (environment.status === "RUNNING" && environment.isRunnerReady) { // if environment is not reusable and used, stop and not count as idle; const reuseMode = Array.isArray(this.config.trainingService) || (this.config.trainingService as any).reuseMode; @@ -640,7 +639,7 @@ class TrialDispatcher implements TrainingService { return randomSelect(validEnvironmentServiceList); } - private async prefetchEnvironments (): Promise { + private async prefetchEnvironments(): Promise { for (const environmentService of this.environmentServiceList) { const number = environmentService.prefetchedEnvironmentCount; this.log.info(`Initialize environments total number: ${number}`); @@ -650,6 +649,19 @@ class TrialDispatcher implements TrainingService { } } + private extractTrialCommands(trainingServiceName: string, trainingService: TrainingServiceConfig): string[] { + // FIXME: deal with different training services. + const trialCommands: string[] = []; + if (trainingServiceName === 'frameworkcontroller') { + // FIXME: deal with the mode of referencing backend's (e.g., frameworkcontroller) own configuration file + for (const taskRole of (trainingService as FrameworkControllerConfig).taskRoles) { + trialCommands.push(taskRole.command); + } + } + // else... + return trialCommands; + } + private async setEnvironmentSetting(environment: EnvironmentInformation): Promise { if (environment.environmentService === undefined) { throw new Error(`Environmentservice for ${environment.id} not initialized!`); @@ -660,14 +672,35 @@ class TrialDispatcher implements TrainingService { runnerSettings.nniManagerPort = getBasePort() + 1; runnerSettings.commandChannel = environmentService.getCommandChannel.channelName; runnerSettings.enableGpuCollector = this.enableGpuScheduler; - runnerSettings.command = this.config.trialCommand; + // trialCommand might be empty, in which case trialCommand is specified in taskRoles + // FIXME: rethink the relation between config.trialCommand and the command(s) in trainingservice config, + // maybe we should not provide such flexibility. + if (this.config.trialCommand === '') { + runnerSettings.command = [this.config.trialCommand]; + } + else { + if (this.config.trainingService instanceof Array) { + const foundTS = this.config.trainingService.find(element => element.platform === environmentService.getName); + if (foundTS !== undefined) + runnerSettings.command = this.extractTrialCommands(environmentService.getName, foundTS); + else + throw new Error(`${environmentService.getName} of the environment service cannot be found in config!`); + } + else { + runnerSettings.command = this.extractTrialCommands(environmentService.getName, this.config.trainingService); + } + } runnerSettings.nniManagerVersion = this.enableVersionCheck ? await getVersion() : ''; runnerSettings.logCollection = this.logCollection; runnerSettings.platform = environmentService.getName; runnerSettings.experimentId = this.experimentId; + this.log.info('zql setting config: ', this.config); + this.log.info('zql setting config: ', JSON.stringify(this.config)); const storageService: StorageService = this.getStorageService(environmentService); const envDir = storageService.joinPath("envs"); const runnerSettingsConfig = storageService.joinPath(envDir, environment.id, "settings.json"); + this.log.info('zql runnerSettings: ', runnerSettings); + this.log.info('zql runnerSettings: ', JSON.stringify(runnerSettings)); await storageService.save(JSON.stringify(runnerSettings), runnerSettingsConfig); } @@ -693,6 +726,7 @@ class TrialDispatcher implements TrainingService { // Generate setting.json file per environment to avoid conflict await this.setEnvironmentSetting(environment); + // FIXME: handles errors await environmentService.startEnvironment(environment); this.environments.set(environment.id, environment); diff --git a/ts/webui/scripts/config/webpack.config.js b/ts/webui/scripts/config/webpack.config.js index 88dff7072e..f223d25e92 100644 --- a/ts/webui/scripts/config/webpack.config.js +++ b/ts/webui/scripts/config/webpack.config.js @@ -587,6 +587,8 @@ module.exports = function(webpackEnv) { // public/ and not a SPA route new RegExp('/[^/]+\\.[^/]+$'), ], + swDest: 'dist/sw.js', + maximumFileSizeToCacheInBytes: 8 * 1024 * 1024, // 8 MB }), // TypeScript type checking useTypeScript && From cbcd74477152086e75d003a7ef286cb4bdf81bf6 Mon Sep 17 00:00:00 2001 From: quzha Date: Sun, 19 Feb 2023 22:20:37 +0800 Subject: [PATCH 2/4] frameworkcontroller v3 --- .../config/training_services/k8s_storage.py | 1 + nni/tools/trial_tool/trial_runner.py | 5 +- nni/tools/trial_tool/trial_runnerv3.py | 263 +++++++++ nni/tools/trial_tool/trialv3.py | 145 +++++ ts/nni_manager/common/experimentConfig.ts | 2 + ts/nni_manager/core/nnimanager.ts | 13 +- ts/nni_manager/package-lock.json | 36 +- .../training_service/common/util.ts | 10 + .../frameworkcontroller_v3/commandChannel.ts | 131 +++++ .../frameworkcontroller_v3/environment.ts | 199 +++++++ .../environmentService.ts | 343 +++++++++++ .../frameworkcontroller.ts | 534 ++++++++++++++++++ .../frameworkcontroller_v3/index.ts | 4 + .../webCommandChannel.ts | 141 +++++ .../frameworkcontrollerConfig.ts | 3 +- .../frameworkcontrollerEnvironmentService.ts | 18 - .../kubernetesEnvironmentService.ts | 2 +- .../reusable/trialDispatcher.ts | 46 +- ts/nni_manager/training_service/v3/compat.ts | 65 ++- ts/nni_manager/training_service/v3/factory.ts | 8 +- 20 files changed, 1868 insertions(+), 101 deletions(-) create mode 100644 nni/tools/trial_tool/trial_runnerv3.py create mode 100644 nni/tools/trial_tool/trialv3.py create mode 100644 ts/nni_manager/training_service/frameworkcontroller_v3/commandChannel.ts create mode 100644 ts/nni_manager/training_service/frameworkcontroller_v3/environment.ts create mode 100644 ts/nni_manager/training_service/frameworkcontroller_v3/environmentService.ts create mode 100644 ts/nni_manager/training_service/frameworkcontroller_v3/frameworkcontroller.ts create mode 100644 ts/nni_manager/training_service/frameworkcontroller_v3/index.ts create mode 100644 ts/nni_manager/training_service/frameworkcontroller_v3/webCommandChannel.ts diff --git a/nni/experiment/config/training_services/k8s_storage.py b/nni/experiment/config/training_services/k8s_storage.py index f122489442..685cea18e3 100644 --- a/nni/experiment/config/training_services/k8s_storage.py +++ b/nni/experiment/config/training_services/k8s_storage.py @@ -39,6 +39,7 @@ class K8sNfsConfig(K8sStorageConfig): storage: Literal['nfs'] = 'nfs' server: str path: str + local_mount_path: str @dataclass(init=False) class K8sAzureStorageConfig(K8sStorageConfig): diff --git a/nni/tools/trial_tool/trial_runner.py b/nni/tools/trial_tool/trial_runner.py index 3918c0b98b..55b9a2d624 100644 --- a/nni/tools/trial_tool/trial_runner.py +++ b/nni/tools/trial_tool/trial_runner.py @@ -167,8 +167,7 @@ def check_version(args): args.command_channel = settings["commandChannel"] if args.trial_command is None: - # FIXME: deal with distributed trial which has more than one trial command - args.trial_command = settings["command"][0] + args.trial_command = settings["command"] if args.nnimanager_ip is None: args.nnimanager_ip = settings["nniManagerIP"] if args.nnimanager_port is None: @@ -252,4 +251,4 @@ def check_version(args): trial_runner_syslogger.close() # the process doesn't exit even main loop exit. So exit it explictly. - os._exit(0) + os._exit(0) \ No newline at end of file diff --git a/nni/tools/trial_tool/trial_runnerv3.py b/nni/tools/trial_tool/trial_runnerv3.py new file mode 100644 index 0000000000..241151754b --- /dev/null +++ b/nni/tools/trial_tool/trial_runnerv3.py @@ -0,0 +1,263 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import argparse +import json +import os +import random +import re +import sys +import time +import traceback +from datetime import datetime, timedelta + +import pkg_resources + +from .gpu import collect_gpu_usage + +idle_timeout_seconds = 10 * 60 +gpu_refressh_interval_seconds = 5 +regular = re.compile('v?(?P[0-9](\.[0-9]){0,1}).*') +trial_runner_syslogger = None + + +def main_loop(args): + '''main loop logic for trial runner''' + idle_last_time = datetime.now() + gpu_refresh_last_time = datetime.now() - timedelta(minutes=1) + try: + if args.job_pid_file: + with open(args.job_pid_file, 'w') as job_file: + job_file.write("%d" % os.getpid()) + + trials = dict() + + command_channel = args.command_channel + # command loop + while True: + command_type, command_data = command_channel.receive() + if command_type == CommandType.NewTrialJob: + nni_log(LogType.Info, 'New trial job with {0}'.format(command_data)) + trial_id = command_data["trialId"] + if trial_id in trials.keys(): + trial = trials[trial_id] + if trial.is_running(): + raise Exception('trial %s is running already, cannot start a new one' % trial.id) + else: + del trials[trial_id] + trial = Trial(args, command_data) + trial.run() + trials[trial_id] = trial + elif command_type == CommandType.KillTrialJob: + nni_log(LogType.Info, 'Kill trial job with {0}'.format(command_data)) + trial_id = command_data + if trial_id in trials.keys(): + trial = trials[trial_id] + trial.kill(command_data) + elif command_type == CommandType.SendTrialJobParameter: + nni_log(LogType.Info, 'Receive trial job parameter: {0}'.format(command_data)) + trial_id = command_data["trialId"] + if trial_id in trials.keys(): + trial = trials[trial_id] + trial.save_parameter_file(command_data) + elif command_type is not None: + raise Exception("unknown command %s" % command_type) + + trial_list = list(trials.values()) + for trial in trial_list: + if trial is not None and trial.is_running(): + idle_last_time = datetime.now() + else: + del trials[trial.id] + + if (datetime.now() - idle_last_time).seconds > idle_timeout_seconds: + nni_log(LogType.Info, "trial runner is idle more than {0} seconds, so exit.".format( + idle_timeout_seconds)) + break + + if args.enable_gpu_collect and (datetime.now() - gpu_refresh_last_time).seconds > gpu_refressh_interval_seconds: + # collect gpu information + gpu_info = collect_gpu_usage(args.node_id) + command_channel.send(CommandType.ReportGpuInfo, gpu_info) + gpu_refresh_last_time = datetime.now() + time.sleep(0.5) + except Exception as ex: + traceback.print_exc() + raise ex + finally: + nni_log(LogType.Info, "main_loop exits.") + + trial_list = list(trials.values()) + for trial in trial_list: + trial.kill() + del trials[trial.id] + # wait to send commands + for _ in range(10): + if command_channel.sent(): + break + time.sleep(1) + command_channel.close() + + +def trial_runner_help_info(*args): + print('please run --help to see guidance') + + +def check_version(args): + try: + trial_runner_version = pkg_resources.get_distribution('nni').version + except pkg_resources.ResolutionError: + # package nni does not exist, try nni-tool package + nni_log(LogType.Error, 'Package nni does not exist!') + os._exit(1) + if not args.nni_manager_version: + # skip version check + nni_log(LogType.Warning, 'Skipping version check!') + else: + try: + command_channel = args.command_channel + trial_runner_version = regular.search(trial_runner_version).group('version') + nni_log(LogType.Info, '{0}: runner_version is {1}'.format(args.node_id, trial_runner_version)) + nni_manager_version = regular.search(args.nni_manager_version).group('version') + nni_log(LogType.Info, '{0}: nni_manager_version is {1}'.format(args.node_id, nni_manager_version)) + log_entry = {} + if trial_runner_version != nni_manager_version: + nni_log(LogType.Warning, '{0}: Version does not match!'.format(args.node_id)) + error_message = '{0}: NNIManager version is {1}, Trial runner version is {2}, NNI version does not match!'.format( + args.node_id, nni_manager_version, trial_runner_version) + log_entry['tag'] = 'VCFail' + log_entry['msg'] = error_message + command_channel.send(CommandType.VersionCheck, log_entry) + while not command_channel.sent(): + time.sleep(1) + else: + nni_log(LogType.Info, '{0}: Version match!'.format(args.node_id)) + log_entry['tag'] = 'VCSuccess' + command_channel.send(CommandType.VersionCheck, log_entry) + except AttributeError as err: + nni_log(LogType.Error, '{0}: {1}'.format(args.node_id, err)) + +if __name__ == '__main__': + + '''NNI Trial Runner main function''' + PARSER = argparse.ArgumentParser() + PARSER.set_defaults(func=trial_runner_help_info) + PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process') + PARSER.add_argument('--nnimanager_ip', type=str, help='NNI manager rest server IP') + PARSER.add_argument('--nnimanager_port', type=str, help='NNI manager rest server port') + PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager') + PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trial runner') + PARSER.add_argument('--node_count', type=int, help='number of nodes, it determines how to consume command and save code file') + PARSER.add_argument('--job_pid_file', type=str, help='save trial runner process pid') + args, unknown = PARSER.parse_known_args() + + setting_file = "settings.json" + if not os.path.exists(setting_file): + setting_file = "../{}".format(setting_file) + if os.path.exists(setting_file): + with open(setting_file, 'r') as fp: + settings = json.load(fp) + print("setting is {}".format(settings)) + else: + print("not found setting file") + + args.exp_id = settings["experimentId"] + args.platform = settings["platform"] + # runner_id is unique runner in experiment + args.runner_id = os.path.basename(os.path.realpath(os.path.curdir)) + args.runner_name = "runner_"+args.runner_id + args.enable_gpu_collect = settings["enableGpuCollector"] + args.command_channel = settings["commandChannel"] + + if args.trial_command is None: + # FIXME: deal with distributed trial which has more than one trial command + args.trial_command = settings["command"][0] + if args.nnimanager_ip is None: + args.nnimanager_ip = settings["nniManagerIP"] + if args.nnimanager_port is None: + args.nnimanager_port = settings["nniManagerPort"] + if args.nni_manager_version is None: + args.nni_manager_version = settings["nniManagerVersion"] + if args.log_collection is None: + args.log_collection = settings["logCollection"] + if args.node_count is None: + # default has only one node. + args.node_count = 1 + + # FIXME: in reuse mode, multiple trials would have the same output dir + # NOTE: NNI_OUTPUT_DIR is updated in trial.py + os.environ['NNI_OUTPUT_DIR'] = os.curdir + "/nnioutput" + os.environ['NNI_PLATFORM'] = args.platform + os.environ['NNI_SYS_DIR'] = os.curdir + os.environ['NNI_EXP_ID'] = args.exp_id + # FIXME: multi-phase is true? + os.environ['MULTI_PHASE'] = "true" + # FIXME: trial job id is runner? + os.environ['NNI_TRIAL_JOB_ID'] = "runner" + os.environ['REUSE_MODE'] = "true" + + from .log_utils import LogType, RemoteLogger, StdOutputType, nni_log + from .trialv3 import Trial + from .file_channel import FileChannel + from .web_channel import WebChannel + from .commands import CommandType + + is_multi_node = args.node_count > 1 + + if (is_multi_node): + # FIXME: not supported yet!!! + # for multiple nodes, create a file to get a unique id. + while True: + node_id = random.randint(0, 10000) + unique_check_file_name = "node_%s" % (node_id) + if not os.path.exists(unique_check_file_name): + break + with open(unique_check_file_name, "w") as unique_check_file: + unique_check_file.write("%s" % (int(datetime.now().timestamp() * 1000))) + args.node_id = node_id + else: + # node id is unique in the runner + args.node_id = None + + # init command channel + command_channel = None + if args.command_channel == "file": + command_channel = FileChannel(args) + elif args.command_channel == 'aml': + from .aml_channel import AMLChannel + command_channel = AMLChannel(args) + else: + command_channel = WebChannel(args) + command_channel.open() + + nni_log(LogType.Info, "command channel is {}, actual type is {}".format(args.command_channel, type(command_channel))) + args.command_channel = command_channel + + trial_runner_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'runner', + StdOutputType.Stdout, args.log_collection, args.runner_name, command_channel) + sys.stdout = sys.stderr = trial_runner_syslogger + nni_log(LogType.Info, "{}: merged args is {}".format(args.node_id, args)) + + if args.trial_command is None: + nni_log(LogType.Error, "{}: no command is found.".format(args.node_id)) + os._exit(1) + check_version(args) + try: + main_loop(args) + except SystemExit as se: + nni_log(LogType.Info, '{}: NNI trial runner exit with code {}'.format(args.node_id, se.code)) + + # try best to send latest errors to server + timeout = 10 + while not command_channel.sent() and timeout > 0: + timeout -= 1 + time.sleep(1) + os._exit(se.code) + finally: + if trial_runner_syslogger is not None: + if trial_runner_syslogger.pipeReader is not None: + trial_runner_syslogger.pipeReader.set_process_exit() + trial_runner_syslogger.close() + + # the process doesn't exit even main loop exit. So exit it explictly. + os._exit(0) diff --git a/nni/tools/trial_tool/trialv3.py b/nni/tools/trial_tool/trialv3.py new file mode 100644 index 0000000000..2178feaa59 --- /dev/null +++ b/nni/tools/trial_tool/trialv3.py @@ -0,0 +1,145 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import ctypes +import os +import sys +import shlex +from datetime import datetime +from subprocess import Popen + +import psutil + +from .log_utils import LogType, RemoteLogger, StdOutputType, nni_log +from .commands import CommandType + +trial_output_path_name = ".nni" + + +class Trial: + def __init__(self, args, data): + self.process = None + self.data = data + self.args = args + self.command_channel = args.command_channel + self.trial_syslogger_stdout = None + + global NNI_TRIAL_JOB_ID + self.id = data["trialId"] + if self.id is None: + raise Exception("trial_id is not found in %s" % data) + os.environ['NNI_TRIAL_JOB_ID'] = self.id + NNI_TRIAL_JOB_ID = self.id + + # for multiple nodes. If it's None, it means single node. + self.node_id = args.node_id + if self.node_id is None: + self.name = self.id + else: + self.name = "%s_%s" % (self.id, self.node_id) + + def run(self): + # redirect trial's stdout and stderr to syslog + self.trial_syslogger_stdout = RemoteLogger(self.args.nnimanager_ip, self.args.nnimanager_port, 'trial', StdOutputType.Stdout, + self.args.log_collection, self.id, self.args.command_channel) + + nni_log(LogType.Info, "%s: start to run trial" % self.name) + + trial_dir = os.path.realpath(os.path.join(os.curdir, f"trial_{self.id}")) + os.makedirs(trial_dir, exist_ok=True) + self.trial_dir = trial_dir + + environ = os.environ.copy() + environ['NNI_TRIAL_SEQ_ID'] = str(self.data["sequenceId"]) + environ['NNI_OUTPUT_DIR'] = trial_dir + environ['NNI_SYS_DIR'] = trial_dir + + # prepare parameters + nni_log(LogType.Info, '%s: saving parameter %s' % (self.name, self.data["parameter"])) + parameter_file_name = os.path.join(trial_dir, "parameter.cfg") + with open(parameter_file_name, "w") as parameter_file: + parameter_file.write(self.data["parameter"]) + + # FIXME: seems this part is for distributed trial + # # make sure code prepared by other node. + # if self.node_id is not None: + # while True: + # if os.path.exists(prepared_flag_file_name): + # break + # time.sleep(0.1) + + trial_command = self.args.trial_command + + gpuIndices = self.data.get("gpuIndices") + if (gpuIndices is not None): + if sys.platform == "win32": + trial_command = 'set CUDA_VISIBLE_DEVICES="%s " && call %s' % (gpuIndices, trial_command) + else: + trial_command = 'CUDA_VISIBLE_DEVICES="%s " %s' % (gpuIndices, trial_command) + + self.log_pipe_stdout = self.trial_syslogger_stdout.get_pipelog_reader() + self.process = Popen(trial_command, shell=True, stdout=self.log_pipe_stdout, + stderr=self.log_pipe_stdout, cwd=os.path.join(os.curdir, "code"), env=dict(environ)) + nni_log(LogType.Info, '{0}: spawns a subprocess (pid {1}) to run command: {2}'. + format(self.name, self.process.pid, shlex.split(trial_command))) + + # FIXME: command_data may not has field parameters!!! + def save_parameter_file(self, command_data): + parameters = command_data["parameters"] + file_index = int(parameters["index"]) + if file_index == 0: + parameter_file_name = "parameter.cfg" + else: + parameter_file_name = "parameter_{}.cfg".format(file_index) + parameter_file_name = os.path.join(self.working_dir, parameter_file_name) + with open(parameter_file_name, "w") as parameter_file: + nni_log(LogType.Info, '%s: saving parameter %s' % (self.name, parameters["value"])) + parameter_file.write(parameters["value"]) + + def is_running(self): + if (self.process is None): + return False + + retCode = self.process.poll() + # child worker process exits and all stdout data is read + if retCode is not None and self.log_pipe_stdout.set_process_exit() and self.log_pipe_stdout.is_read_completed == True: + # In Windows, the retCode -1 is 4294967295. It's larger than c_long, and raise OverflowError. + # So covert it to int32. + retCode = ctypes.c_long(retCode).value + nni_log(LogType.Info, '{0}: subprocess terminated. Exit code is {1}.'.format(self.name, retCode)) + + end_time = int(datetime.now().timestamp() * 1000) + end_message = { + "code": retCode, + "time": end_time, + "trial": self.id, + } + self.command_channel.send(CommandType.TrialEnd, end_message) + self.cleanup() + return False + else: + return True + + def kill(self, trial_id=None): + if trial_id == self.id or trial_id is None: + if self.process is not None: + try: + nni_log(LogType.Info, "%s: killing trial" % self.name) + for child in psutil.Process(self.process.pid).children(True): + child.kill() + self.process.kill() + except psutil.NoSuchProcess: + nni_log(LogType.Info, "kill trial %s failed: %s does not exist!" % (trial_id, self.process.pid)) + except Exception as ex: + nni_log(LogType.Error, "kill trial %s failed: %s " % (trial_id, str(ex))) + self.cleanup() + + def cleanup(self): + nni_log(LogType.Info, "%s: clean up trial" % self.name) + self.process = None + if self.log_pipe_stdout is not None: + self.log_pipe_stdout.set_process_exit() + self.log_pipe_stdout = None + if self.trial_syslogger_stdout is not None: + self.trial_syslogger_stdout.close() + self.trial_syslogger_stdout = None diff --git a/ts/nni_manager/common/experimentConfig.ts b/ts/nni_manager/common/experimentConfig.ts index 5938b0252b..6d3a3b7770 100644 --- a/ts/nni_manager/common/experimentConfig.ts +++ b/ts/nni_manager/common/experimentConfig.ts @@ -106,6 +106,7 @@ export interface KubernetesStorageConfig { storageType: string; server?: string; path?: string; + localMountPath?: string; azureAccount?: string; azureShare?: string; keyVaultName?: string; @@ -152,6 +153,7 @@ export interface FrameworkControllerTaskRoleConfig { } export interface FrameworkControllerConfig extends TrainingServiceConfig { + // FIXME: some the fields defined in TrainingServiceConfig are not assigned!!! platform: 'frameworkcontroller'; storage: KubernetesStorageConfig; serviceAccountName: string; diff --git a/ts/nni_manager/core/nnimanager.ts b/ts/nni_manager/core/nnimanager.ts index 1dcf1cb4ed..9d5d911625 100644 --- a/ts/nni_manager/core/nnimanager.ts +++ b/ts/nni_manager/core/nnimanager.ts @@ -476,17 +476,22 @@ class NNIManager implements Manager { const reuseMode = Array.isArray(config.trainingService) || (config.trainingService as any).reuseMode; if (reuseMode) { - const module_ = await import('../training_service/reusable/routerTrainingService'); - return await module_.RouterTrainingService.construct(config); + // const module_ = await import('../training_service/reusable/routerTrainingService'); + // return await module_.RouterTrainingService.construct(config); + const module_ = await import ('../training_service/v3/compat'); + return new module_.V3asV1(config.trainingService as TrainingServiceConfig); } else if (platform === 'local') { const module_ = await import('../training_service/local/localTrainingService'); return new module_.LocalTrainingService(config.trainingService); } else if (platform === 'kubeflow') { const module_ = await import('../training_service/kubernetes/kubeflow/kubeflowTrainingService'); return new module_.KubeflowTrainingService(); + // } else if (platform === 'frameworkcontroller') { + // const module_ = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService'); + // return new module_.FrameworkControllerTrainingService(); } else if (platform === 'frameworkcontroller') { - const module_ = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService'); - return new module_.FrameworkControllerTrainingService(); + const module_ = await import ('../training_service/v3/compat'); + return new module_.V3asV1(config.trainingService as TrainingServiceConfig); } else if (platform === 'adl') { const module_ = await import('../training_service/kubernetes/adl/adlTrainingService'); return new module_.AdlTrainingService(); diff --git a/ts/nni_manager/package-lock.json b/ts/nni_manager/package-lock.json index 88783e5ee1..5b9271e653 100644 --- a/ts/nni_manager/package-lock.json +++ b/ts/nni_manager/package-lock.json @@ -1040,9 +1040,9 @@ "dev": true }, "node_modules/@types/mocha": { - "version": "10.0.0", - "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.0.tgz", - "integrity": "sha512-rADY+HtTOA52l9VZWtgQfn4p+UDVM2eDVkMZT1I6syp0YKxW2F9v+0pbRZLsvskhQv/vMb6ZfCay81GHbz5SHg==", + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-9.0.0.tgz", + "integrity": "sha512-scN0hAWyLVAvLR9AyW7HoFF5sJZglyBsbPuHO4fv7JRvfmPBMfp1ozWqOf/e4wwPNxezBZXRfWzMb6iFLgEVRA==", "dev": true }, "node_modules/@types/node": { @@ -1618,9 +1618,9 @@ } }, "node_modules/anymatch": { - "version": "3.1.2", - "resolved": "https://registry.npmjs.org/anymatch/-/anymatch-3.1.2.tgz", - "integrity": "sha512-P43ePfOAIupkguHUycrc4qJ9kz8ZiuOUijaETwX7THt0Y/GNK7v0aa8rY816xWjZ7rJdA5XdMcpVFTKMq+RvWg==", + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/anymatch/-/anymatch-3.1.3.tgz", + "integrity": "sha512-KMReFUr0B4t+D+OBkjR3KYqvocp2XaSzO55UcB6mgQMd3KbcE+mWTyvVV7D/zsdEbNnV6acZUutkiHQXvTr1Rw==", "dev": true, "dependencies": { "normalize-path": "^3.0.0", @@ -4937,9 +4937,9 @@ } }, "node_modules/mocha": { - "version": "10.1.0", - "resolved": "https://registry.npmjs.org/mocha/-/mocha-10.1.0.tgz", - "integrity": "sha512-vUF7IYxEoN7XhQpFLxQAEMtE4W91acW4B6En9l97MwE9stL1A9gusXfoHZCLVHDUJ/7V5+lbCM6yMqzo5vNymg==", + "version": "10.2.0", + "resolved": "https://registry.npmjs.org/mocha/-/mocha-10.2.0.tgz", + "integrity": "sha512-IDY7fl/BecMwFHzoqF2sg/SHHANeBoMMXFlS9r0OXKDssYE1M5O43wUY/9BVPeIvfH2zmEbBfseqN9gBQZzXkg==", "dev": true, "dependencies": { "ansi-colors": "4.1.1", @@ -11309,9 +11309,9 @@ "dev": true }, "@types/mocha": { - "version": "10.0.0", - "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-10.0.0.tgz", - "integrity": "sha512-rADY+HtTOA52l9VZWtgQfn4p+UDVM2eDVkMZT1I6syp0YKxW2F9v+0pbRZLsvskhQv/vMb6ZfCay81GHbz5SHg==", + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-9.0.0.tgz", + "integrity": "sha512-scN0hAWyLVAvLR9AyW7HoFF5sJZglyBsbPuHO4fv7JRvfmPBMfp1ozWqOf/e4wwPNxezBZXRfWzMb6iFLgEVRA==", "dev": true }, "@types/node": { @@ -11758,9 +11758,9 @@ } }, "anymatch": { - "version": "3.1.2", - "resolved": "https://registry.npmjs.org/anymatch/-/anymatch-3.1.2.tgz", - "integrity": "sha512-P43ePfOAIupkguHUycrc4qJ9kz8ZiuOUijaETwX7THt0Y/GNK7v0aa8rY816xWjZ7rJdA5XdMcpVFTKMq+RvWg==", + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/anymatch/-/anymatch-3.1.3.tgz", + "integrity": "sha512-KMReFUr0B4t+D+OBkjR3KYqvocp2XaSzO55UcB6mgQMd3KbcE+mWTyvVV7D/zsdEbNnV6acZUutkiHQXvTr1Rw==", "dev": true, "requires": { "normalize-path": "^3.0.0", @@ -14271,9 +14271,9 @@ "integrity": "sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==" }, "mocha": { - "version": "10.1.0", - "resolved": "https://registry.npmjs.org/mocha/-/mocha-10.1.0.tgz", - "integrity": "sha512-vUF7IYxEoN7XhQpFLxQAEMtE4W91acW4B6En9l97MwE9stL1A9gusXfoHZCLVHDUJ/7V5+lbCM6yMqzo5vNymg==", + "version": "10.2.0", + "resolved": "https://registry.npmjs.org/mocha/-/mocha-10.2.0.tgz", + "integrity": "sha512-IDY7fl/BecMwFHzoqF2sg/SHHANeBoMMXFlS9r0OXKDssYE1M5O43wUY/9BVPeIvfH2zmEbBfseqN9gBQZzXkg==", "dev": true, "requires": { "ansi-colors": "4.1.1", diff --git a/ts/nni_manager/training_service/common/util.ts b/ts/nni_manager/training_service/common/util.ts index 2b3a367256..856710502c 100644 --- a/ts/nni_manager/training_service/common/util.ts +++ b/ts/nni_manager/training_service/common/util.ts @@ -222,6 +222,16 @@ export async function tarAdd(tarPath: string, sourcePath: string): Promise return Promise.resolve(); } +/** + * Uncompress tar file to directory + * @param tarFile + * @param targetPath + */ +export async function tarExtract(tarFile: string, targetPath: string): Promise { + tar.extract({cwd: targetPath, file: tarFile, sync: true}); + return Promise.resolve(); +} + /** * generate script file name * @param fileNamePrefix diff --git a/ts/nni_manager/training_service/frameworkcontroller_v3/commandChannel.ts b/ts/nni_manager/training_service/frameworkcontroller_v3/commandChannel.ts new file mode 100644 index 0000000000..7f812cc9b7 --- /dev/null +++ b/ts/nni_manager/training_service/frameworkcontroller_v3/commandChannel.ts @@ -0,0 +1,131 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { EventEmitter } from "events"; +import { getLogger, Logger } from "common/log"; +import { TRIAL_COMMANDS } from "core/commands"; +import { encodeCommand } from "core/ipcInterface"; + +import { Channel, EnvironmentInformation } from "./environment"; + +const acceptedCommands: Set = new Set(TRIAL_COMMANDS); + +export class Command { + public readonly environment: EnvironmentInformation; + public readonly command: string; + public readonly data: any; + + constructor(environment: EnvironmentInformation, command: string, data: any) { + if (!acceptedCommands.has(command)) { + throw new Error(`unaccepted command ${command}`); + } + this.environment = environment; + this.command = command; + this.data = data; + } +} + +export class RunnerConnection { + public readonly environment: EnvironmentInformation; + + constructor(environment: EnvironmentInformation) { + this.environment = environment; + } + + public async open(): Promise { + // do nothing + } + + public async close(): Promise { + // do nothing + } +} + +export abstract class CommandChannel { + protected readonly log: Logger; + protected runnerConnections: Map = new Map(); + protected readonly commandEmitter: EventEmitter; + + private readonly commandPattern: RegExp = /(?[\w]{2})(?[\d]{14})(?.*)\n?/gm; + + public constructor(commandEmitter: EventEmitter) { + this.log = getLogger('CommandChannel'); + this.commandEmitter = commandEmitter; + } + + public abstract get channelName(): Channel; + public abstract config(key: string, value: any): Promise; + public abstract start(): Promise; + public abstract stop(): Promise; + + // Pull-based command channels need loop to check messages, the loop should be started with await here. + public abstract run(): Promise; + + protected abstract sendCommandInternal(environment: EnvironmentInformation, message: string): Promise; + protected abstract createRunnerConnection(environment: EnvironmentInformation): RunnerConnection; + + public async sendCommand(environment: EnvironmentInformation, commandType: string, data: any): Promise { + const command = encodeCommand(commandType, JSON.stringify(data)); + this.log.debug(`CommandChannel: env ${environment.id} sending command: ${command}`); + await this.sendCommandInternal(environment, command.toString("utf8")); + } + + public async open(environment: EnvironmentInformation): Promise { + if (this.runnerConnections.has(environment.id)) { + throw new Error(`CommandChannel: env ${environment.id} is opened already, shouldn't be opened again.`); + } + const connection = this.createRunnerConnection(environment); + this.runnerConnections.set(environment.id, connection); + await connection.open(); + } + + public async close(environment: EnvironmentInformation): Promise { + if (this.runnerConnections.has(environment.id)) { + const connection = this.runnerConnections.get(environment.id); + this.runnerConnections.delete(environment.id); + if (connection !== undefined) { + await connection.close(); + } + } + } + + protected parseCommands(content: string): [string, any][] { + const commands: [string, any][] = []; + + let matches = this.commandPattern.exec(content); + + while (matches) { + if (undefined !== matches.groups) { + const commandType = matches.groups["type"]; + const dataLength = parseInt(matches.groups["length"]); + const data: any = matches.groups["data"]; + if (dataLength !== data.length) { + throw new Error(`dataLength ${dataLength} not equal to actual length ${data.length}: ${data}`); + } + try { + const finalData = JSON.parse(data); + // to handle encode('utf8') of Python + commands.push([commandType, finalData]); + } catch (error) { + this.log.error(`CommandChannel: error on parseCommands ${error}, original: ${matches.groups["data"]}`); + throw error; + } + } + matches = this.commandPattern.exec(content); + } + + return commands; + } + + protected handleCommand(environment: EnvironmentInformation, content: string): void { + const parsedResults = this.parseCommands(content); + + for (const parsedResult of parsedResults) { + const commandType = parsedResult[0]; + const data = parsedResult[1]; + const command = new Command(environment, commandType, data); + this.commandEmitter.emit("command", command); + this.log.trace(`CommandChannel: env ${environment.id} emit command: ${commandType}, ${data}.`); + } + } +} diff --git a/ts/nni_manager/training_service/frameworkcontroller_v3/environment.ts b/ts/nni_manager/training_service/frameworkcontroller_v3/environment.ts new file mode 100644 index 0000000000..29a141e720 --- /dev/null +++ b/ts/nni_manager/training_service/frameworkcontroller_v3/environment.ts @@ -0,0 +1,199 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + + +import { EventEmitter } from "events"; +import { getLogger, Logger } from "common/log"; +import { TrialJobStatus } from "common/trainingService"; +import { GPUInfo } from "training_service/common/gpuData"; +import { CommandChannel } from "./commandChannel"; +import { WebCommandChannel } from './webCommandChannel'; + + +export type EnvironmentStatus = 'UNKNOWN' | 'WAITING' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'USER_CANCELED'; +export type Channel = "web" | "file" | "aml" | "ut"; + + +export class TrialGpuSummary { + // GPU count on the machine + public gpuCount: number; + // The timestamp when GPU summary data queried + public timestamp: string; + // The array of GPU information for each GPU card + public gpuInfos: GPUInfo[]; + // GPU assigned status + public assignedGpuIndexMap: Map = new Map(); + + constructor(gpuCount: number, timestamp: string, gpuInfos: GPUInfo[]) { + this.gpuCount = gpuCount; + this.timestamp = timestamp; + this.gpuInfos = gpuInfos; + } +} + +export class EnvironmentInformation { + // node id is 5 chars, so won't conflict. + private readonly defaultNodeId = "default"; + private log: Logger; + private isNoGpuWarned: boolean = false; + + // key states + // true: environment is running, waiting, or unknown. + public isAlive: boolean = true; + // true: Runner is initialized, and can receive trials. + public isRunnerReady: boolean = false; + // don't set status in environment directly, use setFinalState function to set a final state. + public status: EnvironmentStatus = "UNKNOWN"; + + // true: environment is ready to run trial. + public runningTrialCount: number = 0; + // uses to count how many trial runs on this environment. + // it can be used in many scenarios, but for now, it uses for reusable. + public assignedTrialCount: number = 0; + // it is used to get environment idle time interval + public latestTrialReleasedTime: number = -1; + + // NNI environment ID + public id: string; + // training platform unique job ID. + public envId: string; + // training platform job friendly name, in case it's different with job ID. + public name: string; + public trackingUrl: string = ""; + public workingFolder: string = ""; + public runnerWorkingFolder: string = ""; + public command: string = ""; + public nodeCount: number = 1; + + // it's used to aggregate node status for multiple node trial + public nodes: Map; + public gpuSummaries: Map = new Map(); + + // use can specify which gpus can be used by NNI. + // it's usable for sharable environment like remote machine. + public usableGpus?: number[]; + // user can specify how to use GPU resource for an environment, like local and remote. + public maxTrialNumberPerGpu?: number; + public useActiveGpu?: boolean; + + public environmentService?: EnvironmentService; + + constructor(id: string, name: string, envId?: string) { + this.log = getLogger('EnvironmentInformation'); + this.id = id; + this.name = name; + this.envId = envId ? envId : name; + this.nodes = new Map(); + } + + public setStatus(status: EnvironmentStatus): void { + if (this.status !== status) { + this.log.info(`EnvironmentInformation: ${this.envId} change status from ${this.status} to ${status}.`) + this.status = status; + } + } + + public setGpuSummary(nodeId: string, newGpuSummary: TrialGpuSummary): void { + if (nodeId === null || nodeId === undefined) { + nodeId = this.defaultNodeId; + } + + const originalGpuSummary = this.gpuSummaries.get(nodeId); + if (undefined === originalGpuSummary) { + newGpuSummary.assignedGpuIndexMap = new Map(); + this.gpuSummaries.set(nodeId, newGpuSummary); + } else { + originalGpuSummary.gpuCount = newGpuSummary.gpuCount; + originalGpuSummary.timestamp = newGpuSummary.timestamp; + originalGpuSummary.gpuInfos = newGpuSummary.gpuInfos; + } + } + + public get defaultGpuSummary(): TrialGpuSummary | undefined { + const gpuSummary = this.gpuSummaries.get(this.defaultNodeId); + if (gpuSummary === undefined) { + if (false === this.isNoGpuWarned) { + this.log.warning(`EnvironmentInformation: ${this.envId} no default gpu found. current gpu info`, this.gpuSummaries); + this.isNoGpuWarned = true; + } + } else { + this.isNoGpuWarned = false; + } + return gpuSummary; + } +} + +export abstract class EnvironmentService { + + public async init(): Promise { + return; + } + + public abstract get hasStorageService(): boolean; + public abstract stopEnvironment(environment: EnvironmentInformation): Promise; + public abstract startEnvironment(environment: EnvironmentInformation): Promise; + // Make public for ut + protected commandChannel: CommandChannel | undefined; + + // It is used to set prefetched environment count, default value is 0 for OpenPAI and AML mode, + // in remote mode, this value is set to the length of machine list. + public get prefetchedEnvironmentCount(): number { + return 0; + } + + public abstract get getName(): string; + + // Initialize command channel, use WebCommandChannel as default command channel + public initCommandChannel(eventEmitter: EventEmitter): void { + this.commandChannel = WebCommandChannel.getInstance(eventEmitter); + } + + public get getCommandChannel(): CommandChannel { + if (this.commandChannel === undefined) { + throw new Error("Command channel not initialized!"); + } + return this.commandChannel; + } + + // It depends on environment pressure and settings + // for example, OpenPAI relies on API calls, and there is an limitation for frequence, so it need to be bigger. + public get environmentMaintenceLoopInterval(): number { + return 5000; + } + + // it's needed in two scenario + // 1. remote machine has fixed number, so it can return false, when all environment are assigned. + // 2. If there are consistent error on requested environments, for example, authentication failure on platform. + public get hasMoreEnvironments(): boolean { + return true; + } + + public createEnvironmentInformation(envId: string, envName: string): EnvironmentInformation { + return new EnvironmentInformation(envId, envName); + } +} + +export class NodeInformation { + public id: string; + public status: TrialJobStatus = "UNKNOWN"; + public endTime?: number; + + constructor(id: string) { + this.id = id; + } +} + +export class RunnerSettings { + public experimentId: string = ""; + public platform: string = ""; + public nniManagerIP: string = ""; + public nniManagerPort: number = 8081; + public nniManagerVersion: string = ""; + public logCollection: string = "none"; + public command: string[] = []; + public enableGpuCollector: boolean = true; + + // specify which communication channel is used by runner. + // supported channel includes: rest, storage, aml + public commandChannel: Channel = "file"; +} diff --git a/ts/nni_manager/training_service/frameworkcontroller_v3/environmentService.ts b/ts/nni_manager/training_service/frameworkcontroller_v3/environmentService.ts new file mode 100644 index 0000000000..32f089a416 --- /dev/null +++ b/ts/nni_manager/training_service/frameworkcontroller_v3/environmentService.ts @@ -0,0 +1,343 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +'use strict'; + +import fs from 'fs'; +import path from 'path'; +import * as component from 'common/component'; +import { Logger, getLogger } from 'common/log'; +import { uniqueString } from 'common/utils'; +import { FrameworkControllerConfig, FrameworkControllerTaskRoleConfig, toMegaBytes } from 'common/experimentConfig'; +import { ExperimentStartupInfo } from 'common/experimentStartupInfo'; +import { GeneralK8sClient, KubernetesCRDClient } from 'training_service/kubernetes/kubernetesApiClient'; +import { EnvironmentInformation, EnvironmentService } from './environment'; +import { FrameworkControllerClientFactory } from '../kubernetes/frameworkcontroller/frameworkcontrollerApiClient'; +import type { FrameworkControllerTrialConfigTemplate } from '../kubernetes/frameworkcontroller/frameworkcontrollerConfig'; + + +export class FrameworkControllerEnvironmentService extends EnvironmentService { + private containerMountPath: string; + private NNI_KUBERNETES_TRIAL_LABEL: string; + private log: Logger; + private genericK8sClient: GeneralK8sClient; + private experimentId: string; + private environmentWorkingFolder: string; + private kubernetesCRDClient?: KubernetesCRDClient; + + private config: FrameworkControllerConfig; + private createStoragePromise?: Promise; + private readonly fcContainerPortMap: Map = new Map(); // store frameworkcontroller container port + + + constructor(config: FrameworkControllerConfig, info: ExperimentStartupInfo) { + super(); + this.log = getLogger('FrameworkControllerEnvironmentService'); + this.containerMountPath = '/tmp/mount'; + this.NNI_KUBERNETES_TRIAL_LABEL = 'nni-kubernetes-trial'; + this.genericK8sClient = new GeneralK8sClient(); + this.experimentId = info.experimentId; + this.environmentWorkingFolder = path.join(this.containerMountPath, 'nni', this.experimentId); + + this.config = config; + // Create kubernetesCRDClient + this.kubernetesCRDClient = FrameworkControllerClientFactory.createClient(this.config.namespace); + this.genericK8sClient.setNamespace = this.config.namespace ?? "default" + // Storage is mounted by user + } + + public get environmentMaintenceLoopInterval(): number { + return 5000; + } + + public get hasStorageService(): boolean { + return false; + } + + public get getName(): string { + return 'frameworkcontroller'; + } + + public get getContainerMountPath(): string { + return this.containerMountPath; + } + + public generatePortAndCommand(command: string): string { + this.generateContainerPort(this.config.taskRoles as any); + const patchedCommand: string = this.generateCommandScript(this.config.taskRoles, command); + return patchedCommand; + } + + public async startEnvironment(environment: EnvironmentInformation): Promise { + if (this.kubernetesCRDClient === undefined) { + throw new Error("kubernetesCRDClient not initialized!"); + } + if (this.createStoragePromise) { + await this.createStoragePromise; + } + + const frameworkcontrollerJobName: string = `nniexp${this.experimentId}env${environment.id}`.toLowerCase(); + environment.maxTrialNumberPerGpu = this.config.maxTrialNumberPerGpu; + // FIXME: create trial log and show it on webui + environment.trackingUrl = `${this.config.storage.localMountPath}/nni/${this.experimentId}/envs/${environment.id}/`; + // Generate kubeflow job resource config object + const frameworkcontrollerJobConfig: any = await this.prepareFrameworkControllerConfig( + environment.id, + path.join(this.environmentWorkingFolder, 'envs', environment.id), + frameworkcontrollerJobName + ); + // Create frameworkcontroller job based on generated kubeflow job resource config + await this.kubernetesCRDClient.createKubernetesJob(frameworkcontrollerJobConfig); + + return Promise.resolve(frameworkcontrollerJobConfig); + } + + public async stopEnvironment(environment: EnvironmentInformation): Promise { + if (this.kubernetesCRDClient === undefined) { + throw new Error('kubernetesCRDClient not initialized!'); + } + try { + await this.kubernetesCRDClient.deleteKubernetesJob(new Map( + [ + ['app', this.NNI_KUBERNETES_TRIAL_LABEL], + ['expId', this.experimentId], + ['envId', environment.id] + ] + )); + } catch (err) { + const errorMessage: string = `Delete env ${environment.id} failed: ${err}`; + this.log.error(errorMessage); + + return Promise.reject(errorMessage); + } + } + + public generatePodResource(memory: number, cpuNum: number, gpuNum: number): any { + const resources: any = { + memory: `${memory}Mi`, + cpu: `${cpuNum}` + }; + + if (gpuNum !== 0) { + resources['nvidia.com/gpu'] = `${gpuNum}`; + } + + return resources; + } + + /** + * generate trial's command for frameworkcontroller + * expose port and execute injector.sh before executing user's command + * @param command + */ + private generateCommandScript(taskRoles: FrameworkControllerTaskRoleConfig[], command: string): string { + let portScript: string = ''; + for (const taskRole of taskRoles) { + portScript += `FB_${taskRole.name.toUpperCase()}_PORT=${this.fcContainerPortMap.get( + taskRole.name + )} `; + } + return `${portScript} . /mnt/frameworkbarrier/injector.sh && ${command}`; + } + + private async prepareFrameworkControllerConfig(envId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string): + Promise { + const podResources: any = []; + for (const taskRole of this.config.taskRoles) { + const resource: any = {}; + resource.requests = this.generatePodResource(toMegaBytes(taskRole.memorySize), taskRole.cpuNumber, taskRole.gpuNumber); + resource.limits = {...resource.requests}; + podResources.push(resource); + } + // Generate frameworkcontroller job resource config object + const frameworkcontrollerJobConfig: any = + await this.generateFrameworkControllerJobConfig(envId, trialWorkingFolder, frameworkcontrollerJobName, podResources); + + return Promise.resolve(frameworkcontrollerJobConfig); + } + + private generateContainerPort(taskRoles: FrameworkControllerTrialConfigTemplate[]): void { + if (taskRoles === undefined) { + throw new Error('frameworkcontroller trial config is not initialized'); + } + + let port: number = 4000; //The default port used in container + for (const index of taskRoles.keys()) { + this.fcContainerPortMap.set(taskRoles[index].name, port); + port += 1; + } + } + + private async createRegistrySecret(filePath: string | undefined): Promise { + if (filePath === undefined || filePath === '') { + return undefined; + } + const body = fs.readFileSync(filePath).toString('base64'); + const registrySecretName = `nni-secret-${uniqueString(8).toLowerCase()}`; + const namespace = this.genericK8sClient.getNamespace ?? "default"; + await this.genericK8sClient.createSecret( + { + apiVersion: 'v1', + kind: 'Secret', + metadata: { + name: registrySecretName, + namespace: namespace, + labels: { + app: this.NNI_KUBERNETES_TRIAL_LABEL, + expId: this.experimentId + } + }, + type: 'kubernetes.io/dockerconfigjson', + data: { + '.dockerconfigjson': body + } + } + ); + return registrySecretName; + } + + /** + * Generate frameworkcontroller resource config file + * @param trialJobId trial job id + * @param trialWorkingFolder working folder + * @param frameworkcontrollerJobName job name + * @param podResources pod template + */ + private async generateFrameworkControllerJobConfig(envId: string, trialWorkingFolder: string, + frameworkcontrollerJobName: string, podResources: any): Promise { + + const taskRoles: any = []; + for (const index of this.config.taskRoles.keys()) { + const containerPort: number | undefined = this.fcContainerPortMap.get(this.config.taskRoles[index].name); + if (containerPort === undefined) { + throw new Error('Container port is not initialized'); + } + + const taskRole: any = this.generateTaskRoleConfig( + trialWorkingFolder, + this.config.taskRoles[index].dockerImage, + `${envId}_run.sh`, + podResources[index], + containerPort, + await this.createRegistrySecret(this.config.taskRoles[index].privateRegistryAuthPath) + ); + taskRoles.push({ + name: this.config.taskRoles[index].name, + taskNumber: this.config.taskRoles[index].taskNumber, + frameworkAttemptCompletionPolicy: { + minFailedTaskCount: this.config.taskRoles[index].frameworkAttemptCompletionPolicy.minFailedTaskCount, + minSucceededTaskCount: this.config.taskRoles[index].frameworkAttemptCompletionPolicy.minSucceedTaskCount + }, + task: taskRole + }); + } + + return Promise.resolve({ + apiVersion: `frameworkcontroller.microsoft.com/v1`, + kind: 'Framework', + metadata: { + name: frameworkcontrollerJobName, + namespace: this.config.namespace ?? "default", + labels: { + app: this.NNI_KUBERNETES_TRIAL_LABEL, + expId: this.experimentId, + envId: envId + } + }, + spec: { + executionType: 'Start', + taskRoles: taskRoles + } + }); + } + + private generateTaskRoleConfig(trialWorkingFolder: string, replicaImage: string, runScriptFile: string, + podResources: any, containerPort: number, privateRegistrySecretName: string | undefined): any { + + const volumeSpecMap: Map = new Map(); + // Only support nfs for now + volumeSpecMap.set('nniVolumes', [ + { + name: 'nni-vol', + nfs: { + server: `${this.config.storage.server}`, + path: `${this.config.storage.path}` + } + }, { + name: 'frameworkbarrier-volume', + emptyDir: {} + }]); + + // const securityContext: any = { + // fsGroup: xxxx, + // runAsUser: xxxx, + // runAsGroup: xxxx + // }; + + const containers: any = [ + { + name: 'framework', + image: replicaImage, + // securityContext: securityContext, + command: ['sh', `${path.join(trialWorkingFolder, runScriptFile)}`], + volumeMounts: [ + { + name: 'nni-vol', + mountPath: this.containerMountPath + }, { + name: 'frameworkbarrier-volume', + mountPath: '/mnt/frameworkbarrier' + }], + resources: podResources, + ports: [{ + containerPort: containerPort + }] + }]; + + const initContainers: any = [ + { + name: 'frameworkbarrier', + image: 'frameworkcontroller/frameworkbarrier', + volumeMounts: [ + { + name: 'frameworkbarrier-volume', + mountPath: '/mnt/frameworkbarrier' + }] + }]; + + const spec: any = { + containers: containers, + initContainers: initContainers, + restartPolicy: 'OnFailure', + volumes: volumeSpecMap.get('nniVolumes'), + hostNetwork: false + }; + if (privateRegistrySecretName) { + spec.imagePullSecrets = [ + { + name: privateRegistrySecretName + } + ] + } + + if (this.config.serviceAccountName !== undefined) { + spec.serviceAccountName = this.config.serviceAccountName; + } + + return { + pod: { + spec: spec + } + }; + } + + public async getK8sJobInfo(environment: EnvironmentInformation): Promise { + if (this.kubernetesCRDClient === undefined) { + throw new Error("kubernetesCRDClient undefined"); + } + const kubeflowJobName: string = `nniexp${this.experimentId}env${environment.id}`.toLowerCase(); + const kubernetesJobInfo = await this.kubernetesCRDClient.getKubernetesJob(kubeflowJobName); + return Promise.resolve(kubernetesJobInfo); + } + +} diff --git a/ts/nni_manager/training_service/frameworkcontroller_v3/frameworkcontroller.ts b/ts/nni_manager/training_service/frameworkcontroller_v3/frameworkcontroller.ts new file mode 100644 index 0000000000..f63d7b2e9b --- /dev/null +++ b/ts/nni_manager/training_service/frameworkcontroller_v3/frameworkcontroller.ts @@ -0,0 +1,534 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { EventEmitter } from 'events'; +import fs from 'fs'; +import path from 'path'; +import { Logger, getLogger } from 'common/log'; +import type { FrameworkControllerConfig, TrainingServiceConfig } from 'common/experimentConfig'; +import type { EnvironmentInfo, Metric, Parameter, TrainingServiceV3 } from 'common/training_service_v3'; +import { ExperimentStartupInfo, getExperimentId, getBasePort } from 'common/experimentStartupInfo'; +import { delay, getExperimentRootDir, uniqueString, getIPV4Address, getVersion } from 'common/utils'; +import { GPU_INFO, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, REPORT_METRIC_DATA, STDOUT, TRIAL_END, VERSION_CHECK } from 'core/commands'; + +import { validateCodeDir, tarAdd, tarExtract } from '../common/util'; +import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData'; +import { CONTAINER_INSTALL_NNI_SHELL_FORMAT_FOR_WIN } from '../common/containerJobData'; +import { FrameworkControllerJobStatus, FrameworkControllerJobCompleteStatus } from '../kubernetes/frameworkcontroller/frameworkcontrollerConfig'; + +import type { Command, CommandChannel } from './commandChannel'; +import { Channel, EnvironmentInformation, RunnerSettings } from './environment'; +import { FrameworkControllerEnvironmentService } from './environmentService'; + +import { assert } from 'console'; + +export class FrameworkControllerTSV3 implements TrainingServiceV3 { + private config: FrameworkControllerConfig; + private log: Logger; + private commandEmitter: EventEmitter; + private commandChannel: CommandChannel | undefined; + private envService: FrameworkControllerEnvironmentService | undefined; + // TODO: use EnvironmentInfo instead of EnvironmentInformation + private envs: Map; + private stopping: boolean = false; + private envManageLoopPromise: Promise | undefined; + private trialToEnv: Map; + private parameters: [string, Parameter][]; + + constructor(trainingServiceId: string, config: TrainingServiceConfig) { + this.log = getLogger(`FrameworkControllerV3.${trainingServiceId}`); + this.log.info('Training sevice config:', config); + + this.config = config as FrameworkControllerConfig; + this.envs = new Map(); + this.trialToEnv = new Map(); + this.parameters = []; + this.commandEmitter = new EventEmitter(); + } + + /** + * Invoked during experiment initialization. + * + * It should verify the config and raise error if the specified training resource is not available. + * + * It should not start daemon on worker machine. + * If another component of the experiment failed to initialize, + * the process will abort without invoking any clean up function. + **/ + public async init(): Promise { + // validate storage config + assert(this.config.storage !== undefined, 'Storage config should be set'); + assert(this.config.storage.storageType === 'nfs', + 'FrameworkController training service v3 only supports NFS storage for now'); + assert(this.config.storage.server !== undefined, 'NFS server should be set'); + assert(this.config.storage.path !== undefined, 'NFS path should be set'); + assert(this.config.storage.localMountPath !== undefined, 'Local mount path should be set'); + + await validateCodeDir(this.config.trialCodeDirectory); + + const startupInfo = ExperimentStartupInfo.getInstance(); + this.envService = new FrameworkControllerEnvironmentService(this.config, startupInfo); + this.commandEmitter.on("command", (command: Command): void => { + this.handleCommand(command).catch((error: Error) => { + this.log.error(`Error on handling env ${command.environment.id} command: ${command.command}, + data: ${command.data}, error: ${error}`); + }) + }); + this.envService.initCommandChannel(this.commandEmitter); + this.commandChannel = this.envService.getCommandChannel; + return; + } + + /** + * Copy user's trial code directory to //code-copy/ + */ + private async copyTrialCodeToTempDir(codeCopyDir: string): Promise { + fs.mkdirSync(codeCopyDir); + const codeDir = path.resolve(this.config.trialCodeDirectory); + await tarAdd(path.join(codeCopyDir, "trialcode.tar.gz"), codeDir); + await fs.promises.writeFile(path.join(codeCopyDir, "install_nni.sh"), CONTAINER_INSTALL_NNI_SHELL_FORMAT); + await fs.promises.writeFile(path.join(codeCopyDir, "install_nni.ps1"), CONTAINER_INSTALL_NNI_SHELL_FORMAT_FOR_WIN); + return Promise.resolve(); + } + + /** + * Copy files from //code-copy/ to // + */ + private async copyTrialCodeToSharedStorage(sourceDir: string, sharedExpPath: string): Promise { + fs.mkdirSync(sharedExpPath, { recursive: true }); + fs.copyFileSync(path.join(sourceDir, 'trialcode.tar.gz'), path.join(sharedExpPath, 'trialcode.tar.gz'), + fs.constants.COPYFILE_EXCL); + fs.copyFileSync(path.join(sourceDir, 'install_nni.sh'), path.join(sharedExpPath, 'install_nni.sh'), + fs.constants.COPYFILE_EXCL); + fs.copyFileSync(path.join(sourceDir, 'install_nni.ps1'), path.join(sharedExpPath, 'install_nni.ps1'), + fs.constants.COPYFILE_EXCL); + return Promise.resolve(); + } + + /** + * Untar trial code to /nni//envs//code/ + * Write settings.json to /nni//envs// + * + * Here, the files are prepared to the /..., corresponding to the in container. + */ + private async prepareFilesToEnvironment(envId: string, sharedExpPath: string, envSettings: RunnerSettings): Promise { + // untar trial code + const envPath = path.join(sharedExpPath, 'envs', envId); + fs.mkdirSync(envPath, { recursive: true }); + // chmode to 777 because otherwise job container cannot create files in the folder + fs.chmodSync(envPath, 0o777); + const targetPath = path.join(envPath, 'code'); + fs.mkdirSync(targetPath); + await tarExtract(path.join(sharedExpPath, 'trialcode.tar.gz'), targetPath); + // chmod to 0o777 because this is trial's working directory, trial may download data to this folder + fs.chmodSync(targetPath, 0o777); + // write settings.json + await fs.promises.writeFile(path.join(envPath, "settings.json"), JSON.stringify(envSettings)); + return Promise.resolve(); + } + + /** + * @param trialId trial's id + * @param data data is a string of json format, it is passed through to tuner/strategy + */ + private async handleMetricData(trialId: string, data: any): Promise { + if (Array.isArray(data)) { + for (const subItem of data) { + this.commandEmitter.emit('metric', trialId, subItem); + } + } else { + this.commandEmitter.emit('metric', trialId, data); + } + } + + private async handleStdout(commandData: any): Promise { + this.log.debug(`Trial stdout: ${commandData["msg"]}`); + const metricPattern: RegExp = /NNISDK_MEb'(?.*a?)'$/gm; + try { + if (commandData["tag"] === 'trial' && commandData["msg"] !== undefined) { + const message: string = commandData["msg"]; + let metricsContent = metricPattern.exec(message); + while (metricsContent && metricsContent.groups) { + const key: string = 'metrics'; + const data = metricsContent.groups[key]; + await this.handleMetricData(commandData["trial"], data); + metricsContent = metricPattern.exec(message); + } + } + } catch (err) { + this.log.error(`TrialDispatcher: handleStdout error: ${err}`); + } + } + + private async handleCommand(command: Command): Promise { + this.log.debug(`TrialDispatcher: env ${command.environment.id} received command ${command.command}.`); + switch (command.command) { + case REPORT_METRIC_DATA: + // TODO: refactor this part, in the current implementation metric data is passed through STDOUT + this.log.warning(`Unexpected command: ${REPORT_METRIC_DATA}.`); + break; + case STDOUT: + await this.handleStdout(command.data); + break; + case INITIALIZED: + this.log.debug(`Init command: ${INITIALIZED}.`); + const env = this.envs.get(command.environment.id); + if (env === undefined) { + throw new Error(`Environment ${command.environment.id} not found.`); + } + env.isRunnerReady = true; + // this environment is ready + this.commandEmitter.emit('env_status_change', command.environment.id); + break; + case VERSION_CHECK: + this.log.debug(`Version command: ${VERSION_CHECK}.`); + break; + case GPU_INFO: + this.log.debug(`Gpu command: ${GPU_INFO}.`); + break; + case TRIAL_END: + // command.data: { "code": retCode, "time": end_time, "trial": self.id } + this.log.debug(`Trial end command: ${TRIAL_END}.`); + this.commandEmitter.emit('trial_end', command.environment.id, + command.data['trial'], command.data['code'], command.data['end_time']); + break; + } + } + + private async environmentSetting(channelName: Channel, platform: string): Promise { + const envSettings: RunnerSettings = new RunnerSettings(); + envSettings.nniManagerIP = this.config.nniManagerIp === undefined? await getIPV4Address() : this.config.nniManagerIp; + // FIXME: why we need another port? + envSettings.nniManagerPort = getBasePort() + 1; + envSettings.commandChannel = channelName; + // trialCommand might be empty, in which case trialCommand is specified in taskRoles + // FIXME: rethink the relation between config.trialCommand and the command(s) in trainingservice config, + // maybe we should not provide such flexibility. + envSettings.command = this.config.taskRoles.map((taskRole) => taskRole.command); + envSettings.nniManagerVersion = await getVersion(); + // FIXME: check if it is a valid log level + envSettings.logCollection = 'none'; + envSettings.platform = platform; + envSettings.experimentId = getExperimentId(); + envSettings.enableGpuCollector = this.config.taskRoles.map((taskRole) => taskRole.gpuNumber > 0) + .reduce((res, useGpu) => res || useGpu, false); + return Promise.resolve(envSettings); + } + + private async environmentManagementLoop(): Promise { + if (this.envService === undefined) { + throw new Error('Environment service is not initialized, please call init() first'); + } + while (!this.stopping) { + for (const env of this.envs.values()) { + const k8sJobInfo = await (this.envService as any).getK8sJobInfo(env); + if (k8sJobInfo.status && k8sJobInfo.status.state) { + this.log.debug(`k8sJobInfo.status.state: ${k8sJobInfo.status.state}`); + const frameworkJobType: FrameworkControllerJobStatus = k8sJobInfo.status.state; + // The status is defined here: + // https://github.com/microsoft/frameworkcontroller/blob/master/pkg/apis/frameworkcontroller/v1/types.go#L490 + switch (frameworkJobType) { + case 'AttemptCreationPending': + case 'AttemptCreationRequested': + case 'AttemptPreparing': + env.setStatus('WAITING'); + break; + case 'AttemptRunning': + if (env.status !== 'RUNNING') { + env.setStatus('RUNNING'); + // Still need to emit here, because this status change may happen after runner initialized is ready, + // due to the interval of status check. + this.commandEmitter.emit('env_status_change', env.id); + } + break; + case 'AttemptDeletionPending': + case 'AttemptDeletionRequested': + case 'AttemptDeleting': + case 'AttemptCompleted': + this.log.info(`Environment ${env.id} is in ${frameworkJobType} state`); + break; + case 'Completed': { + const completedJobType: FrameworkControllerJobCompleteStatus = + k8sJobInfo.status.attemptStatus.completionStatus.type.name; + switch (completedJobType) { + case 'Succeeded': + env.setStatus('SUCCEEDED'); + break; + case 'Failed': + env.setStatus('FAILED'); + break; + default: + this.log.warning(`Environment ${env.id} is in ${completedJobType} state`); + env.setStatus('UNKNOWN'); + } + this.commandEmitter.emit('env_status_change', env.id); + break; + } + default: + this.log.warning(`Environment ${env.id} is in ${frameworkJobType} state`); + env.setStatus('UNKNOWN'); + } + } + } + // TODO: remove dead environments + await delay(2000); // 2s + } + this.log.info('EnvironmentManagementLoop done'); + return Promise.resolve(); + } + + /** + * The files that should be provided to an environment are: + * 1. install_nni.sh and install_nni.ps1 + * 2. trial code + * 3. settings.json: the settings for trial runner + * These files are placed in the provided storage (i.e., nfs, ...), + * specifically, the uncompressed trial code and settings.json are placed + * under /nni//envs//, + * install_nni.sh and install_nni.ps1 are placed under /nni//. + */ + private async createEnvironment(sharedExpPath: string): Promise { + // TODO: do I need to check experiment stopped? + if (this.commandChannel === undefined|| this.envService === undefined) { + throw new Error('Environment service is not initialized, please call init() first'); + } + const envId = uniqueString(5); + this.log.info(`Creating a new environment ${envId}...`); + const envName = `nni_exp_${getExperimentId()}_env_${envId}`; + const env = this.envService.createEnvironmentInformation(envId, envName); + const envSettings = await this.environmentSetting(this.commandChannel.channelName, this.envService.getName); + // Upload the related files from local to the nfs folder of the to-be-created environment + await this.prepareFilesToEnvironment(envId, sharedExpPath, envSettings); + + // The job command (i.e., trial_runner) that replaces user trial command. + const expWorkingDir = `${this.envService.getContainerMountPath}/nni/${getExperimentId()}`; + // The while clause in the command is for dealing with container auto restart + // FIXME: does env need the field 'command'? + env.command = `cd ${expWorkingDir} && sh ./install_nni.sh ` + + `&& cd envs/${envId} ` + + `&& i=0; while [ -e "trialrunner_stdout$\{i\}" -o -e "trialrunner_stderr$\{i\}" ]; ` + + `do i=$((i+1)); done; ` + + `python3 -m nni.tools.trial_tool.trial_runnerv3 ` + + `1>${expWorkingDir}/envs/${envId}/trialrunner_stdout$\{i\} ` + + `2>${expWorkingDir}/envs/${envId}/trialrunner_stderr$\{i\}`; + const fullCommand = (this.envService as any).generatePortAndCommand(env.command); + const envPath = path.join(sharedExpPath, 'envs', envId); + await fs.promises.writeFile(path.join(envPath, `${envId}_run.sh`), fullCommand, { encoding: 'utf8' }); + + const fcJobConfig = await this.envService.startEnvironment(env); + // Dump the k8s job config for debug + await fs.promises.writeFile(path.join(envPath, 'job_k8s_config.yaml'), JSON.stringify(fcJobConfig), { encoding: 'utf8' }); + this.commandChannel.open(env); + + return env; + } + + /** + * Invoked after init(). + * It is suggested to resolve the promise after all daemons initialized. + * + * Return an empty list of environment because the environment is not ready at this point. + **/ + public async start(): Promise { + if (this.envService === undefined || this.commandChannel === undefined) { + throw new Error('Environment service or command channel is not initialized, please call init() first'); + } + this.envService.init(); // does nothing + await this.commandChannel.start(); + await this.commandChannel.run(); // does nothing + this.log.info(`TrialDispatcher: started channel: ${this.commandChannel.constructor.name}`); + + // Copy files (install_nni.sh/ps1 and trial code) to local experiment folder + const codeCopyDir = path.join(getExperimentRootDir(), "code-copy"); + await this.copyTrialCodeToTempDir(codeCopyDir); + + // Copy the .tar.gz trial code from the local experiment folder to the shared storage. + // Copy to /nni/exp-id/xxx.tar.gz + if (this.config.storage.localMountPath === undefined) { + throw new Error('localMountPath is not set in storage config'); + } + const sharedExpPath = path.join(this.config.storage.localMountPath, 'nni', getExperimentId()); + await this.copyTrialCodeToSharedStorage(codeCopyDir, sharedExpPath); + + // Create one environment + // FIXME: create new environment when requested + const env: EnvironmentInformation = await this.createEnvironment(sharedExpPath); + this.envs.set(env.id, env); + this.log.info('FrameworkController training service started.'); + this.envManageLoopPromise = this.environmentManagementLoop(); + // Return an empty list because the first environment has not been in running state. + // The available environments will be updated through onEnvironmentUpdate callback. + return [ ]; + } + + public async stop(): Promise { + if (this.envService === undefined) { + throw new Error('Environment service is not initialized'); + } + this.stopping = true; + await this.envManageLoopPromise; + // Stop environments + this.log.debug('Stopping environments...'); + const promises: Promise[] = []; + for (const env of this.envs.values()) { + promises.push(this.envService.stopEnvironment(env)); + } + await Promise.all(promises); + this.log.info('FrameworkController training service stopped.'); + return Promise.resolve(); + } + + /* Following methods are guaranteed to be invoked after start() and before stop(). */ + + public async uploadDirectory(_directoryName: string, _path: string): Promise { + // not in use, because uploading directory has been done in preparing environment. + return; + } + + /** + * By design _trialCommand and _directoryName should be used instead of the command in this.config, + * for now we just use the commands in this.config. + * + * Return trial ID on success. + * Return null if the environment is not available. + **/ + public async createTrial(environmentId: string, _trialCommand: string, _directoryName: string): Promise { + if (this.commandChannel === undefined) { + throw new Error('Command channel is not initialized.'); + } + this.log.info(`Create trial in environment ${environmentId}.`); + const env = this.envs.get(environmentId); + if (env === undefined) { + throw new Error(`Environment ${environmentId} not found`); + } else if (env.status === 'RUNNING' && env.runningTrialCount === 0) { + // FIXME: use existing trialId when resume + const trialId: string = uniqueString(5); + // The logic of obtaining parameter is a little complex + this.commandEmitter.emit('request_parameter', trialId); + const latestParam = this.parameters[this.parameters.length - 1]; + this.log.info(`The obtained parameter: ${latestParam}}`); + assert(latestParam[0] === trialId); + const settings = { + trialId: trialId, + gpuIndices: '', + sequenceId: 0, + parameter: latestParam[1], + } + // FIXME: sendCommand only needs envId + await this.commandChannel.sendCommand(env, NEW_TRIAL_JOB, settings); + env.runningTrialCount = 1; + this.trialToEnv.set(trialId, env); + this.commandEmitter.emit('trial_start', trialId); + return Promise.resolve(trialId); + } else { + this.log.warning(`Environment ${environmentId} is in ${env.status} state, is running ${env.runningTrialCount} trials.`); + return Promise.resolve(null); + } + } + + /** + * Kill a trial. + * The trial ID is guaranteed to exist, but the trial is not guaranteed to be running. + **/ + public async stopTrial(trialId: string): Promise { + this.log.info(`Stop trial ${trialId}.`); + if (this.commandChannel === undefined) { + throw new Error('Command channel is not initialized.'); + } + const env = this.trialToEnv.get(trialId); + if (env === undefined) { + this.log.warning(`Trial ${trialId} not found, stop trial ignored.`); + return Promise.resolve(); + } + await this.commandChannel.sendCommand(env, KILL_TRIAL_JOB, trialId); + return Promise.resolve(); + } + + // TODO: resume trial + + /** + * Send a hyperparameter configuration to a trial. + * Will only be invoked after onRequestParameter(). + **/ + public async sendParameter(trialId: string, parameter: Parameter): Promise { + if (this.commandChannel === undefined) { + throw new Error('Command channel is not initialized, please call init() first'); + } + this.log.debug(`Send parameter ${parameter} to trial ${trialId}.`); + this.parameters.push([trialId, parameter]); + return Promise.resolve(); + } + + /* Following methods are guaranteed to be invoked after init() and before start(). */ + + /** + * Invoke the callback when a trial invokes nni.get_next_parameter(). + **/ + public onRequestParameter(callback: (trialId: string) => Promise): void { + this.log.debug('onRequestParameter callback called.'); + this.commandEmitter.on('request_parameter', (trialId) => { + callback(trialId); + }); + } + + /** + * Invoke the callback when a trial invokes nni.report_final_result() and nni.report_intermediate_result(). + **/ + public onMetric(callback: (trialId: string, metric: Metric) => Promise): void { + this.log.debug('onMetric callback called.'); + this.commandEmitter.on('metric', (trialId, metricStr) => { + this.log.debug(`onMetric callback called with data from trial ${trialId}: ${metricStr}.`); + callback(trialId, metricStr); + }); + } + + /** + * Invoke the callback when a trial process is launched. + * + * If there are multiple listeners, `timestamp` should be consistent. + * + * If the training platform automatically retries failed jobs, the callback should only be invoked on first start. + **/ + public onTrialStart(callback: (trialId: string, timestamp: number) => Promise): void { + this.log.debug('onTrialStart callback called.'); + this.commandEmitter.on('trial_start', (trialId) => { + callback(trialId, Date.now()); + }); + } + + /** + * Invoke the callback when a trial stops. + * + * If the trial stops on its own, provide the exit code. + * If the trial is killed for any reason, set `exitCode` to null. + * + * If there are multiple listeners, `timestamp` should be consistent. + * + * If the training platform automatically retries failed jobs, the callback should only be invoked on last end. + **/ + public onTrialEnd(callback: (trialId: string, timestamp: number, exitCode: number | null) => Promise): void { + this.log.debug('onTrialEnd callback called.'); + this.commandEmitter.on('trial_end', (envId, trialId, exitCode, endTime) => { + const env = this.envs.get(envId) + if (env === undefined) { + throw new Error(`The finished trial's environment ${envId} not found`); + } + env.runningTrialCount = 0; + callback(trialId, endTime, exitCode); + }); + } + + /** + * Invoke the callback when any environment's status changes. + * + * Note that `environments` object should be immutable. + **/ + public onEnvironmentUpdate(callback: (environments: EnvironmentInformation[]) => Promise): void { + this.log.debug('onEnvironmentUpdate callback called.'); + // The callback is invoked when (1) environment is started or exits, (2) trial job is started or finished. + this.commandEmitter.on('env_status_change', () => { + callback(Array.from(this.envs.values()) + .filter((env) => env.status === 'RUNNING' && env.isRunnerReady && env.runningTrialCount === 0)); + }); + } +} diff --git a/ts/nni_manager/training_service/frameworkcontroller_v3/index.ts b/ts/nni_manager/training_service/frameworkcontroller_v3/index.ts new file mode 100644 index 0000000000..01f075d2f4 --- /dev/null +++ b/ts/nni_manager/training_service/frameworkcontroller_v3/index.ts @@ -0,0 +1,4 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +export { FrameworkControllerTSV3 } from './frameworkcontroller'; \ No newline at end of file diff --git a/ts/nni_manager/training_service/frameworkcontroller_v3/webCommandChannel.ts b/ts/nni_manager/training_service/frameworkcontroller_v3/webCommandChannel.ts new file mode 100644 index 0000000000..d5777350ef --- /dev/null +++ b/ts/nni_manager/training_service/frameworkcontroller_v3/webCommandChannel.ts @@ -0,0 +1,141 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { EventEmitter } from "events"; +import { Server as SocketServer } from "ws"; +import { getBasePort, getExperimentId } from "common/experimentStartupInfo"; +import { INITIALIZED } from 'core/commands'; +import { CommandChannel, RunnerConnection } from "./commandChannel"; +import { Channel, EnvironmentInformation } from "./environment"; + +class WebRunnerConnection extends RunnerConnection { + public readonly clients: WebSocket[] = []; + + public async close(): Promise { + await super.close(); + while (this.clients.length > 0) { + const client = this.clients.shift(); + if (client !== undefined) { + client.close(); + } + } + } + + public AddClient(client: WebSocket): void { + this.clients.push(client); + } +} + +export class WebCommandChannel extends CommandChannel { + private readonly expId: string = getExperimentId(); + private static commandChannel: WebCommandChannel; + private webSocketServer: SocketServer | undefined; + private clients: Map = new Map(); + + public get channelName(): Channel { + return "web"; + } + + public async config(_key: string, _value: any): Promise { + // do nothing + } + + // Set WebCommandChannel as singleton mode, one experiment could only start one webCommandChannel instance + private constructor(commandEmitter: EventEmitter) { + super(commandEmitter); + } + + public static getInstance(commandEmitter: EventEmitter): CommandChannel { + if (!this.commandChannel) { + this.commandChannel = new WebCommandChannel(commandEmitter); + } + return this.commandChannel; + } + + public async start(): Promise { + const port = getBasePort() + 1; + this.webSocketServer = new SocketServer({ port }); + + this.webSocketServer.on('connection', (client: WebSocket) => { + this.log.debug(`WebCommandChannel: received connection`); + client.onerror = (event): void => { + this.log.error('error on client', event); + } + + this.clients.set(client, undefined); + client.onmessage = (message): void => { + this.receivedWebSocketMessage(client, message); + }; + }).on('error', (error) => { + this.log.error(`error on websocket server ${error}`); + }); + } + + public async stop(): Promise { + if (this.webSocketServer !== undefined) { + this.webSocketServer.close(); + } + } + + public async run(): Promise{ + // do nothing + } + + protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise { + if (this.webSocketServer === undefined) { + throw new Error(`WebCommandChannel: uninitialized!`) + } + const runnerConnection = this.runnerConnections.get(environment.id) as WebRunnerConnection; + if (runnerConnection !== undefined && runnerConnection.clients.length > 0) { + for (const client of runnerConnection.clients) { + client.send(message); + } + } else { + this.log.warning(`WebCommandChannel: cannot find client for env ${environment.id}, message is ignored.`); + } + } + + protected createRunnerConnection(environment: EnvironmentInformation): RunnerConnection { + return new WebRunnerConnection(environment); + } + + private receivedWebSocketMessage(client: WebSocket, message: MessageEvent): void { + let connection = this.clients.get(client) as WebRunnerConnection | undefined; + const rawCommands = message.data.toString(); + + if (connection === undefined) { + // undefined means it's expecting initializing message. + const commands = this.parseCommands(rawCommands); + let isValid = false; + this.log.debug('WebCommandChannel: received initialize message:', rawCommands); + + if (commands.length > 0) { + const commandType = commands[0][0]; + const result = commands[0][1]; + if (commandType === INITIALIZED && + result.expId === this.expId && + this.runnerConnections.has(result.runnerId) + ) { + const runnerConnection = this.runnerConnections.get(result.runnerId) as WebRunnerConnection; + this.clients.set(client, runnerConnection); + runnerConnection.AddClient(client); + connection = runnerConnection; + isValid = true; + this.log.debug(`WebCommandChannel: client of env ${runnerConnection.environment.id} initialized`); + } else { + this.log.warning(`WebCommandChannel: client is not initialized, runnerId: ${result.runnerId}, command: ${commandType}, expId: ${this.expId}, exists: ${this.runnerConnections.has(result.runnerId)}`); + } + } + + if (!isValid) { + this.log.warning(`WebCommandChannel: rejected client with invalid init message ${rawCommands}`); + client.close(); + this.clients.delete(client); + } + } + + if (connection !== undefined) { + this.handleCommand(connection.environment, rawCommands); + } + } +} diff --git a/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerConfig.ts b/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerConfig.ts index 839ca2729d..dfe57db0fd 100644 --- a/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerConfig.ts +++ b/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerConfig.ts @@ -133,6 +133,7 @@ export class FrameworkControllerClusterConfigFactory { } export type FrameworkControllerJobStatus = - 'AttemptRunning' | 'Completed' | 'AttemptCreationPending' | 'AttemptCreationRequested' | 'AttemptPreparing' | 'AttemptCompleted'; + 'AttemptRunning' | 'Completed' | 'AttemptCreationPending' | 'AttemptCreationRequested' | 'AttemptPreparing' | 'AttemptCompleted' + | 'AttemptDeletionPending' | 'AttemptDeletionRequested' | 'AttemptDeleting'; export type FrameworkControllerJobCompleteStatus = 'Succeeded' | 'Failed'; diff --git a/ts/nni_manager/training_service/reusable/environments/kubernetes/frameworkcontrollerEnvironmentService.ts b/ts/nni_manager/training_service/reusable/environments/kubernetes/frameworkcontrollerEnvironmentService.ts index 2e782dc59d..1d495d6540 100644 --- a/ts/nni_manager/training_service/reusable/environments/kubernetes/frameworkcontrollerEnvironmentService.ts +++ b/ts/nni_manager/training_service/reusable/environments/kubernetes/frameworkcontrollerEnvironmentService.ts @@ -72,8 +72,6 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment } let configTaskRoles: any = undefined; configTaskRoles = this.config.taskRoles; - this.log.info('zql configTaskRoles: ', configTaskRoles); - this.log.info('zql configTaskRoles: ', JSON.stringify(configTaskRoles)); //Generate the port used for taskRole this.generateContainerPort(configTaskRoles); @@ -96,11 +94,6 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment frameworkcontrollerJobName ); // Create kubeflow job based on generated kubeflow job resource config - this.log.info('zql environment service config: ', frameworkcontrollerJobConfig); - this.log.info('zql environment service config: ', JSON.stringify(frameworkcontrollerJobConfig)); - // FIXME: dump the config for easy debuggability - // ... - // throw new Error("zql throw test"); await this.kubernetesCRDClient.createKubernetesJob(frameworkcontrollerJobConfig); } @@ -143,15 +136,11 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment private async prepareFrameworkControllerConfig(envId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string): Promise { const podResources: any = []; - this.log.info('zql this.config: ', this.config); - this.log.info('zql this.config: ', JSON.stringify(this.config)); for (const taskRole of this.config.taskRoles) { const resource: any = {}; resource.requests = this.generatePodResource(toMegaBytes(taskRole.memorySize), taskRole.cpuNumber, taskRole.gpuNumber); resource.limits = {...resource.requests}; podResources.push(resource); - this.log.info('zql resource: ', resource); - this.log.info('zql resource: ', JSON.stringify(resource)); } // Generate frameworkcontroller job resource config object const frameworkcontrollerJobConfig: any = @@ -258,17 +247,10 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment }]); } - const securityContext: any = { - fsGroup: 1013, - runAsUser: 1013, - runAsGroup: 1013 - }; - const containers: any = [ { name: 'framework', image: replicaImage, - securityContext: securityContext, command: ['sh', `${path.join(trialWorkingFolder, runScriptFile)}`], volumeMounts: [ { diff --git a/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts b/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts index bfffbeea9e..2cf8927125 100644 --- a/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts +++ b/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts @@ -151,7 +151,7 @@ export class KubernetesEnvironmentService extends EnvironmentService { protected async createNFSStorage(nfsServer: string, nfsPath: string): Promise { await cpp.exec(`mkdir -p ${this.nfsRootDir}`); try { - await cpp.exec(`echo 'quzha' | sudo -S mount ${nfsServer}:${nfsPath} ${this.nfsRootDir}`); + await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.nfsRootDir}`); } catch (error) { const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.nfsRootDir} failed, error is ${error}`; this.log.error(mountError); diff --git a/ts/nni_manager/training_service/reusable/trialDispatcher.ts b/ts/nni_manager/training_service/reusable/trialDispatcher.ts index 4b4afae161..65ce50a163 100644 --- a/ts/nni_manager/training_service/reusable/trialDispatcher.ts +++ b/ts/nni_manager/training_service/reusable/trialDispatcher.ts @@ -13,7 +13,7 @@ import { getBasePort, getExperimentId } from 'common/experimentStartupInfo'; import { getLogger, Logger } from 'common/log'; import { TrainingService, TrialJobApplicationForm, TrialJobMetric, TrialJobStatus } from 'common/trainingService'; import { delay, getExperimentRootDir, getIPV4Address, getLogLevel, getVersion, mkDirPSync, randomSelect, uniqueString } from 'common/utils'; -import { ExperimentConfig, SharedStorageConfig, TrainingServiceConfig, FrameworkControllerConfig } from 'common/experimentConfig'; +import { ExperimentConfig, SharedStorageConfig } from 'common/experimentConfig'; import { GPU_INFO, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, REPORT_METRIC_DATA, SEND_TRIAL_JOB_PARAMETER, STDOUT, TRIAL_END, VERSION_CHECK } from 'core/commands'; import { ScheduleResultType } from 'training_service/common/gpuData'; import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData'; @@ -30,7 +30,6 @@ import { SharedStorageService } from './sharedStorage'; import { NFSSharedStorageService } from './shared_storages/nfsStorageService' import { AzureBlobSharedStorageService } from './shared_storages/azureblobStorageService' import { TrialDetail } from './trial'; -import { assert } from 'console'; /** * It uses to manage jobs on training platforms @@ -229,6 +228,9 @@ class TrialDispatcher implements TrainingService { public async run(): Promise { await Promise.all(this.environmentServiceList.map(env => env.init())); for(const environmentService of this.environmentServiceList) { + + + await environmentService.getCommandChannel.start(); this.log.info(`TrialDispatcher: started channel: ${environmentService.getCommandChannel.constructor.name}`); @@ -493,7 +495,6 @@ class TrialDispatcher implements TrainingService { for (const environment of this.environments.values()) { if (environment.isAlive === true) { liveEnvironmentsCount++; - // FIXME: post warning message that environment/pod takes too long to be ready!!! if (environment.status === "RUNNING" && environment.isRunnerReady) { // if environment is not reusable and used, stop and not count as idle; const reuseMode = Array.isArray(this.config.trainingService) || (this.config.trainingService as any).reuseMode; @@ -639,7 +640,7 @@ class TrialDispatcher implements TrainingService { return randomSelect(validEnvironmentServiceList); } - private async prefetchEnvironments(): Promise { + private async prefetchEnvironments (): Promise { for (const environmentService of this.environmentServiceList) { const number = environmentService.prefetchedEnvironmentCount; this.log.info(`Initialize environments total number: ${number}`); @@ -649,19 +650,6 @@ class TrialDispatcher implements TrainingService { } } - private extractTrialCommands(trainingServiceName: string, trainingService: TrainingServiceConfig): string[] { - // FIXME: deal with different training services. - const trialCommands: string[] = []; - if (trainingServiceName === 'frameworkcontroller') { - // FIXME: deal with the mode of referencing backend's (e.g., frameworkcontroller) own configuration file - for (const taskRole of (trainingService as FrameworkControllerConfig).taskRoles) { - trialCommands.push(taskRole.command); - } - } - // else... - return trialCommands; - } - private async setEnvironmentSetting(environment: EnvironmentInformation): Promise { if (environment.environmentService === undefined) { throw new Error(`Environmentservice for ${environment.id} not initialized!`); @@ -672,35 +660,14 @@ class TrialDispatcher implements TrainingService { runnerSettings.nniManagerPort = getBasePort() + 1; runnerSettings.commandChannel = environmentService.getCommandChannel.channelName; runnerSettings.enableGpuCollector = this.enableGpuScheduler; - // trialCommand might be empty, in which case trialCommand is specified in taskRoles - // FIXME: rethink the relation between config.trialCommand and the command(s) in trainingservice config, - // maybe we should not provide such flexibility. - if (this.config.trialCommand === '') { - runnerSettings.command = [this.config.trialCommand]; - } - else { - if (this.config.trainingService instanceof Array) { - const foundTS = this.config.trainingService.find(element => element.platform === environmentService.getName); - if (foundTS !== undefined) - runnerSettings.command = this.extractTrialCommands(environmentService.getName, foundTS); - else - throw new Error(`${environmentService.getName} of the environment service cannot be found in config!`); - } - else { - runnerSettings.command = this.extractTrialCommands(environmentService.getName, this.config.trainingService); - } - } + runnerSettings.command = this.config.trialCommand; runnerSettings.nniManagerVersion = this.enableVersionCheck ? await getVersion() : ''; runnerSettings.logCollection = this.logCollection; runnerSettings.platform = environmentService.getName; runnerSettings.experimentId = this.experimentId; - this.log.info('zql setting config: ', this.config); - this.log.info('zql setting config: ', JSON.stringify(this.config)); const storageService: StorageService = this.getStorageService(environmentService); const envDir = storageService.joinPath("envs"); const runnerSettingsConfig = storageService.joinPath(envDir, environment.id, "settings.json"); - this.log.info('zql runnerSettings: ', runnerSettings); - this.log.info('zql runnerSettings: ', JSON.stringify(runnerSettings)); await storageService.save(JSON.stringify(runnerSettings), runnerSettingsConfig); } @@ -726,7 +693,6 @@ class TrialDispatcher implements TrainingService { // Generate setting.json file per environment to avoid conflict await this.setEnvironmentSetting(environment); - // FIXME: handles errors await environmentService.startEnvironment(environment); this.environments.set(environment.id, environment); diff --git a/ts/nni_manager/training_service/v3/compat.ts b/ts/nni_manager/training_service/v3/compat.ts index 8b88b21c48..6100ef1c02 100644 --- a/ts/nni_manager/training_service/v3/compat.ts +++ b/ts/nni_manager/training_service/v3/compat.ts @@ -4,10 +4,12 @@ import { EventEmitter } from 'events'; import { readFile } from 'fs/promises'; import path from 'path'; +import { setTimeout } from 'timers/promises'; import { Deferred } from 'common/deferred'; import type { TrainingServiceConfig } from 'common/experimentConfig'; import globals from 'common/globals'; +import { getLogger } from 'common/log'; import { TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus } from 'common/trainingService'; @@ -18,6 +20,20 @@ type MutableTrialJobDetail = { -readonly [Property in keyof TrialJobDetail]: TrialJobDetail[Property]; }; +const placeholderDetail: TrialJobDetail = { + id: '', + status: 'UNKNOWN', + submitTime: 0, + workingDirectory: '_unset_', + form: { + sequenceId: -1, + hyperParameters: { + value: 'null', + index: -1, + } + } +}; + export class V3asV1 implements TrainingService { private config: TrainingServiceConfig; private v3: TrainingServiceV3; @@ -27,7 +43,7 @@ export class V3asV1 implements TrainingService { private startDeferred: Deferred = new Deferred(); private trialJobs: Record = {}; - private parameters: Record = {}; + private parameters: Parameter[] = []; private environments: EnvironmentInfo[] = []; private lastEnvId: string = ''; @@ -56,21 +72,31 @@ export class V3asV1 implements TrainingService { public async submitTrialJob(form: TrialJobApplicationForm): Promise { await this.startDeferred.promise; let trialId: string | null = null; + let submitTime: number = 0; + this.parameters.push(form.hyperParameters.value); while (trialId === null) { const envId = this.schedule(); + if (envId === null) { + await setTimeout(1000); + continue; + } + submitTime = Date.now(); trialId = await this.v3.createTrial(envId, this.config.trialCommand, 'trial_code'); } - // In new interface, hyper parameters will be sent on demand. - this.parameters[trialId] = form.hyperParameters.value; - - this.trialJobs[trialId] = { - id: trialId, - status: 'WAITING', - submitTime: Date.now(), - workingDirectory: '_unset_', // never set in current remote training service, so it's optional - form: form, - }; + if (this.trialJobs[trialId] === undefined) { + this.trialJobs[trialId] = { + id: trialId, + status: 'WAITING', + submitTime, + workingDirectory: '_unset_', // never set in current remote training service, so it's optional + form: form, + }; + } else { + // `await createTrial()` is not atomic, so onTrialStart callback might be invoked before this + this.trialJobs[trialId].submitTime = submitTime; + this.trialJobs[trialId].form = form; + } return this.trialJobs[trialId]; } @@ -136,12 +162,20 @@ export class V3asV1 implements TrainingService { await this.v3.init(); this.v3.onRequestParameter(async (trialId) => { - await this.v3.sendParameter(trialId, this.parameters[trialId]); + if (this.parameters.length > 0) { + await this.v3.sendParameter(trialId, this.parameters.shift()!); + } else { + getLogger('TrainingServiceCompat').error('No parameters available'); + } }); this.v3.onMetric(async (trialId, metric) => { this.emitter.emit('metric', { id: trialId, data: metric }); }); this.v3.onTrialStart(async (trialId, timestamp) => { + if (this.trialJobs[trialId] === undefined) { + this.trialJobs[trialId] = structuredClone(placeholderDetail); + this.trialJobs[trialId].id = trialId; + } this.trialJobs[trialId].status = 'RUNNING'; this.trialJobs[trialId].startTime = timestamp; }); @@ -168,13 +202,16 @@ export class V3asV1 implements TrainingService { this.startDeferred.resolve(); } - private schedule(): string { + private schedule(): string | null { // Simple round-robin schedule. // Find the last used environment and select next one. // If the last used environment is not found (destroyed), use first environment. + if (this.environments.length === 0) { + return null; + } const prevIndex = this.environments.findIndex((env) => env.id === this.lastEnvId); const index = (prevIndex + 1) % this.environments.length; this.lastEnvId = this.environments[index].id; return this.lastEnvId; } -} +} \ No newline at end of file diff --git a/ts/nni_manager/training_service/v3/factory.ts b/ts/nni_manager/training_service/v3/factory.ts index 86d5132f25..d2eaf35ed0 100644 --- a/ts/nni_manager/training_service/v3/factory.ts +++ b/ts/nni_manager/training_service/v3/factory.ts @@ -11,6 +11,7 @@ import type { LocalConfig, RemoteConfig, TrainingServiceConfig } from 'common/ex import type { TrainingServiceV3 } from 'common/training_service_v3'; //import { LocalTrainingServiceV3 } from './local'; //import { RemoteTrainingServiceV3 } from './remote'; +import { FrameworkControllerTSV3 } from '../frameworkcontroller_v3'; export function trainingServiceFactoryV3(config: TrainingServiceConfig): TrainingServiceV3 { //if (config.platform === 'local_v3') { @@ -18,6 +19,9 @@ export function trainingServiceFactoryV3(config: TrainingServiceConfig): Trainin //} else if (config.platform === 'remote_v3') { // return new RemoteTrainingServiceV3(config); //} else { - throw new Error(`Bad training service platform: ${config.platform}`); - //} + if (config.platform === 'frameworkcontroller') { + return new FrameworkControllerTSV3('frameworkcontroller', config); + } else { + throw new Error(`Bad training service platform: ${config.platform}`); + } } From 179bf21561331614b6cd28092b3d670b87f1cb8f Mon Sep 17 00:00:00 2001 From: quzha Date: Sun, 19 Feb 2023 22:27:21 +0800 Subject: [PATCH 3/4] update --- .../frameworkcontroller_v3/frameworkcontroller.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ts/nni_manager/training_service/frameworkcontroller_v3/frameworkcontroller.ts b/ts/nni_manager/training_service/frameworkcontroller_v3/frameworkcontroller.ts index f63d7b2e9b..66aa7112d4 100644 --- a/ts/nni_manager/training_service/frameworkcontroller_v3/frameworkcontroller.ts +++ b/ts/nni_manager/training_service/frameworkcontroller_v3/frameworkcontroller.ts @@ -392,7 +392,8 @@ export class FrameworkControllerTSV3 implements TrainingServiceV3 { * Return trial ID on success. * Return null if the environment is not available. **/ - public async createTrial(environmentId: string, _trialCommand: string, _directoryName: string): Promise { + public async createTrial(environmentId: string, _trialCommand: string, + _directoryName: string, _sequenceId?: number): Promise { if (this.commandChannel === undefined) { throw new Error('Command channel is not initialized.'); } From eb8fcf7c4cd0b9fb6f5c1b9226363f201e308e55 Mon Sep 17 00:00:00 2001 From: quzha Date: Sun, 19 Feb 2023 22:36:04 +0800 Subject: [PATCH 4/4] update --- nni/tools/trial_tool/trial_runner.py | 2 +- .../frameworkcontrollerTrainingService.ts | 1 - .../kubernetes/kubernetesApiClient.ts | 12 ------------ .../training_service/reusable/environment.ts | 2 +- ts/nni_manager/training_service/v3/compat.ts | 2 +- 5 files changed, 3 insertions(+), 16 deletions(-) diff --git a/nni/tools/trial_tool/trial_runner.py b/nni/tools/trial_tool/trial_runner.py index 55b9a2d624..ef56b47485 100644 --- a/nni/tools/trial_tool/trial_runner.py +++ b/nni/tools/trial_tool/trial_runner.py @@ -251,4 +251,4 @@ def check_version(args): trial_runner_syslogger.close() # the process doesn't exit even main loop exit. So exit it explictly. - os._exit(0) \ No newline at end of file + os._exit(0) diff --git a/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts b/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts index 94f6862569..dcba6253d2 100644 --- a/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts +++ b/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts @@ -186,7 +186,6 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple frameworkcontrollerJobName ); } - this.log.info('zql config: ', frameworkcontrollerJobConfig); await this.kubernetesCRDClient.createKubernetesJob(frameworkcontrollerJobConfig); // Set trial job detail until create frameworkcontroller job successfully diff --git a/ts/nni_manager/training_service/kubernetes/kubernetesApiClient.ts b/ts/nni_manager/training_service/kubernetes/kubernetesApiClient.ts index 0e17d4ec52..2e8f15c044 100644 --- a/ts/nni_manager/training_service/kubernetes/kubernetesApiClient.ts +++ b/ts/nni_manager/training_service/kubernetes/kubernetesApiClient.ts @@ -182,18 +182,6 @@ abstract class KubernetesCRDClient { } public async createKubernetesJob(jobManifest: any): Promise { - // await this.operator.post({body: jobManifest}) - // .then((response: any) => { - // if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) { - // return Promise.resolve(); - // } else { - // return Promise.reject(`KubernetesApiClient createKubernetesJob failed, statusCode is ${response.statusCode}`); - // } - // }) - // .catch((error: Error) => { - // return Promise.reject(`Failed in creating Kubernetes job: KubernetesApiClient post error ${error}`); - // }); - try { const response: any = await this.operator.post({body: jobManifest}); if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) { diff --git a/ts/nni_manager/training_service/reusable/environment.ts b/ts/nni_manager/training_service/reusable/environment.ts index 19cf8855a2..2874ab2104 100644 --- a/ts/nni_manager/training_service/reusable/environment.ts +++ b/ts/nni_manager/training_service/reusable/environment.ts @@ -192,7 +192,7 @@ export class RunnerSettings { public nniManagerPort: number = 8081; public nniManagerVersion: string = ""; public logCollection: string = "none"; - public command: string[] = []; + public command: string = ""; public enableGpuCollector: boolean = true; // specify which communication channel is used by runner. diff --git a/ts/nni_manager/training_service/v3/compat.ts b/ts/nni_manager/training_service/v3/compat.ts index c686cbf377..ca733222fc 100644 --- a/ts/nni_manager/training_service/v3/compat.ts +++ b/ts/nni_manager/training_service/v3/compat.ts @@ -214,4 +214,4 @@ export class V3asV1 implements TrainingService { this.lastEnvId = this.environments[index].id; return this.lastEnvId; } -} \ No newline at end of file +}