From db05787c2843bdd6f20c2a771e5f0d6d54d70f4b Mon Sep 17 00:00:00 2001 From: Jeff Newman Date: Wed, 1 May 2024 15:16:00 -0500 Subject: [PATCH] dask_scheduler --- sharrow/shared_memory.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sharrow/shared_memory.py b/sharrow/shared_memory.py index c5710e9..0098c06 100644 --- a/sharrow/shared_memory.py +++ b/sharrow/shared_memory.py @@ -352,6 +352,7 @@ def emit(k, a, is_coord): buffer = mem.buf tasks = [] + task_names = [] for w in wrappers: _is_sparse = w.get("sparse", False) _size = w["nbytes"] @@ -391,12 +392,18 @@ def emit(k, a, is_coord): ) if isinstance(a, xr.DataArray) and isinstance(a.data, da.Array): tasks.append(da.store(a.data, mem_arr, lock=False, compute=False)) + task_names.append(_name) else: mem_arr[:] = a[:] if tasks: t = time.time() logger.info(f"running {len(tasks)} dask data load tasks") - dask.compute(tasks, scheduler=dask_scheduler) + if dask_scheduler == "synchronous": + for task, task_name in zip(tasks, task_names): + logger.info(f"running load task: {task_name}") + dask.compute(task, scheduler=dask_scheduler) + else: + dask.compute(tasks, scheduler=dask_scheduler) logger.info(f"completed dask data load in {time.time()-t:.3f} seconds") if key.startswith("memmap:"):