Skip to content

Commit de19af9

Browse files
authored
apacheGH-43359: [Go][Parquet] ReadRowGroups panics with canceled context (apache#43360)
### Rationale for this change `ReadRowGroups` needs to support externally canceled contexts, e.g. for request-scoped contexts in servers like gRPC. ### What changes are included in this PR? `ReadRowGroups` now joins `ctx.Err()` into its error with `errors.Join`, so a canceled parent context takes the existing error-cleanup path instead of continuing with columns that may not have been read. Additionally, `releaseColumns` needs to ignore columns with uninitialized data, as it is used in a `defer` statement. ### Are these changes tested? Yes: a new test `TestArrowReaderCanceledContext` is included. ### Are there any user-facing changes? None * GitHub Issue: apache#43359 Authored-by: sebdotv <[email protected]> Signed-off-by: Joel Lubinitsky <[email protected]>
1 parent a88f0cd commit de19af9

File tree

3 files changed

+31
-1
lines changed

3 files changed

+31
-1
lines changed

go/parquet/pqarrow/file_reader.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package pqarrow
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"io"
2324
"sync"
@@ -375,6 +376,10 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in
375376
data.data.Release()
376377
}
377378

379+
// if the context is in error, but we haven't set an error yet, then it means that the parent context
380+
// was cancelled. In this case, we should exit early as some columns may not have been read yet.
381+
err = errors.Join(err, ctx.Err())
382+
378383
if err != nil {
379384
// if we encountered an error, consume any waiting data on the channel
380385
// so the goroutines don't leak and so memory can get cleaned up. we already

go/parquet/pqarrow/file_reader_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,29 @@ func TestArrowReaderAdHocReadFloat16s(t *testing.T) {
167167
}
168168
}
169169

170+
func TestArrowReaderCanceledContext(t *testing.T) {
171+
dataDir := getDataDir()
172+
173+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
174+
defer mem.AssertSize(t, 0)
175+
176+
filename := filepath.Join(dataDir, "int32_decimal.parquet")
177+
require.FileExists(t, filename)
178+
179+
rdr, err := file.OpenParquetFile(filename, false, file.WithReadProps(parquet.NewReaderProperties(mem)))
180+
require.NoError(t, err)
181+
defer rdr.Close()
182+
arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
183+
require.NoError(t, err)
184+
185+
// create a canceled context
186+
ctx, cancel := context.WithCancel(context.Background())
187+
cancel()
188+
189+
_, err = arrowRdr.ReadTable(ctx)
190+
require.ErrorIs(t, err, context.Canceled)
191+
}
192+
170193
func TestRecordReaderParallel(t *testing.T) {
171194
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
172195
defer mem.AssertSize(t, 0)

go/parquet/pqarrow/helpers.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func releaseArrayData(data []arrow.ArrayData) {
3838

3939
func releaseColumns(columns []arrow.Column) {
4040
for _, col := range columns {
41-
col.Release()
41+
if col.Data() != nil { // data can be nil due to the way columns are constructed in ReadRowGroups
42+
col.Release()
43+
}
4244
}
4345
}

0 commit comments

Comments
 (0)