Skip to content

Commit b419b81

Browse files
authored
feat(parquet/pqarrow): Add SeekToRow for RecordReader (#321)
### Rationale for this change As suggested by #278 (comment), this allows the RecordReader from the `pqarrow` package to also leverage the `SeekToRow` functionality to skip records in a parquet file while still respecting the skipping of particular row groups. ### What changes are included in this PR? Implementing a `SeekToRow` method for `pqarrow.RecordReader` to seek the record reader to a specific row where the next read will start from. ### Are these changes tested? Yes, unit tests are added for this. ### Are there any user-facing changes? Just the new functions.
1 parent 8c64432 commit b419b81

File tree

5 files changed

+290
-12
lines changed

5 files changed

+290
-12
lines changed

parquet/file/column_reader.go

+1
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ func (c *columnChunkReader) pager() PageReader { return c.rdr }
223223
func (c *columnChunkReader) setPageReader(rdr PageReader) {
224224
c.rdr, c.err = rdr, nil
225225
c.decoders = make(map[format.Encoding]encoding.TypedDecoder)
226+
c.newDictionary = false
226227
c.numBuffered, c.numDecoded = 0, 0
227228
}
228229

parquet/file/record_reader.go

+22-2
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type RecordReader interface {
6363
// ReleaseValues transfers the buffer of data with the values to the caller,
6464
// a new buffer will be allocated on subsequent calls.
6565
ReleaseValues() *memory.Buffer
66+
ResetValues()
6667
// NullCount returns the number of nulls decoded
6768
NullCount() int64
6869
// Type returns the parquet physical type of the column
@@ -78,6 +79,10 @@ type RecordReader interface {
7879
// Release decrements the ref count by one, releasing the internal buffers when
7980
// the ref count is 0.
8081
Release()
82+
// SeekToRow will shift the record reader so that subsequent reads will
83+
// start at the desired row. It will utilize Offset Indexes if they exist
84+
// to skip pages and seek.
85+
SeekToRow(int64) error
8186
}
8287

8388
// BinaryRecordReader provides an extra GetBuilderChunks function above and beyond
@@ -440,12 +445,27 @@ func (rr *recordReader) reserveValues(extra int64) error {
440445
return rr.recordReaderImpl.ReserveValues(extra, rr.leafInfo.HasNullableValues())
441446
}
442447

443-
func (rr *recordReader) resetValues() {
448+
func (rr *recordReader) ResetValues() {
444449
rr.recordReaderImpl.ResetValues()
445450
}
446451

452+
func (rr *recordReader) SeekToRow(recordIdx int64) error {
453+
if err := rr.recordReaderImpl.SeekToRow(recordIdx); err != nil {
454+
return err
455+
}
456+
457+
rr.atRecStart = true
458+
rr.recordsRead = 0
459+
// force re-reading the definition/repetition levels
460+
// calling SeekToRow on the underlying column reader will ensure that
461+
// the next reads will pull from the correct row
462+
rr.levelsPos, rr.levelsWritten = 0, 0
463+
464+
return nil
465+
}
466+
447467
func (rr *recordReader) Reset() {
448-
rr.resetValues()
468+
rr.ResetValues()
449469

450470
if rr.levelsWritten > 0 {
451471
remain := int(rr.levelsWritten - rr.levelsPos)

parquet/pqarrow/column_readers.go

+30-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (lr *leafReader) IsOrHasRepeatedChild() bool { return false }
9292

9393
func (lr *leafReader) LoadBatch(nrecords int64) (err error) {
9494
lr.releaseOut()
95-
lr.recordRdr.Reset()
95+
lr.recordRdr.ResetValues()
9696

9797
if err := lr.recordRdr.Reserve(nrecords); err != nil {
9898
return err
@@ -135,6 +135,16 @@ func (lr *leafReader) clearOut() (out *arrow.Chunked) {
135135

136136
func (lr *leafReader) Field() *arrow.Field { return lr.field }
137137

138+
func (lr *leafReader) SeekToRow(rowIdx int64) error {
139+
pr, offset, err := lr.input.FindChunkForRow(rowIdx)
140+
if err != nil {
141+
return err
142+
}
143+
144+
lr.recordRdr.SetPageReader(pr)
145+
return lr.recordRdr.SeekToRow(offset)
146+
}
147+
138148
func (lr *leafReader) nextRowGroup() error {
139149
pr, err := lr.input.NextChunk()
140150
if err != nil {
@@ -227,6 +237,21 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
227237
return sr.defRepLevelChild.GetRepLevels()
228238
}
229239

240+
func (sr *structReader) SeekToRow(rowIdx int64) error {
241+
var g errgroup.Group
242+
if !sr.props.Parallel {
243+
g.SetLimit(1)
244+
}
245+
246+
for _, rdr := range sr.children {
247+
g.Go(func() error {
248+
return rdr.SeekToRow(rowIdx)
249+
})
250+
}
251+
252+
return g.Wait()
253+
}
254+
230255
func (sr *structReader) LoadBatch(nrecords int64) error {
231256
// Load batches in parallel
232257
// When reading structs with large numbers of columns, the serial load is very slow.
@@ -356,6 +381,10 @@ func (lr *listReader) Field() *arrow.Field { return lr.field }
356381

357382
func (lr *listReader) IsOrHasRepeatedChild() bool { return true }
358383

384+
func (lr *listReader) SeekToRow(rowIdx int64) error {
385+
return lr.itemRdr.SeekToRow(rowIdx)
386+
}
387+
359388
func (lr *listReader) LoadBatch(nrecords int64) error {
360389
return lr.itemRdr.LoadBatch(nrecords)
361390
}

parquet/pqarrow/file_reader.go

+70-9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"io"
24+
"slices"
2425
"sync"
2526
"sync/atomic"
2627

@@ -116,6 +117,7 @@ type colReaderImpl interface {
116117
GetDefLevels() ([]int16, error)
117118
GetRepLevels() ([]int16, error)
118119
Field() *arrow.Field
120+
SeekToRow(int64) error
119121
IsOrHasRepeatedChild() bool
120122
Retain()
121123
Release()
@@ -427,6 +429,20 @@ func (fr *FileReader) getColumnReader(ctx context.Context, i int, colFactory itr
427429
type RecordReader interface {
428430
array.RecordReader
429431
arrio.Reader
432+
// SeekToRow will shift the record reader so that subsequent calls to Read
433+
// or Next will begin from the specified row.
434+
//
435+
// If the record reader was constructed with a request for a subset of row
436+
// groups, then rows are counted across the requested row groups, not the
437+
// entire file. This prevents reading row groups that were requested to be
438+
// skipped, and allows treating the subset of row groups as a single collection
439+
// of rows.
440+
//
441+
// If the file contains Offset indexes for a given column, then it will be
442+
// utilized to skip pages as needed to find the requested row. Otherwise page
443+
// headers will still have to be read to find the right page to begin reading
444+
// from.
445+
SeekToRow(int64) error
430446
}
431447

432448
// GetRecordReader returns a record reader that reads only the requested column indexes and row groups.
@@ -537,12 +553,8 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
537553
}
538554

539555
// because we performed getReader concurrently, we need to prune out any empty readers
540-
for n := len(childReaders) - 1; n >= 0; n-- {
541-
if childReaders[n] == nil {
542-
childReaders = append(childReaders[:n], childReaders[n+1:]...)
543-
childFields = append(childFields[:n], childFields[n+1:]...)
544-
}
545-
}
556+
childReaders = slices.DeleteFunc(childReaders,
557+
func(r *ColumnReader) bool { return r == nil })
546558
if len(childFields) == 0 {
547559
return nil, nil
548560
}
@@ -615,15 +627,45 @@ type columnIterator struct {
615627
rdr *file.Reader
616628
schema *schema.Schema
617629
rowGroups []int
630+
631+
rgIdx int
618632
}
619633

620-
func (c *columnIterator) NextChunk() (file.PageReader, error) {
634+
func (c *columnIterator) FindChunkForRow(rowIdx int64) (file.PageReader, int64, error) {
621635
if len(c.rowGroups) == 0 {
636+
return nil, 0, nil
637+
}
638+
639+
if rowIdx < 0 || rowIdx > c.rdr.NumRows() {
640+
return nil, 0, fmt.Errorf("invalid row index %d, file only has %d rows", rowIdx, c.rdr.NumRows())
641+
}
642+
643+
idx := int64(0)
644+
for i, rg := range c.rowGroups {
645+
rgr := c.rdr.RowGroup(rg)
646+
if idx+rgr.NumRows() > rowIdx {
647+
c.rgIdx = i + 1
648+
pr, err := rgr.GetColumnPageReader(c.index)
649+
if err != nil {
650+
return nil, 0, err
651+
}
652+
653+
return pr, rowIdx - idx, nil
654+
}
655+
idx += rgr.NumRows()
656+
}
657+
658+
return nil, 0, fmt.Errorf("%w: invalid row index %d, row group subset only has %d total rows",
659+
arrow.ErrInvalid, rowIdx, idx)
660+
}
661+
662+
func (c *columnIterator) NextChunk() (file.PageReader, error) {
663+
if len(c.rowGroups) == 0 || c.rgIdx >= len(c.rowGroups) {
622664
return nil, nil
623665
}
624666

625-
rgr := c.rdr.RowGroup(c.rowGroups[0])
626-
c.rowGroups = c.rowGroups[1:]
667+
rgr := c.rdr.RowGroup(c.rowGroups[c.rgIdx])
668+
c.rgIdx++
627669
return rgr.GetColumnPageReader(c.index)
628670
}
629671

@@ -643,6 +685,25 @@ type recordReader struct {
643685
refCount int64
644686
}
645687

688+
func (r *recordReader) SeekToRow(row int64) error {
689+
if r.cur != nil {
690+
r.cur.Release()
691+
r.cur = nil
692+
}
693+
694+
if row < 0 || row >= r.numRows {
695+
return fmt.Errorf("invalid row index %d, file only has %d rows", row, r.numRows)
696+
}
697+
698+
for _, fr := range r.fieldReaders {
699+
if err := fr.SeekToRow(row); err != nil {
700+
return err
701+
}
702+
}
703+
704+
return nil
705+
}
706+
646707
func (r *recordReader) Retain() {
647708
atomic.AddInt64(&r.refCount, 1)
648709
}

0 commit comments

Comments
 (0)