14 | 14 | "cell_type": "markdown",
15 | 15 | "metadata": {},
16 | 16 | "source": [
17 |    | - "# Store Kerchunk Reference Files as Parquet"
   | 17 | + "# Store virtual datasets as Kerchunk Parquet references"
18 | 18 | ]
19 | 19 | },
20 | 20 | {

24 | 24 | "source": [
|
25 | 25 | "## Overview\n",
|
26 | 26 | " \n",
|
27 |
| - "In this notebook we will cover how to store Kerchunk references as Parquet files instead of json. For large reference datasets, using Parquet should have performance implications as the overall reference file size should be smaller and the memory overhead of combining the reference files should be lower. \n", |
| 27 | + "In this notebook we will cover how to store virtual datasets as Kerchunk Parquet references instead of Kerchunk JSON references. For large virtual datasets, using Parquet should have performance implications as the overall reference file size should be smaller and the memory overhead of combining the reference files should be lower. \n", |
28 | 28 | "\n",
|
29 | 29 | "\n",
|
30 | 30 | "This notebook builds upon the [Kerchunk Basics](notebooks/foundations/01_kerchunk_basics.ipynb), [Multi-File Datasets with Kerchunk](notebooks/foundations/02_kerchunk_multi_file.ipynb) and the [Kerchunk and Dask](notebooks/foundations/03_kerchunk_dask.ipynb) notebooks. \n",
|
31 | 31 | "\n",
|
32 | 32 | "## Prerequisites\n",
|
33 | 33 | "| Concepts | Importance | Notes |\n",
|
34 | 34 | "| --- | --- | --- |\n",
|
35 |
| - "| [Kerchunk Basics](../foundations/kerchunk_basics) | Required | Core |\n", |
36 |
| - "| [Multiple Files and Kerchunk](../foundations/kerchunk_multi_file) | Required | Core |\n", |
37 |
| - "| [Introduction to Xarray](https://foundations.projectpythia.org/core/xarray/xarray-intro.html) | Recommended | IO/Visualization |\n", |
38 |
| - "| [Intro to Dask](https://tutorial.dask.org/00_overview.html) | Required | Parallel Processing |\n", |
| 35 | + "| [Basics of virtual Zarr stores](../foundations/01_kerchunk_basics.ipynb) | Required | Core |\n", |
| 36 | + "| [Multi-file virtual datasets with VirtualiZarr](../foundations/02_kerchunk_multi_file.ipynb) | Required | Core |\n", |
| 37 | + "| [Parallel virtual dataset creation with VirtualiZarr, Kerchunk, and Dask](../foundations/03_kerchunk_dask) | Required | Core |\n", |
| 38 | + "| [Introduction to Xarray](https://foundations.projectpythia.org/core/xarray/xarray-intro.html) | Required | IO/Visualization |\n", |
39 | 39 | "\n",
|
40 | 40 | "- **Time to learn**: 30 minutes\n",
|
41 | 41 | "---"
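
The overview change above boils down to a one-argument difference at write time. A minimal sketch (not part of the diff, and assuming a combined virtual dataset named `combined_vds`, which this notebook builds further down):

```python
# Sketch only: `combined_vds` is the combined virtual dataset created later in
# this notebook. Switching reference formats is just the `format` argument.
combined_vds.virtualize.to_kerchunk("combined.json", format="json")  # single JSON file
combined_vds.virtualize.to_kerchunk("combined.parq", format="parquet")  # directory of Parquet files
```
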
47 | 47 | "metadata": {},
48 | 48 | "source": [
49 | 49 | "## Imports\n",
50 |    | - "In addition to the previous imports we used throughout the tutorial, we are adding a few imports:\n",
51 |    | - "- `LazyReferenceMapper` and `ReferenceFileSystem` from `fsspec.implementations.reference` for lazy Parquet.\n",
52 | 50 | "\n",
53 | 51 | "\n"
54 | 52 | ]

60 | 58 | "outputs": [],
61 | 59 | "source": [
62 | 60 | "import logging\n",
63 |    | - "import os\n",
64 | 61 | "\n",
65 | 62 | "import dask\n",
66 | 63 | "import fsspec\n",
67 | 64 | "import xarray as xr\n",
68 | 65 | "from distributed import Client\n",
69 |    | - "from fsspec.implementations.reference import LazyReferenceMapper, ReferenceFileSystem\n",
70 |    | - "from kerchunk.combine import MultiZarrToZarr\n",
71 |    | - "from kerchunk.hdf import SingleHdf5ToZarr"
   | 66 | + "from virtualizarr import open_virtual_dataset"
72 | 67 | ]
73 | 68 | },
74 | 69 | {

111 | 106 | "files_paths = fs_read.glob(\"s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/*\")\n",
112 | 107 | "\n",
113 | 108 | "# Here we prepend the prefix 's3://', which points to AWS.\n",
114 |     | - "file_pattern = sorted([\"s3://\" + f for f in files_paths])\n",
115 |     | - "\n",
116 |     | - "# Grab the first seven files to speed up example.\n",
117 |     | - "file_pattern = file_pattern[0:2]"
    | 109 | + "files_paths = sorted([\"s3://\" + f for f in files_paths])"
    | 110 | + ]
    | 111 | + },
    | 112 | + {
    | 113 | + "cell_type": "markdown",
    | 114 | + "metadata": {},
    | 115 | + "source": [
    | 116 | + "### Subset the Data\n",
    | 117 | + "To speed up our example, let's take a subset of the data. "
    | 118 | + ]
    | 119 | + },
    | 120 | + {
    | 121 | + "cell_type": "code",
    | 122 | + "execution_count": null,
    | 123 | + "metadata": {},
    | 124 | + "outputs": [],
    | 125 | + "source": [
    | 126 | + "# If subset_flag == True (default), the list of input files will\n",
    | 127 | + "# be subset to speed up the processing\n",
    | 128 | + "subset_flag = True\n",
    | 129 | + "if subset_flag:\n",
    | 130 | + "    files_paths = files_paths[0:4]"
118 | 131 | ]
119 | 132 | },
120 | 133 | {

123 | 136 | "metadata": {},
124 | 137 | "source": [
125 | 138 | "# Generate Lazy References\n",
126 |     | - "Below we will create a `fsspec` filesystem to read the references from `s3` and create a function to generate dask delayed tasks."
    | 139 | + "\n",
    | 140 | + "Here we create a function to generate a list of Dask delayed objects."
127 | 141 | ]
128 | 142 | },
129 | 143 | {

132 | 146 | "metadata": {},
133 | 147 | "outputs": [],
134 | 148 | "source": [
135 |     | - "# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk`\n",
136 |     | - "# index from a NetCDF file.\n",
137 |     | - "fs_read = fsspec.filesystem(\"s3\", anon=True, skip_instance_cache=True)\n",
138 |     | - "so = dict(mode=\"rb\", anon=True, default_fill_cache=False, default_cache_type=\"first\")\n",
139 |     | - "\n",
140 |     | - "\n",
141 |     | - "def generate_json_reference(fil):\n",
142 |     | - "    with fs_read.open(fil, **so) as infile:\n",
143 |     | - "        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)\n",
144 |     | - "        return h5chunks.translate()  # outf\n",
    | 149 | + "def generate_virtual_dataset(file, storage_options):\n",
    | 150 | + "    return open_virtual_dataset(\n",
    | 151 | + "        file, indexes={}, reader_options={\"storage_options\": storage_options}\n",
    | 152 | + "    )\n",
145 | 153 | "\n",
146 | 154 | "\n",
    | 155 | + "storage_options = dict(anon=True, default_fill_cache=False, default_cache_type=\"first\")\n",
147 | 156 | "# Generate Dask Delayed objects\n",
148 |     | - "tasks = [dask.delayed(generate_json_reference)(fil) for fil in file_pattern]"
    | 157 | + "tasks = [\n",
    | 158 | + "    dask.delayed(generate_virtual_dataset)(file, storage_options)\n",
    | 159 | + "    for file in files_paths\n",
    | 160 | + "]"
149 | 161 | ]
150 | 162 | },
151 | 163 | {

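
Before fanning work out with Dask, it can be worth a quick serial sanity check of what `generate_virtual_dataset` returns. A sketch (not part of the diff), reusing `files_paths` and `storage_options` from the cells above:

```python
# Optional serial check (sketch): open a single file's virtual dataset without Dask.
# `open_virtual_dataset` returns an xarray.Dataset whose variables point at byte
# ranges inside the original NetCDF file instead of loading the array data.
vds_single = generate_virtual_dataset(files_paths[0], storage_options)
print(vds_single)
```
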
163 | 175 | "metadata": {},
164 | 176 | "outputs": [],
165 | 177 | "source": [
166 |     | - "single_refs = dask.compute(tasks)[0]"
    | 178 | + "virtual_datasets = list(dask.compute(*tasks))"
    | 179 | + ]
    | 180 | + },
    | 181 | + {
    | 182 | + "attachments": {},
    | 183 | + "cell_type": "markdown",
    | 184 | + "metadata": {},
    | 185 | + "source": [
    | 186 | + "## Combine virtual datasets using VirtualiZarr"
167 | 187 | ]
168 | 188 | },
169 | 189 | {

172 | 192 | "metadata": {},
173 | 193 | "outputs": [],
174 | 194 | "source": [
175 |     | - "len(single_refs)"
    | 195 | + "combined_vds = xr.combine_nested(\n",
    | 196 | + "    virtual_datasets, concat_dim=[\"time\"], coords=\"minimal\", compat=\"override\"\n",
    | 197 | + ")\n",
    | 198 | + "combined_vds"
176 | 199 | ]
177 | 200 | },
178 | 201 | {
179 |     | - "attachments": {},
180 | 202 | "cell_type": "markdown",
181 | 203 | "metadata": {},
182 | 204 | "source": [
183 |     | - "## Combine In-Memory References with MultiZarrToZarr\n",
184 |     | - "This section will look notably different than the previous examples that have written to `.json`.\n",
185 |     | - "\n",
186 |     | - "In the following code block we are:\n",
187 |     | - "- Creating an `fsspec` filesystem.\n",
188 |     | - "- Create a empty `parquet` file to write to. \n",
189 |     | - "- Creating an `fsspec` `LazyReferenceMapper` to pass into `MultiZarrToZarr`\n",
190 |     | - "- Building a `MultiZarrToZarr` object of the combined references.\n",
191 |     | - "- Calling `.flush()` on our LazyReferenceMapper, to write the combined reference to our `parquet` file."
    | 205 | + "## Write the virtual dataset to a Kerchunk Parquet reference"
192 | 206 | ]
193 | 207 | },
194 | 208 | {

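
Aside (not part of the diff): for a simple one-dimensional combine like this, concatenating the list of virtual datasets with `xr.concat` along `time` should give an equivalent result. A sketch:

```python
# Sketch: an equivalent 1-D combine along "time" using xr.concat instead of
# xr.combine_nested; the variables stay as virtual references either way.
combined_vds_alt = xr.concat(
    virtual_datasets, dim="time", coords="minimal", compat="override"
)
```
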
197 | 211 | "metadata": {},
198 | 212 | "outputs": [],
199 | 213 | "source": [
200 |     | - "fs = fsspec.filesystem(\"file\")\n",
201 |     | - "\n",
202 |     | - "if os.path.exists(\"combined.parq\"):\n",
203 |     | - "    import shutil\n",
204 |     | - "\n",
205 |     | - "    shutil.rmtree(\"combined.parq\")\n",
206 |     | - "os.makedirs(\"combined.parq\")\n",
207 |     | - "\n",
208 |     | - "out = LazyReferenceMapper.create(root=\"combined.parq\", fs=fs, record_size=1000)\n",
209 |     | - "\n",
210 |     | - "mzz = MultiZarrToZarr(\n",
211 |     | - "    single_refs,\n",
212 |     | - "    remote_protocol=\"s3\",\n",
213 |     | - "    concat_dims=[\"time\"],\n",
214 |     | - "    identical_dims=[\"y\", \"x\"],\n",
215 |     | - "    remote_options={\"anon\": True},\n",
216 |     | - "    out=out,\n",
217 |     | - ").translate()\n",
218 |     | - "\n",
219 |     | - "out.flush()"
    | 214 | + "combined_vds.virtualize.to_kerchunk(\"combined.parq\", format=\"parquet\")"
220 | 215 | ]
221 | 216 | },
222 | 217 | {

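
After the write step above, it can be useful to see what actually lands on disk. Unlike the single-file JSON format, the Parquet reference store is a directory; the sketch below (not part of the diff) simply walks `combined.parq` and prints file sizes. The exact file layout depends on the kerchunk/fsspec versions in use.

```python
import os

# Walk the Parquet reference store written above and report each file's size.
for root, _, files in os.walk("combined.parq"):
    for name in files:
        path = os.path.join(root, name)
        print(path, os.path.getsize(path), "bytes")
```
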
255 | 250 | "metadata": {},
256 | 251 | "outputs": [],
257 | 252 | "source": [
258 |     | - "fs = ReferenceFileSystem(\n",
    | 253 | + "storage_options = {\n",
    | 254 | + "    \"remote_protocol\": \"s3\",\n",
    | 255 | + "    \"skip_instance_cache\": True,\n",
    | 256 | + "    \"remote_options\": {\"anon\": True},\n",
    | 257 | + "    \"target_protocol\": \"file\",\n",
    | 258 | + "    \"lazy\": True,\n",
    | 259 | + "}  # options passed to fsspec\n",
    | 260 | + "open_dataset_options = {\"chunks\": {}}  # options passed to xarray\n",
    | 261 | + "\n",
    | 262 | + "ds = xr.open_dataset(\n",
259 | 263 | "    \"combined.parq\",\n",
260 |     | - "    remote_protocol=\"s3\",\n",
261 |     | - "    target_protocol=\"file\",\n",
262 |     | - "    lazy=True,\n",
263 |     | - "    remote_options={\"anon\": True},\n",
    | 264 | + "    engine=\"kerchunk\",\n",
    | 265 | + "    storage_options=storage_options,\n",
    | 266 | + "    open_dataset_options=open_dataset_options,\n",
264 | 267 | ")\n",
265 |     | - "ds = xr.open_dataset(\n",
266 |     | - "    fs.get_mapper(), engine=\"zarr\", backend_kwargs={\"consolidated\": False}\n",
267 |     | - ")"
268 |     | - ]
269 |     | - },
270 |     | - {
271 |     | - "cell_type": "code",
272 |     | - "execution_count": null,
273 |     | - "metadata": {},
274 |     | - "outputs": [],
275 |     | - "source": [
276 | 268 | "ds"
277 | 269 | ]
278 | 270 | }
279 | 271 | ],
280 | 272 | "metadata": {
281 | 273 | "kernelspec": {
282 |     | - "display_name": "kerchunk-cookbook-dev",
    | 274 | + "display_name": "kerchunk-cookbook",
283 | 275 | "language": "python",
284 | 276 | "name": "python3"
285 | 277 | },

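
Once `ds` is re-opened through the Parquet references it behaves like any lazily chunked xarray dataset. A quick check (not part of the diff, and assuming no particular variable names):

```python
# List the variables available through the reference store and report how large
# the dataset would be if fully loaded; no chunk data is read until values are accessed.
print(list(ds.data_vars))
print(f"{ds.nbytes / 1e9:.2f} GB if fully loaded")
```
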
293 | 285 | "name": "python",
294 | 286 | "nbconvert_exporter": "python",
295 | 287 | "pygments_lexer": "ipython3",
296 |     | - "version": "3.11.9"
    | 288 | + "version": "3.12.7"
297 | 289 | }
298 | 290 | },
299 | 291 | "nbformat": 4,