Commit 46db807

Algorithm for multi-stage rechunking (#89)
* Add calculate_stage_chunks
* Multi-stage rechunking algorithm
* sort imports
* Stop using more stages when IO increases
* Hypothesis test for IO ops count
* more test coverage
* Fixes after merging in master
* restore accidentally removed tests
* remove rechunker.compat.prod (unnecessary for Python 3.8+)
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* fixes per review
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 96fcf41 commit 46db807

File tree: 5 files changed, +443 -90 lines changed

rechunker/__init__.py (+1)

@@ -4,4 +4,5 @@
 except ImportError:
     __version__ = "unknown"

+from .algorithm import multistage_rechunking_plan, rechunking_plan
 from .api import Rechunked, rechunk
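With this change both planners become importable from the package root; a one-line illustration (not part of the diff itself):

    from rechunker import multistage_rechunking_plan, rechunking_plan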

rechunker/algorithm.py (+222 -43)

@@ -1,8 +1,12 @@
 """Core rechunking algorithm stuff."""
 import logging
+import warnings
+from math import ceil, floor, prod
 from typing import List, Optional, Sequence, Tuple

-from rechunker.compat import prod
+import numpy as np
+
+from rechunker.compat import lcm

 logger = logging.getLogger(__name__)

@@ -91,38 +95,124 @@ def consolidate_chunks(
     return tuple(new_chunks)


-def rechunking_plan(
+def _calculate_shared_chunks(
+    read_chunks: Sequence[int], write_chunks: Sequence[int]
+) -> Tuple[int, ...]:
+    # Intermediate chunks are the smallest possible chunks which fit
+    # into both read_chunks and write_chunks.
+    # Example:
+    #   read_chunks: (20, 5)
+    #   target_chunks: (4, 25)
+    #   intermediate_chunks: (4, 5)
+    # We don't need to check their memory usage: they are guaranteed to be
+    # smaller than both read and write chunks.
+    return tuple(
+        min(c_read, c_target) for c_read, c_target in zip(read_chunks, write_chunks)
+    )
+
+
+def calculate_stage_chunks(
+    read_chunks: Tuple[int, ...],
+    write_chunks: Tuple[int, ...],
+    stage_count: int = 1,
+) -> List[Tuple[int, ...]]:
+    """
+    Calculate chunks after each stage of a multi-stage rechunking.
+
+    Each stage consists of a "split" step followed by a "consolidate" step.
+
+    The strategy used here is to progressively enlarge or shrink chunks along
+    each dimension by the same multiple in each stage (geometric spacing). This
+    should roughly minimize the total number of arrays resulting from "split"
+    steps in a multi-stage pipeline. It also keeps the total number of elements
+    in each chunk constant, up to rounding error, so memory usage should remain
+    constant.
+
+    Examples::
+
+        >>> calculate_stage_chunks((1_000_000, 1), (1, 1_000_000), stage_count=2)
+        [(1000, 1000)]
+        >>> calculate_stage_chunks((1_000_000, 1), (1, 1_000_000), stage_count=3)
+        [(10000, 100), (100, 10000)]
+        >>> calculate_stage_chunks((1_000_000, 1), (1, 1_000_000), stage_count=4)
+        [(31623, 32), (1000, 1000), (32, 31623)]
+
+    TODO: consider more sophisticated algorithms. In particular, exact geometric
+    spacing often requires irregular intermediate chunk sizes, which (currently)
+    cannot be stored in Zarr arrays.
+    """
+    approx_stages = np.geomspace(read_chunks, write_chunks, num=stage_count + 1)
+    return [tuple(floor(c) for c in stage) for stage in approx_stages[1:-1]]
+
+
+def _count_intermediate_chunks(source_chunk: int, target_chunk: int, size: int) -> int:
+    """Count intermediate chunks required for rechunking along a dimension.
+
+    Intermediate chunks must divide both the source and target chunks, and in
+    general do not need to have a regular size. The number of intermediate
+    chunks is proportional to the number of required read/write operations.
+
+    For example, suppose we want to rechunk an array of size 20 from size 5
+    chunks to size 7 chunks. We can draw out how the array elements are divided:
+        0 1 2 3 4|5 6 7 8 9|10 11 12 13 14|15 16 17 18 19   (4 chunks)
+        0 1 2 3 4 5 6|7 8 9 10 11 12 13|14 15 16 17 18 19   (3 chunks)
+
+    To transfer these chunks, we would need to divide the array into irregular
+    intermediate chunks that fit into both the source and target:
+        0 1 2 3 4|5 6|7 8 9|10 11 12 13|14|15 16 17 18 19   (6 chunks)
+
+    This matches what ``_count_intermediate_chunks()`` calculates::
+
+        >>> _count_intermediate_chunks(5, 7, 20)
+        6
+    """
+    multiple = lcm(source_chunk, target_chunk)
+    splits_per_lcm = multiple // source_chunk + multiple // target_chunk - 1
+    lcm_count, remainder = divmod(size, multiple)
+    if remainder:
+        splits_in_remainder = (
+            ceil(remainder / source_chunk) + ceil(remainder / target_chunk) - 1
+        )
+    else:
+        splits_in_remainder = 0
+    return lcm_count * splits_per_lcm + splits_in_remainder
+
+
+def calculate_single_stage_io_ops(
+    shape: Sequence[int], in_chunks: Sequence[int], out_chunks: Sequence[int]
+) -> int:
+    """Count the number of read/write operations required for rechunking."""
+    return prod(map(_count_intermediate_chunks, in_chunks, out_chunks, shape))
+
+
+# not a tight upper bound, but ensures that the loop in
+# multistage_rechunking_plan always terminates.
+MAX_STAGES = 100
+
+
+_MultistagePlan = List[Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]]
+
+
+class ExcessiveIOWarning(Warning):
+    pass
+
+
+def multistage_rechunking_plan(
     shape: Sequence[int],
     source_chunks: Sequence[int],
     target_chunks: Sequence[int],
     itemsize: int,
+    min_mem: int,
     max_mem: int,
     consolidate_reads: bool = True,
     consolidate_writes: bool = True,
-) -> Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]:
-    """
-    Parameters
-    ----------
-    shape : Tuple
-        Array shape
-    source_chunks : Tuple
-        Original chunk shape (must be in form (5, 10, 20), no irregular chunks)
-    target_chunks : Tuple
-        Target chunk shape (must be in form (5, 10, 20), no irregular chunks)
-    itemsize: int
-        Number of bytes used to represent a single array element
-    max_mem : Int
-        Maximum permissible chunk memory size, measured in units of itemsize
-    consolidate_reads: bool, optional
-        Whether to apply read chunk consolidation
-    consolidate_writes: bool, optional
-        Whether to apply write chunk consolidation
+) -> _MultistagePlan:
+    """Calculate a rechunking plan that can use multiple split/consolidate steps.

-    Returns
-    -------
-    new_chunks : tuple
-        The new chunks, size guaranteed to be <= mam_mem
+    For best results, max_mem should be significantly larger than min_mem (e.g.,
+    10x). Otherwise an excessive number of rechunking steps will be required.
     """
+
     ndim = len(shape)
     if len(source_chunks) != ndim:
         raise ValueError(f"source_chunks {source_chunks} must have length {ndim}")
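The lcm-based arithmetic in _count_intermediate_chunks can be cross-checked by brute force: along one dimension, a new intermediate chunk starts wherever either the source or the target chunking has a boundary. A minimal sketch (not part of this commit; count_by_enumeration is a hypothetical helper):

    def count_by_enumeration(source_chunk: int, target_chunk: int, size: int) -> int:
        # Each position where either chunking starts a new chunk also starts
        # a new intermediate chunk, so count the distinct start positions.
        starts = set(range(0, size, source_chunk)) | set(range(0, size, target_chunk))
        return len(starts)

    # Matches the docstring example: rechunking a size-20 dimension from
    # size-5 chunks to size-7 chunks requires 6 irregular intermediate chunks.
    assert count_by_enumeration(5, 7, 20) == 6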
@@ -141,6 +231,11 @@ def rechunking_plan(
         f"Target chunk memory ({target_chunk_mem}) exceeds max_mem ({max_mem})"
     )

+    if max_mem < min_mem:  # basic sanity check
+        raise ValueError(
+            f"max_mem ({max_mem}) cannot be smaller than min_mem ({min_mem})"
+        )
+
     if consolidate_writes:
         logger.debug(
             f"consolidate_write_chunks({shape}, {target_chunks}, {itemsize}, {max_mem})"
@@ -150,17 +245,16 @@ def rechunking_plan(
         write_chunks = tuple(target_chunks)

     if consolidate_reads:
-        read_chunk_limits: List[Optional[int]]
-        read_chunk_limits = []  #
-        for n_ax, (sc, wc) in enumerate(zip(source_chunks, write_chunks)):
-            read_chunk_lim: Optional[int]
+        read_chunk_limits: List[Optional[int]] = []
+        for sc, wc in zip(source_chunks, write_chunks):
+            limit: Optional[int]
             if wc > sc:
                 # consolidate reads over this axis, up to the write chunk size
-                read_chunk_lim = wc
+                limit = wc
             else:
                 # don't consolidate reads over this axis
-                read_chunk_lim = None
-            read_chunk_limits.append(read_chunk_lim)
+                limit = None
+            read_chunk_limits.append(limit)

         logger.debug(
             f"consolidate_read_chunks({shape}, {source_chunks}, {itemsize}, {max_mem}, {read_chunk_limits})"
@@ -171,16 +265,101 @@ def rechunking_plan(
     else:
         read_chunks = tuple(source_chunks)

-    # Intermediate chunks are the smallest possible chunks which fit
-    # into both read_chunks and write_chunks.
-    # Example:
-    #   read_chunks: (20, 5)
-    #   target_chunks: (4, 25)
-    #   intermediate_chunks: (4, 5)
-    # We don't need to check their memory usage: they are guaranteed to be
-    # smaller than both read and write chunks.
-    intermediate_chunks = [
-        min(c_read, c_target) for c_read, c_target in zip(read_chunks, write_chunks)
-    ]
+    prev_io_ops: Optional[float] = None
+    prev_plan: Optional[_MultistagePlan] = None
+
+    # increase the number of stages until min_mem is exceeded
+    for stage_count in range(1, MAX_STAGES):
+
+        stage_chunks = calculate_stage_chunks(read_chunks, write_chunks, stage_count)
+        pre_chunks = [read_chunks] + stage_chunks
+        post_chunks = stage_chunks + [write_chunks]
+
+        int_chunks = [
+            _calculate_shared_chunks(pre, post)
+            for pre, post in zip(pre_chunks, post_chunks)
+        ]
+        plan = list(zip(pre_chunks, int_chunks, post_chunks))
+
+        int_mem = min(itemsize * prod(chunks) for chunks in int_chunks)
+        if int_mem >= min_mem:
+            return plan  # success!
+
+        io_ops = sum(
+            calculate_single_stage_io_ops(shape, pre, post)
+            for pre, post in zip(pre_chunks, post_chunks)
+        )
+        if prev_io_ops is not None and io_ops > prev_io_ops:
+            warnings.warn(
+                "Search for multi-stage rechunking plan terminated before "
+                "achieving the minimum memory requirement due to increasing IO "
+                f"requirements. Smallest intermediates have size {int_mem}. "
+                f"Consider decreasing min_mem ({min_mem}) or increasing "
+                f"max_mem ({max_mem}) to find a more efficient plan.",
+                category=ExcessiveIOWarning,
+            )
+            assert prev_plan is not None
+            return prev_plan

-    return read_chunks, tuple(intermediate_chunks), write_chunks
+        prev_io_ops = io_ops
+        prev_plan = plan
+
+    raise AssertionError(
+        "Failed to find a feasible multi-stage rechunking scheme satisfying "
+        f"min_mem ({min_mem}) and max_mem ({max_mem}) constraints. "
+        "Please file a bug report on GitHub: "
+        "https://github.com/pangeo-data/rechunker/issues\n\n"
+        "Include the following debugging info:\n"
+        f"shape={shape}, source_chunks={source_chunks}, "
+        f"target_chunks={target_chunks}, itemsize={itemsize}, "
+        f"min_mem={min_mem}, max_mem={max_mem}, "
+        f"consolidate_reads={consolidate_reads}, "
+        f"consolidate_writes={consolidate_writes}"
+    )
+
+
+def rechunking_plan(
+    shape: Sequence[int],
+    source_chunks: Sequence[int],
+    target_chunks: Sequence[int],
+    itemsize: int,
+    max_mem: int,
+    consolidate_reads: bool = True,
+    consolidate_writes: bool = True,
+) -> Tuple[Tuple[int, ...], Tuple[int, ...], Tuple[int, ...]]:
+    """
+    Calculate a plan for rechunking arrays.
+
+    Parameters
+    ----------
+    shape : Tuple
+        Array shape
+    source_chunks : Tuple
+        Original chunk shape (must be in form (5, 10, 20), no irregular chunks)
+    target_chunks : Tuple
+        Target chunk shape (must be in form (5, 10, 20), no irregular chunks)
+    itemsize: int
+        Number of bytes used to represent a single array element
+    max_mem : int
+        Maximum permissible chunk memory size, measured in units of itemsize
+    consolidate_reads: bool, optional
+        Whether to apply read chunk consolidation
+    consolidate_writes: bool, optional
+        Whether to apply write chunk consolidation
+
+    Returns
+    -------
+    new_chunks : tuple
+        The new chunks, size guaranteed to be <= max_mem
+    """
+    (stage,) = multistage_rechunking_plan(
+        shape,
+        source_chunks,
+        target_chunks,
+        itemsize=itemsize,
+        min_mem=itemsize,
+        max_mem=max_mem,
+        consolidate_writes=consolidate_writes,
+        consolidate_reads=consolidate_reads,
+    )
+    return stage
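Putting the new API together, a usage sketch (the shapes and memory limits are illustrative; the exact plan returned depends on the consolidation heuristics and stage search above):

    from rechunker.algorithm import multistage_rechunking_plan, rechunking_plan

    shape = (100_000, 100_000)               # hypothetical array
    source, target = (100_000, 1), (1, 100_000)
    itemsize = 8                             # e.g. float64

    # Single-stage plan: one (read_chunks, intermediate_chunks, write_chunks)
    # triple, as before this commit.
    read, intermediate, write = rechunking_plan(
        shape, source, target, itemsize=itemsize, max_mem=10_000_000
    )

    # Multi-stage plan: a list of such triples, one per stage. A large min_mem
    # relative to max_mem may produce more stages or an ExcessiveIOWarning.
    plan = multistage_rechunking_plan(
        shape, source, target, itemsize=itemsize,
        min_mem=1_000_000, max_mem=10_000_000,
    )
    for read_chunks, int_chunks, write_chunks in plan:
        print(read_chunks, int_chunks, write_chunks)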

rechunker/compat.py (+8 -11)

@@ -1,13 +1,10 @@
-import operator
-from functools import reduce
-from typing import Sequence
+import math

+try:
+    from math import lcm  # type: ignore # Python 3.9
+except ImportError:

-def prod(iterable: Sequence[int]) -> int:
-    """Implementation of `math.prod()` all Python versions."""
-    try:
-        from math import prod as mathprod  # type: ignore # Python 3.8
-
-        return mathprod(iterable)
-    except ImportError:
-        return reduce(operator.mul, iterable, 1)
+    def lcm(a: int, b: int) -> int:  # type: ignore
+        """Implementation of `math.lcm()` for all Python versions."""
+        # https://stackoverflow.com/a/51716959/809705
+        return a * b // math.gcd(a, b)
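The fallback relies on the standard identity lcm(a, b) = a * b / gcd(a, b) for positive integers. A quick sanity check (illustration only):

    import math

    # lcm(4, 6) = 24 / gcd(4, 6) = 24 / 2 = 12
    assert 4 * 6 // math.gcd(4, 6) == 12
    # The pair from the _count_intermediate_chunks docstring example:
    assert 5 * 7 // math.gcd(5, 7) == 35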
