Skip to content

Commit

Permalink
dask_scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
jpn-- committed May 1, 2024
1 parent 600efad commit db05787
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion sharrow/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ def emit(k, a, is_coord):
buffer = mem.buf

tasks = []
task_names = []
for w in wrappers:
_is_sparse = w.get("sparse", False)
_size = w["nbytes"]
Expand Down Expand Up @@ -391,12 +392,18 @@ def emit(k, a, is_coord):
)
if isinstance(a, xr.DataArray) and isinstance(a.data, da.Array):
tasks.append(da.store(a.data, mem_arr, lock=False, compute=False))
task_names.append(_name)
else:
mem_arr[:] = a[:]
if tasks:
t = time.time()
logger.info(f"running {len(tasks)} dask data load tasks")
dask.compute(tasks, scheduler=dask_scheduler)
if dask_scheduler == "synchronous":
for task, task_name in zip(tasks, task_names):
logger.info(f"running load task: {task_name}")
dask.compute(task, scheduler=dask_scheduler)
else:
dask.compute(tasks, scheduler=dask_scheduler)
logger.info(f"completed dask data load in {time.time()-t:.3f} seconds")

if key.startswith("memmap:"):
Expand Down

0 comments on commit db05787

Please sign in to comment.