Commit 3e7212c

cjboyle, ZohebShaikh, and danielballan authored
Fix reading out arrays from database storage (#1010)
* check for array of arrays and convert to ndarray * Add missing asserts * Update xdi test * Update regular expression slightly * Add length check to ensure content but not order * Use \s in re * Add type ignore to expected warnings * Add assert for composite test * Add workaround for tests * add tests for nested arrays where outer dtype is "object" * Add explicit support for array types in Postgres and DuckDB * clean up duplicate tests and fixtures * fix testing on py3.9 due to missing numpy.unstack * Update CHANGELOG.md * remove duplicated functionality * revert test to main * Update check * Add change log * use RNG and fix tests comparing wrong expected+actual pairs * remove unused sqlite fixtures --------- Co-authored-by: Zoheb Shaikh <[email protected]> Co-authored-by: Dan Allan <[email protected]>
1 parent 4a08ca2 commit 3e7212c

4 files changed (+308, -0 lines)


CHANGELOG.md

Lines changed: 6 additions & 0 deletions
@@ -3,6 +3,12 @@ Write the date in place of the "Unreleased" in the case a new version is released.

 # Changelog

+## v0.1.0-b32 (Unreleased)
+
+### Fixed
+
+- Uniform array columns read from Postgres/DuckDB are now aggregated to an
+  NDArray (e.g. scanned `waveform` PVs)

 ## v0.1.0-b32 (2025-08-04)

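To illustrate what the changelog entry above means, here is a minimal standalone sketch (not taken from the Tiled codebase): an object-dtype column whose rows are equal-length ("uniform") arrays can be stacked into one n-dimensional array with a concrete numeric dtype.

import numpy as np

# Build an object array whose elements are equal-length arrays, roughly how a
# column of waveform-like readings comes back from database storage.
column = np.empty((3,), dtype=object)
for i, row in enumerate([np.arange(3), np.arange(3, 6), np.arange(6, 9)]):
    column[i] = row

# Stacking exposes the underlying numeric dtype instead of "object".
stacked = np.vstack(column)
print(stacked.shape)  # (3, 3)
print(stacked.dtype)  # a numeric dtype (platform integer), rather than object
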
Lines changed: 256 additions & 0 deletions
@@ -0,0 +1,256 @@ (new file)

from typing import Callable, cast

import numpy as np
import pyarrow as pa
import pytest

from tiled._tests.adapters.test_sql import adapter_duckdb_many_partitions  # noqa: F401
from tiled._tests.adapters.test_sql import adapter_duckdb_one_partition  # noqa: F401
from tiled._tests.adapters.test_sql import adapter_psql_many_partitions  # noqa: F401
from tiled._tests.adapters.test_sql import adapter_psql_one_partition  # noqa: F401
from tiled._tests.adapters.test_sql import assert_same_rows
from tiled.adapters.sql import SQLAdapter
from tiled.storage import SQLStorage, parse_storage, register_storage
from tiled.structures.core import StructureFamily
from tiled.structures.data_source import DataSource, Management
from tiled.structures.table import TableStructure

rng = np.random.default_rng(42)

names = ["i0", "i1", "i2", "i3", "f4", "f5"]
batch_size = 5
data0 = [
    pa.array(
        [rng.integers(-100, 100, size=10, dtype=np.int8) for _ in range(batch_size)]
    ),
    pa.array(
        [rng.integers(-100, 100, size=11, dtype=np.int16) for _ in range(batch_size)]
    ),
    pa.array(
        [rng.integers(-100, 100, size=12, dtype=np.int32) for _ in range(batch_size)]
    ),
    pa.array(
        [rng.integers(-100, 100, size=13, dtype=np.int64) for _ in range(batch_size)]
    ),
    pa.array([rng.random(size=14, dtype=np.float32) for _ in range(batch_size)]),
    pa.array([rng.random(size=15, dtype=np.float64) for _ in range(batch_size)]),
]
batch_size = 8
data1 = [
    pa.array(
        [rng.integers(-100, 100, size=10, dtype=np.int8) for _ in range(batch_size)]
    ),
    pa.array(
        [rng.integers(-100, 100, size=11, dtype=np.int16) for _ in range(batch_size)]
    ),
    pa.array(
        [rng.integers(-100, 100, size=12, dtype=np.int32) for _ in range(batch_size)]
    ),
    pa.array(
        [rng.integers(-100, 100, size=13, dtype=np.int64) for _ in range(batch_size)]
    ),
    pa.array([rng.random(size=14, dtype=np.float32) for _ in range(batch_size)]),
    pa.array([rng.random(size=15, dtype=np.float64) for _ in range(batch_size)]),
]
batch_size = 3
data2 = [
    pa.array(
        [rng.integers(-100, 100, size=10, dtype=np.int8) for _ in range(batch_size)]
    ),
    pa.array(
        [rng.integers(-100, 100, size=11, dtype=np.int16) for _ in range(batch_size)]
    ),
    pa.array(
        [rng.integers(-100, 100, size=12, dtype=np.int32) for _ in range(batch_size)]
    ),
    pa.array(
        [rng.integers(-100, 100, size=13, dtype=np.int64) for _ in range(batch_size)]
    ),
    pa.array([rng.random(size=14, dtype=np.float32) for _ in range(batch_size)]),
    pa.array([rng.random(size=15, dtype=np.float64) for _ in range(batch_size)]),
]

batch0 = pa.record_batch(data0, names=names)
batch1 = pa.record_batch(data1, names=names)
batch2 = pa.record_batch(data2, names=names)


@pytest.fixture
def data_source_from_init_storage() -> Callable[[str, int], DataSource[TableStructure]]:
    def _data_source_from_init_storage(
        data_uri: str, num_partitions: int
    ) -> DataSource[TableStructure]:
        table = pa.Table.from_arrays(data0, names)
        structure = TableStructure.from_arrow_table(table, npartitions=num_partitions)
        data_source = DataSource(
            management=Management.writable,
            mimetype="application/x-tiled-sql-table",
            structure_family=StructureFamily.table,
            structure=structure,
            assets=[],
        )

        storage = cast(SQLStorage, parse_storage(data_uri))
        register_storage(storage)
        return SQLAdapter.init_storage(data_source=data_source, storage=storage)

    return _data_source_from_init_storage


@pytest.mark.parametrize(
    "adapter_name", [("adapter_duckdb_one_partition"), ("adapter_psql_one_partition")]
)
def test_write_read_one_batch_one_part(
    adapter_name: str, request: pytest.FixtureRequest
) -> None:
    # get adapter from fixture
    adapter: SQLAdapter = request.getfixturevalue(adapter_name)

    # test appending and reading a table as a whole
    test_table = pa.Table.from_arrays(data0, names)

    adapter.append_partition(batch0, 0)
    result_read = adapter.read()
    assert test_table == pa.Table.from_pandas(result_read)

    # test appending and reading a partition in a table
    result_read_partition = adapter.read_partition(0)
    assert test_table == pa.Table.from_pandas(result_read_partition)


@pytest.mark.parametrize(
    "adapter_name", [("adapter_duckdb_one_partition"), ("adapter_psql_one_partition")]
)
def test_write_read_list_batch_one_part(
    adapter_name: str, request: pytest.FixtureRequest
) -> None:
    # get adapter from fixture
    adapter: SQLAdapter = request.getfixturevalue(adapter_name)

    test_table = pa.Table.from_batches([batch0, batch1, batch2])
    # test appending a list of batches to a table and read as a whole
    adapter.append_partition([batch0, batch1, batch2], 0)
    result_read = adapter.read()

    assert test_table == pa.Table.from_pandas(result_read)

    # test appending and reading a partition in a table
    result_read_partition = adapter.read_partition(0)

    assert test_table == pa.Table.from_pandas(result_read_partition)

    # test appending few more times done correctly
    test_table = pa.Table.from_batches(
        [batch0, batch1, batch2, batch2, batch0, batch1, batch1, batch2, batch0]
    )
    adapter.append_partition([batch2, batch0, batch1], 0)
    adapter.append_partition([batch1, batch2, batch0], 0)
    result_read = adapter.read()

    assert test_table == pa.Table.from_pandas(result_read)

    # test appending a few times and reading done correctly
    result_read_partition = adapter.read_partition(0)

    assert test_table == pa.Table.from_pandas(result_read_partition)


@pytest.mark.parametrize(
    "adapter_name",
    [("adapter_duckdb_many_partitions"), ("adapter_psql_many_partitions")],
)
def test_append_single_partition(
    adapter_name: str, request: pytest.FixtureRequest
) -> None:
    # get adapter from fixture
    adapter: SQLAdapter = request.getfixturevalue(adapter_name)

    # test writing an entire pyarrow table to a single partition
    table = pa.Table.from_batches([batch0, batch1, batch2])
    adapter.append_partition(table, 0)

    result_read = adapter.read()
    assert table == pa.Table.from_pandas(result_read)

    # test reading a specific partition
    result_read_partition = adapter.read_partition(0)
    assert table == pa.Table.from_pandas(result_read_partition)


@pytest.mark.parametrize("adapter_name", [("adapter_psql_many_partitions")])
@pytest.mark.parametrize("field", names)
def test_write_read_one_batch_many_part(
    adapter_name: str, request: pytest.FixtureRequest, field: str
) -> None:
    # get adapter from fixture
    adapter: SQLAdapter = request.getfixturevalue(adapter_name)

    # test writing to many partitions and reading it whole
    adapter.append_partition(batch0, 0)
    adapter.append_partition(batch1, 1)
    adapter.append_partition(batch2, 2)

    result_read = adapter.read()

    assert pa.Table.from_batches([batch0, batch1, batch2]) == pa.Table.from_pandas(
        result_read
    )

    # test reading a specific partition
    result_read_partition = adapter.read_partition(0)
    assert pa.Table.from_arrays(data0, names) == pa.Table.from_pandas(
        result_read_partition
    )

    result_read_partition = adapter.read_partition(1)
    assert pa.Table.from_arrays(data1, names) == pa.Table.from_pandas(
        result_read_partition
    )

    result_read_partition = adapter.read_partition(2)
    assert pa.Table.from_arrays(data2, names) == pa.Table.from_pandas(
        result_read_partition
    )

    # test appending a few times and reading done correctly
    adapter.append_partition(batch0, 1)
    adapter.append_partition(batch1, 2)
    adapter.append_partition(batch2, 0)

    result_read = adapter.read()

    # Check that each partition matches
    assert_same_rows(
        pa.Table.from_batches([batch0, batch2]),
        pa.Table.from_pandas(adapter.read_partition(0)),
    )
    assert_same_rows(
        pa.Table.from_batches([batch1, batch0]),
        pa.Table.from_pandas(adapter.read_partition(1)),
    )
    assert_same_rows(
        pa.Table.from_batches([batch2, batch1]),
        pa.Table.from_pandas(adapter.read_partition(2)),
    )
    assert_same_rows(
        pa.Table.from_batches([batch0, batch2, batch1, batch0, batch2, batch1]),
        pa.Table.from_pandas(result_read),
    )

    # read a specific field
    result_read = adapter.read_partition(0, fields=[field])
    field_index = names.index(field)
    assert np.array_equal(
        [*data0[field_index].tolist(), *data2[field_index].tolist()],
        result_read[field].tolist(),
    )
    result_read = adapter.read_partition(1, fields=[field])
    assert np.array_equal(
        [*data1[field_index].tolist(), *data0[field_index].tolist()],
        result_read[field].tolist(),
    )
    result_read = adapter.read_partition(2, fields=[field])
    assert np.array_equal(
        [*data2[field_index].tolist(), *data1[field_index].tolist()],
        result_read[field].tolist(),
    )

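For context on the test data above: each column is built from per-row NumPy arrays, which pyarrow infers as a list-typed column — presumably the kind of column that the commit's Postgres/DuckDB array-type support writes out and reads back as nested arrays. A small standalone sketch (assuming only numpy and pyarrow, none of the Tiled fixtures):

import numpy as np
import pyarrow as pa

rng = np.random.default_rng(0)

# A column where every row is a length-10 int8 array, mirroring the
# construction of data0/data1/data2 above.
col = pa.array([rng.integers(-100, 100, size=10, dtype=np.int8) for _ in range(5)])

print(col.type)            # list<item: int8>
print(len(col))            # 5 rows
print(col.to_pylist()[0])  # the first row as a plain Python list of 10 ints
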
tiled/_tests/test_array.py

Lines changed: 34 additions & 0 deletions
@@ -65,6 +65,20 @@
     }
 )
 
+nd_array = numpy.arange(9).reshape((3, 3))
+uniform_array = numpy.empty((3,), dtype=object)
+for i in range(uniform_array.shape[0]):
+    uniform_array[i] = nd_array[i]
+ragged_array = numpy.array([numpy.arange(3), numpy.arange(4, 10)], dtype=object)
+object_array = numpy.full((10,), {"a": 1}, dtype=object)
+nested_arrays_tree = MapAdapter(
+    {
+        "uniform": ArrayAdapter.from_array(uniform_array),
+        "ragged": ArrayAdapter.from_array(ragged_array),
+        "objects": ArrayAdapter.from_array(object_array),
+    }
+)
+
 
 @pytest.fixture(scope="module")
 def context():
@@ -75,6 +89,7 @@ def context():
             "inf": inf_tree,
             "scalar": scalar_tree,
             "zero": zero_tree,
+            "nested_arrays": nested_arrays_tree,
         }
     )
     app = build_app(tree)
@@ -166,6 +181,25 @@ def test_array_interface(context):
     v.dims
 
 
+def test_uniform_nested_array_projected_to_ndarray(context):
+    client = from_context(context)["nested_arrays"]["uniform"]
+    assert client.dtype == numpy.int_
+    assert client.read().dtype == numpy.int_
+    assert numpy.array_equal(client.read(), nd_array)
+
+
+@pytest.mark.parametrize("kind", ["ragged", "objects"])
+def test_unparsable_nested_array_stringified(kind, context):
+    # This behavior is due to the fact that ragged Numpy arrays, and those with
+    # non-numeric types (except for strings) will likely have dtype=object,
+    # which may not be parsable or reducible. As such we fallback to taking the
+    # string representations of the array elements.
+    client = from_context(context)["nested_arrays"][kind]
+    assert "<U" in client.dtype.str
+    assert "<U" in client.read().dtype.str
+    assert isinstance(client[0], str)
+
+
 @pytest.mark.parametrize("kind", list(array_cases))
 def test_as_buffer(kind):
     output = as_buffer(array_cases[kind], {})

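As the comment in test_unparsable_nested_array_stringified explains, ragged or otherwise object-typed arrays cannot be reduced to a single ndarray, so their elements end up stringified. An illustrative sketch (plain NumPy, not Tiled code; the astype(str) fallback shown here only approximates the behavior the "<U" assertions above exercise):

import numpy as np

# Same ragged construction as in the test above: rows of different lengths.
ragged = np.array([np.arange(3), np.arange(4, 10)], dtype=object)

try:
    np.vstack(ragged)  # stacking requires uniform row lengths
except ValueError:
    # Rows of length 3 and 6 cannot be stacked, so one fallback is to keep the
    # elements as their string representations, which yields a unicode dtype.
    as_strings = ragged.astype(str)
    print(as_strings.dtype.str)  # a "<U..." unicode dtype, matching the tests
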
tiled/adapters/array.py

Lines changed: 12 additions & 0 deletions
@@ -1,3 +1,4 @@
+import contextlib
 from typing import Any, List, Optional, Set, Tuple
 
 import dask.array
@@ -82,6 +83,17 @@ def from_array(
         if not hasattr(array, "__array__"):
            array = numpy.asanyarray(array)
 
+        # Convert array of arrays to ND array to expose the underlying dtype
+        is_array_of_arrays = (
+            array.dtype == "object"
+            and array.shape[0]
+            and isinstance(array[0], numpy.ndarray)
+        )
+        if is_array_of_arrays:
+            with contextlib.suppress(ValueError):
+                # only uniform arrays (with same dimensions) are stackable
+                array = numpy.vstack(array)
+
         # Convert (experimental) pandas.StringDtype to numpy's unicode string dtype
         is_likely_string_dtype = isinstance(array.dtype, pandas.StringDtype) or (
             array.dtype == "object" and array.dtype.fields is None

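A short usage sketch of the new from_array behavior (assuming a Tiled version that includes this change, and that ArrayAdapter.read() with no arguments returns the full array, as the client-side tests above suggest; variable names are illustrative):

import numpy
from tiled.adapters.array import ArrayAdapter

# An object array whose rows are equal-length numeric arrays...
rows = numpy.empty((3,), dtype=object)
for i in range(3):
    rows[i] = numpy.arange(3) * i

adapter = ArrayAdapter.from_array(rows)

# ...is stacked internally, so reads expose the numeric dtype and 2D shape
# rather than dtype=object.
result = adapter.read()
print(result.shape)  # (3, 3)
print(result.dtype)  # a numeric dtype, not object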