|
5 | 5 | "cell_type": "markdown", |
6 | 6 | "metadata": {}, |
7 | 7 | "source": [ |
8 | | - "TODO: Advanced Dask parallelization for HPC" |
| 8 | + "# Parallelization\n", |
| 9 | + "\n", |
| 10 | + "TPOT2 uses the Dask package for parallelization, either locally (dask.distributed.LocalCluster) or across multiple nodes via a job scheduler (dask-jobqueue).\n", |
| 11 | + "\n", |
| 12 | + "## Local Machine Parallelization\n", |
| 13 | + "\n", |
| 14 | + "TPOT2 can be easily parallelized on a local machine by setting the `n_jobs` and `memory_limit` parameters.\n", |
| 15 | + "\n", |
| 16 | + "`n_jobs` dictates how many Dask workers to launch. In TPOT2 this corresponds to the number of pipelines evaluated in parallel.\n", |
| 17 | + "\n", |
| 18 | + "`memory_limit` is the amount of RAM each worker may use." |
| 19 | + ] |
| 20 | + }, |
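The `memory_limit` strings used below (for example `"4GB"`) are parsed by Dask into byte counts. A minimal sketch of that parsing, assuming only that Dask is installed:

```python
from dask.utils import parse_bytes

# Dask parses human-readable memory strings into byte counts;
# decimal units (GB) and binary units (GiB) differ.
print(parse_bytes("4GB"))   # 4000000000
print(parse_bytes("4GiB"))  # 4294967296
```

Any string accepted by `parse_bytes` can be used for `memory_limit`.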
| 21 | + { |
| 22 | + "cell_type": "code", |
| 23 | + "execution_count": null, |
| 24 | + "metadata": {}, |
| 25 | + "outputs": [], |
| 26 | + "source": [ |
| 27 | + "import tpot2\n", |
| 28 | + "import sklearn\n", |
| 29 | + "import sklearn.datasets\n", |
| | + "import sklearn.metrics\n", |
| | + "import sklearn.model_selection\n", |
| 30 | + "import numpy as np\n", |
| 31 | + "scorer = sklearn.metrics.get_scorer('roc_auc_ovr')\n", |
| 32 | + "X, y = sklearn.datasets.load_digits(return_X_y=True)\n", |
| 33 | + "X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)\n", |
| 34 | + "\n", |
| 35 | + "\n", |
| 36 | + "est = tpot2.TPOTClassifier(population_size=8, generations=5, n_jobs=4, memory_limit=\"4GB\", verbose=1)\n", |
| 37 | + "est.fit(X_train, y_train)\n", |
| 38 | + "print(scorer(est, X_test, y_test))" |
9 | 39 | ] |
10 | 40 | }, |
11 | 41 | { |
12 | 42 | "attachments": {}, |
13 | 43 | "cell_type": "markdown", |
14 | 44 | "metadata": {}, |
15 | 45 | "source": [ |
16 | | - "Dask Dashboard\n", |
| 46 | + "## Manual Dask Clients and Dashboard\n", |
| 47 | + "\n", |
| 48 | + "You can also manually initialize a Dask client. This gives you additional control over the parallelization, helps with debugging, and lets you view a dashboard of TPOT2's live performance.\n", |
| 49 | + "\n", |
| 50 | + "You can find more details in the official [Dask documentation](https://docs.dask.org/en/stable/).\n", |
| 51 | + "\n", |
| 52 | + "\n", |
| 53 | + "- [Dask Python Tutorial](https://docs.dask.org/en/stable/deploying-python.html)\n", |
| 54 | + "- [Dask Dashboard](https://docs.dask.org/en/stable/dashboard.html)" |
| 55 | + ] |
| 56 | + }, |
| 57 | + { |
| 58 | + "attachments": {}, |
| 59 | + "cell_type": "markdown", |
| 60 | + "metadata": {}, |
| 61 | + "source": [ |
| 62 | + "Initializing a basic Dask local cluster:" |
| 63 | + ] |
| 64 | + }, |
| 65 | + { |
| 66 | + "cell_type": "code", |
| 67 | + "execution_count": null, |
| 68 | + "metadata": {}, |
| 69 | + "outputs": [], |
| 70 | + "source": [ |
| 71 | + "from dask.distributed import Client, LocalCluster\n", |
| 72 | + "\n", |
| 73 | + "n_jobs = 4\n", |
| 74 | + "memory_limit = \"4GB\"\n", |
17 | 75 | "\n", |
18 | | - "https://docs.dask.org/en/stable/dashboard.html" |
| 76 | + "cluster = LocalCluster(n_workers=n_jobs, # one worker per parallel pipeline evaluation\n", |
| 77 | + " threads_per_worker=1,\n", |
| 78 | + " memory_limit=memory_limit)\n", |
| 79 | + "client = Client(cluster)" |
19 | 80 | ] |
20 | 81 | }, |
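Before handing a cluster to TPOT2, you can confirm it is doing work by submitting a few toy tasks yourself. This sketch uses an in-process cluster (`processes=False`) so it runs anywhere; TPOT2 uses the same mechanism, one Dask task per candidate pipeline:

```python
from dask.distributed import Client, LocalCluster

# In-process cluster (processes=False) avoids spawning worker processes,
# which keeps this sketch lightweight and portable.
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster, \
        Client(cluster) as client:
    # Each task here stands in for one pipeline evaluation.
    futures = client.map(lambda x: x * x, range(4))
    print(client.gather(futures))  # [0, 1, 4, 9]
```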
21 | 82 | { |
22 | 83 | "attachments": {}, |
23 | 84 | "cell_type": "markdown", |
24 | 85 | "metadata": {}, |
25 | 86 | "source": [ |
26 | | - "Click the link to get to a live dashboard" |
| 87 | + "Get the link to view the Dask dashboard:" |
27 | 88 | ] |
28 | 89 | }, |
29 | 90 | { |
|
32 | 93 | "metadata": {}, |
33 | 94 | "outputs": [], |
34 | 95 | "source": [ |
35 | | - "#TODO\n", |
36 | | - "from dask.distributed import Client\n", |
37 | | - "client = Client() # start distributed scheduler locally.\n", |
38 | | - "client" |
| 96 | + "client.dashboard_link" |
39 | 97 | ] |
40 | 98 | }, |
41 | 99 | { |
42 | 100 | "attachments": {}, |
43 | 101 | "cell_type": "markdown", |
44 | 102 | "metadata": {}, |
45 | 103 | "source": [ |
46 | | - "Dask single node" |
| 104 | + "Pass the client into TPOT to train.\n", |
| 105 | + "Note that if a client is passed in manually, TPOT will ignore `n_jobs` and `memory_limit`.\n", |
| 106 | + "If no client is passed in, TPOT will ignore any global/existing client and create its own." |
47 | 107 | ] |
48 | 108 | }, |
49 | 109 | { |
|
52 | 112 | "metadata": {}, |
53 | 113 | "outputs": [], |
54 | 114 | "source": [ |
55 | | - "#TODO" |
| 115 | + "est = tpot2.TPOTClassifier(population_size=8, generations=5, client=client, verbose=1)\n", |
| 116 | + "# this is equivalent to:\n", |
| 117 | + "# est = tpot2.TPOTClassifier(population_size=8, generations=5, n_jobs=4, memory_limit=\"4GB\", verbose=1)\n", |
| 118 | + "est.fit(X_train, y_train)\n", |
| 119 | + "print(scorer(est, X_test, y_test))\n", |
| 120 | + "\n", |
| 121 | + "# It is good practice to close the client and cluster when you are done with them\n", |
| 122 | + "client.close()\n", |
| 123 | + "cluster.close()" |
56 | 124 | ] |
57 | 125 | }, |
58 | 126 | { |
59 | 127 | "attachments": {}, |
60 | 128 | "cell_type": "markdown", |
61 | 129 | "metadata": {}, |
62 | 130 | "source": [ |
63 | | - "Dask multiple nodes" |
| 131 | + "Alternatively, you can initialize the cluster and client with a context manager that will automatically close them." |
64 | 134 | ] |
65 | 135 | }, |
66 | 136 | { |
|
69 | 139 | "metadata": {}, |
70 | 140 | "outputs": [], |
71 | 141 | "source": [ |
72 | | - "#TODO" |
| 142 | + "from dask.distributed import Client, LocalCluster\n", |
| 143 | + "import tpot2\n", |
| 144 | + "import sklearn\n", |
| 145 | + "import sklearn.datasets\n", |
| | + "import sklearn.metrics\n", |
| | + "import sklearn.model_selection\n", |
| 146 | + "import numpy as np\n", |
| 147 | + "\n", |
| 148 | + "scorer = sklearn.metrics.get_scorer('roc_auc_ovr')\n", |
| 149 | + "X, y = sklearn.datasets.load_digits(return_X_y=True)\n", |
| 150 | + "X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)\n", |
| 151 | + "\n", |
| 152 | + "\n", |
| 153 | + "n_jobs = 4\n", |
| 154 | + "memory_limit = \"4GB\"\n", |
| 155 | + "\n", |
| 156 | + "with LocalCluster(\n", |
| 157 | + "    n_workers=n_jobs,\n", |
| 158 | + "    threads_per_worker=1,\n", |
| 159 | + "    memory_limit=memory_limit,\n", |
| 160 | + ") as cluster, Client(cluster) as client:\n", |
| 161 | + "    est = tpot2.TPOTClassifier(population_size=8, generations=5, client=client, verbose=1)\n", |
| 162 | + "    est.fit(X_train, y_train)\n", |
| 163 | + "    print(scorer(est, X_test, y_test))" |
| 164 | + ] |
| 165 | + }, |
| 166 | + { |
| 167 | + "attachments": {}, |
| 168 | + "cell_type": "markdown", |
| 169 | + "metadata": {}, |
| 170 | + "source": [ |
| 171 | + "## Multi-Node Parallelization\n", |
| 172 | + "\n", |
| 173 | + "Dask can parallelize across multiple nodes via job queueing systems, using the dask-jobqueue package. More information can be found in the official [dask-jobqueue documentation](https://jobqueue.dask.org/en/latest/).\n", |
| 174 | + "\n", |
| 175 | + "To parallelize TPOT2 with dask-jobqueue, pass a client backed by a jobqueue cluster (with the desired settings) into the `client` parameter. Each job will evaluate a single pipeline.\n", |
| 176 | + "\n", |
| 177 | + "Note that TPOT will ignore `n_jobs` and `memory_limit`, as these should be set inside the Dask cluster." |
| 178 | + ] |
| 179 | + }, |
| 180 | + { |
| 181 | + "cell_type": "code", |
| 182 | + "execution_count": null, |
| 183 | + "metadata": {}, |
| 184 | + "outputs": [], |
| 185 | + "source": [ |
| 186 | + "from dask.distributed import Client\n", |
| 187 | + "import sklearn\n", |
| 188 | + "import sklearn.datasets\n", |
| 189 | + "import sklearn.metrics\n", |
| 190 | + "import sklearn.model_selection\n", |
| 191 | + "import tpot2\n", |
| 192 | + "\n", |
| 193 | + "from dask_jobqueue import SGECluster # or SLURMCluster, PBSCluster, etc. Replace SGE with your scheduler.\n", |
| 194 | + "cluster = SGECluster(\n", |
| 195 | + "    queue='all.q',\n", |
| 196 | + "    cores=2,\n", |
| 197 | + "    memory='50 GB'\n", |
| 198 | + ")\n", |
| 200 | + "\n", |
| 201 | + "cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs\n", |
| 202 | + "\n", |
| 203 | + "client = Client(cluster)\n", |
| 204 | + "\n", |
| 205 | + "scorer = sklearn.metrics.get_scorer('roc_auc_ovr')\n", |
| 206 | + "X, y = sklearn.datasets.load_digits(return_X_y=True)\n", |
| 207 | + "X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)\n", |
| 208 | + "\n", |
| 209 | + "est = tpot2.TPOTClassifier(population_size=100, generations=5, client=client, verbose=1)\n", |
| 212 | + "est.fit(X_train, y_train)\n", |
| 213 | + "print(scorer(est, X_test, y_test))\n", |
| 214 | + "\n", |
| 215 | + "# It is good practice to close the client and cluster when you are done with them\n", |
| 216 | + "client.close()\n", |
| 217 | + "cluster.close()" |
73 | 218 | ] |
74 | 219 | } |
75 | 220 | ], |
|
89 | 234 | "name": "python", |
90 | 235 | "nbconvert_exporter": "python", |
91 | 236 | "pygments_lexer": "ipython3", |
92 | | - "version": "3.10.9" |
| 237 | + "version": "3.10.10" |
93 | 238 | }, |
94 | 239 | "orig_nbformat": 4, |
95 | 240 | "vscode": { |
|