Implement Parquet Support for Reads (#62)

atris authored Mar 31, 2021
1 parent 804ed1d commit 607ac56
Showing 13 changed files with 668 additions and 177 deletions.
14 changes: 7 additions & 7 deletions go.mod
@@ -22,7 +22,8 @@ require (
 	github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
 	github.com/dnaeon/go-vcr v1.0.1 // indirect
 	github.com/emitter-io/address v1.0.0
-	github.com/gogo/protobuf v1.3.1
+	github.com/fraugster/parquet-go v0.3.0
+	github.com/gogo/protobuf v1.3.2
 	github.com/golang/snappy v0.0.1
 	github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d // indirect
 	github.com/gorilla/mux v1.7.4
@@ -48,13 +49,12 @@ require (
 	github.com/stretchr/testify v1.5.1
 	github.com/twmb/murmur3 v1.1.3
 	github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb // indirect
-	golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 // indirect
-	golang.org/x/net v0.0.0-20200506145744-7e3656a0809f // indirect
-	golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
+	golang.org/x/net v0.0.0-20210326060303-6b1517762897 // indirect
+	golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
 	golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
 	golang.org/x/tools v0.0.0-20200512001501-aaeff5de670a // indirect
 	google.golang.org/api v0.24.0
-	google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380 // indirect
-	google.golang.org/grpc v1.31.0
+	google.golang.org/genproto v0.0.0-20210325224202-eed09b1b5210 // indirect
+	google.golang.org/grpc v1.36.1
 	google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 // indirect
 	gopkg.in/yaml.v2 v2.2.8
)
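
The key addition here is github.com/fraugster/parquet-go, the low-level reader that the new parquet package below wraps. A minimal sketch of the two calls this commit relies on (NewFileReader and NextRow, both visible in the new code); the input file name is illustrative:

package main

import (
	"fmt"
	"io"
	"log"
	"os"

	goparquet "github.com/fraugster/parquet-go"
)

func main() {
	f, err := os.Open("rows.parquet") // illustrative input file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	r, err := goparquet.NewFileReader(f)
	if err != nil {
		log.Fatal(err)
	}

	// NextRow yields each row as a map of column name to value.
	for {
		row, err := r.NextRow()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(row)
	}
}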
57 changes: 57 additions & 0 deletions go.sum

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions internal/encoding/block/block_test.go
@@ -88,6 +88,33 @@ func BenchmarkBlockRead(b *testing.B) {
})
}

// BenchmarkBlockReadForParquet/read-12 469470 2482 ns/op 1665 B/op 16 allocs/op
func BenchmarkBlockReadForParquet(b *testing.B) {
o, err := ioutil.ReadFile(testFileForParquet)
noerror(err)

apply := Transform(nil)
blk, err := FromParquetBy(o, "foo", nil, apply)
noerror(err)

// 122MB uncompressed
// 13MB snappy compressed
buf, err := blk[0].Encode()
noerror(err)

columns := typeof.Schema{
"foo": typeof.String,
}

b.Run("read", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
_, _ = Read(buf, columns)
}
})
}

// BenchmarkFrom/orc-8 3434 309327 ns/op 789779 B/op 1906 allocs/op
// BenchmarkFrom/batch-8 50971 22366 ns/op 19824 B/op 206 allocs/op
func BenchmarkFrom(b *testing.B) {
@@ -113,6 +140,19 @@ func BenchmarkFrom(b *testing.B) {
}
})

b.Run("parquet", func(b *testing.B) {
o, err := ioutil.ReadFile(testFileForParquet)
noerror(err)

apply := Transform(nil)

b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
_, err = FromParquetBy(o, "bar", nil, apply)
noerror(err)
}
})
}

func noerror(err error) {
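
To reproduce the figures quoted in the benchmark comments above, the standard Go tooling applies (the package path follows this repository's layout):

go test -bench='BenchmarkBlockReadForParquet|BenchmarkFrom' -benchmem ./internal/encoding/block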
94 changes: 94 additions & 0 deletions internal/encoding/block/from_parquet.go
@@ -0,0 +1,94 @@
package block

import (
"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/encoding/parquet"
"github.com/kelindar/talaria/internal/encoding/typeof"
)

// FromParquetBy decodes a set of blocks from a Parquet file and repartitions
// them by the specified partition key.
func FromParquetBy(payload []byte, partitionBy string, filter *typeof.Schema, apply applyFunc) ([]Block, error) {
const max = 10000000 // 10MB

iter, err := parquet.FromBuffer(payload)
if err != nil {
return nil, err
}

// Find the partition index
schema := iter.Schema()
cols := schema.Columns()
partitionIdx, ok := findString(cols, partitionBy)
if !ok {
return nil, nil // Skip the file if it has no partition column
}

// The resulting set of blocks, repartitioned and chunked
blocks := make([]Block, 0, 128)

// Create presto columns and iterate
result, size := make(map[string]column.Columns, 16), 0
_, _ = iter.Range(func(rowIdx int, r []interface{}) bool {
if size >= max {
pending, err := makeBlocks(result)
if err != nil {
return true
}

size = 0 // Reset the size
blocks = append(blocks, pending...)
result = make(map[string]column.Columns, 16)
}

// Get the partition value; it must be a string
partition, ok := convertToString(r[partitionIdx])
if !ok {
return true
}

// Skip the record if the partition is actually empty
if partition == "" {
return false
}

// Get the block for that partition
columns, exists := result[partition]
if !exists {
columns = column.MakeColumns(filter)
result[partition] = columns
}

// Prepare a row for transformation
row := NewRow(schema, len(r))
for i, v := range r {
columnName := cols[i]
columnType := schema[columnName]

// Encode to JSON
if columnType == typeof.JSON {
if encoded, ok := convertToJSON(v); ok {
v = encoded
}
}

row.Set(columnName, v)
}

// Append computed columns and fill nulls for the row
out, _ := apply(row)

size += out.AppendTo(columns)
size += columns.FillNulls()
return false
}, cols...)

// Write the last chunk
last, err := makeBlocks(result)
if err != nil {
return nil, err
}

blocks = append(blocks, last...)
return blocks, nil
}
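
A usage sketch for the new entry point, mirroring the test below; the file name and the "event" partition column are illustrative:

	payload, err := ioutil.ReadFile("events.parquet") // illustrative input
	if err != nil {
		return err
	}

	// A nil filter keeps every supported column; Transform(nil) adds no computed columns.
	blocks, err := FromParquetBy(payload, "event", nil, Transform(nil))
	if err != nil {
		return err
	}

	// Each block covers one partition value and can be encoded for storage.
	for _, blk := range blocks {
		if _, err := blk.Encode(); err != nil {
			return err
		}
	}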
20 changes: 20 additions & 0 deletions internal/encoding/block/from_parquet_test.go
@@ -0,0 +1,20 @@
package block

import (
"github.com/stretchr/testify/assert"
"io/ioutil"
"testing"
)

const testFileForParquet = "../../../test/test2.parquet"

func TestFromParquet_Nested(t *testing.T) {
o, err := ioutil.ReadFile(testFileForParquet)
assert.NotEmpty(t, o)
assert.NoError(t, err)

apply := Transform(nil)
b, err := FromParquetBy(o, "foo", nil, apply)
assert.NoError(t, err)
assert.Equal(t, 10000, len(b))
}
2 changes: 2 additions & 0 deletions internal/encoding/block/from_request.go
@@ -23,6 +23,8 @@ func FromRequestBy(request *talaria.IngestRequest, partitionBy string, filter *t
return FromCSVBy(data.Csv, partitionBy, filter, apply)
case *talaria.IngestRequest_Url:
return FromURLBy(data.Url, partitionBy, filter, apply)
case *talaria.IngestRequest_Parquet:
return FromParquetBy(data.Parquet, partitionBy, filter, apply)
case nil: // The field is not set.
return nil, nil
default:
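
With the new case in place, a client can push raw Parquet bytes through the existing ingestion path. A hedged sketch: only the IngestRequest_Parquet variant and its Parquet field are confirmed by this diff, while the oneof wrapper field name (Data) and the "event" partition column are assumptions:

	// payload holds the raw bytes of a .parquet file.
	req := &talaria.IngestRequest{
		Data: &talaria.IngestRequest_Parquet{Parquet: payload},
	}
	blocks, err := block.FromRequestBy(req, "event", nil, block.Transform(nil))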
2 changes: 2 additions & 0 deletions internal/encoding/block/from_url.go
@@ -21,6 +21,8 @@ func FromURLBy(uri string, partitionBy string, filter *typeof.Schema, apply appl
handler = FromOrcBy
case ".csv":
handler = FromCSVBy
case ".parquet":
handler = FromParquetBy
default:
return nil, errors.Newf("block: unsupported file extension %s", filepath.Ext(uri))
}
125 changes: 125 additions & 0 deletions internal/encoding/parquet/parquet.go
@@ -0,0 +1,125 @@
package parquet

import (
"bytes"
goparquet "github.com/fraugster/parquet-go"
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/monitor/errors"
"io"
"os"
"sort"
)

var errNoWriter = errors.New("unable to create Parquet writer")

// Iterator represents a Parquet data frame.
type Iterator interface {
io.Closer
Range(f func(int, []interface{}) bool, columns ...string) (int, bool)
Schema() typeof.Schema
}

// FromFile creates an iterator from a file.
func FromFile(filename string) (Iterator, error) {
	rf, err := os.Open(filename)
	if err != nil {
		return nil, err
	}

	r, err := goparquet.NewFileReader(rf)
	if err != nil {
		return nil, err
	}

	return &iterator{reader: r}, nil
}

// FromBuffer creates an iterator from a buffer.
func FromBuffer(b []byte) (Iterator, error) {
r, err := goparquet.NewFileReader(bytes.NewReader(b))
if err != nil {
return nil, err
}

return &iterator{reader: r}, nil
}

// Range is a helper function that ranges over a set of columns in a Parquet buffer.
func Range(payload []byte, f func(int, []interface{}) bool, columns ...string) error {
i, err := FromBuffer(payload)
if err != nil {
return err
}

_, _ = i.Range(f, columns...)
return nil
}

// First selects the first row only, then stops.
func First(payload []byte, columns ...string) (result []interface{}, err error) {
	err = Range(payload, func(_ int, v []interface{}) bool {
		result = v
		return true // no need to iterate further; we only take the first element
}, columns...)
return
}

// iterator is the default Iterator implementation, backed by a parquet-go file reader.
type iterator struct {
reader *goparquet.FileReader
}

// Range iterates through the reader.
func (i *iterator) Range(f func(int, []interface{}) bool, columns ...string) (index int, stop bool) {
//TODO: Do this once the release is done
//c := i.reader.SchemaReader.setSelectedColumns
r := i.reader
	for {
		row, err := r.NextRow()
		if err != nil {
			break // io.EOF or a read error ends the iteration
		}
		index++ // index counts rows read; f receives the zero-based row index

		// We need to ensure that the row has columns ordered by name since that is how
		// columns are generated in the upstream schema
		keys := make([]string, 0, len(row))
		for k := range row {
			keys = append(keys, k)
		}
		sort.Strings(keys)

		arr := make([]interface{}, 0, len(keys))
		for _, k := range keys {
			arr = append(arr, row[k])
		}

		if stop = f(index-1, arr); stop {
			return index, false
		}
	}

	return index, true
}

// Schema gets the SQL schema for the iterator.
func (i *iterator) Schema() typeof.Schema {
schema := i.reader.SchemaReader
result := make(typeof.Schema, len(schema.Columns()))
for _, c := range schema.Columns() {
t := c.Type()

if t, supported := typeof.FromParquet(t); supported {
result[c.Name()] = t
}
}
return result
}

// Close closes the iterator.
func (i *iterator) Close() error {
// No Op
return nil
}
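
Putting the package together, a minimal sketch of the iterator API defined above; it assumes payload holds the bytes of a valid Parquet file and that fmt is imported:

	iter, err := parquet.FromBuffer(payload)
	if err != nil {
		return err
	}
	defer iter.Close()

	// Schema maps every supported Parquet column to its SQL type.
	schema := iter.Schema()

	// Range visits rows in order; returning true from the callback stops early.
	count, _ := iter.Range(func(rowIdx int, values []interface{}) bool {
		fmt.Println(rowIdx, values)
		return false // keep iterating
	}, schema.Columns()...)
	fmt.Println("rows visited:", count)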
