Description
Hi there!
There appears to be a regression in pyarrow 22.0.0 affecting pyarrow.flight.GeneratorStream. Judging by the error message, it was likely introduced by #47115.
When a GeneratorStream is backed by an iterable of anything other than pyarrow.RecordBatch (for example, an iterable of pyarrow.Table or of pyarrow.RecordBatchReader), it now fails with:
Unknown error: Writer should be initialized before reading Next batches
Here is a minimal reproducer:
```python
from collections.abc import Generator

import pyarrow
import pyarrow.flight

_TABLE = pyarrow.Table.from_pydict({
    "city": ["Prague", "Paris", "London"],
    "population": [1_397_880, 2_048_472, 9_089_736]
})


def batch_iterator(table: pyarrow.Table) -> Generator[pyarrow.RecordBatch, None, None]:
    for batch in table.to_batches():
        yield batch


_TABLE_TICKET = pyarrow.flight.Ticket(b"table")
_BATCH_READER_TICKET = pyarrow.flight.Ticket(b"rbr")
_BATCH_TICKET = pyarrow.flight.Ticket(b"batch")


class FlightServer(pyarrow.flight.FlightServerBase):
    def do_get(self, context, ticket):
        if ticket == _TABLE_TICKET:
            # Iterable of pyarrow.Table
            return pyarrow.flight.GeneratorStream(_TABLE.schema, [_TABLE])
        elif ticket == _BATCH_READER_TICKET:
            # Iterable of pyarrow.RecordBatchReader
            return pyarrow.flight.GeneratorStream(_TABLE.schema, [_TABLE.to_reader()])
        else:
            # Generator of pyarrow.RecordBatch
            return pyarrow.flight.GeneratorStream(_TABLE.schema, batch_iterator(_TABLE))


def main():
    server = FlightServer()
    print("Serving on port", server.port)
    client = pyarrow.flight.FlightClient(f"grpc+tcp://localhost:{server.port}")

    # Does not work in pyarrow 22, works in 21
    # f = client.do_get(_TABLE_TICKET)
    # print(f.read_all())

    # Does not work in pyarrow 22, works in 21
    # f = client.do_get(_BATCH_READER_TICKET)
    # print(f.read_all())

    # Works in pyarrow 22 and 21
    f = client.do_get(_BATCH_TICKET)
    print(f.read_all())


if __name__ == '__main__':
    main()
```

Only the batch-by-batch iterator still works in 22.0.0; in older versions, all three worked.
I am running macOS 26.1 on an M1, but I presume the bug is platform-agnostic.
Component(s)
FlightRPC, Python