Skip to content

Commit

Permalink
Convert to flushing by row and increase orc compression chunk size for stability (#37)
Browse files Browse the repository at this point in the history

Co-authored-by: Ankit <[email protected]>
  • Loading branch information
crphang and a9kitkumarsinha authored Jul 30, 2020
1 parent 6083b6f commit d745aee
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/Knetic/govaluate v3.0.0+incompatible
github.com/armon/go-metrics v0.3.3 // indirect
github.com/aws/aws-sdk-go v1.30.25
github.com/crphang/orc v0.0.5
github.com/crphang/orc v0.0.3
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/crphang/orc v0.0.3 h1:izgRzMsYn7vjkBKA33mN60qpBW8w8nLEDbnmQpSS95A=
github.com/crphang/orc v0.0.3/go.mod h1:+siY09J77eYa8+0+UXZxeyv8nrmhvOY40DbqB7gRVhU=
github.com/crphang/orc v0.0.4 h1:lNaaedlcIDuhQUOOb6ruOT5ggS86dFimnWIYXkOy7v8=
github.com/crphang/orc v0.0.4/go.mod h1:+siY09J77eYa8+0+UXZxeyv8nrmhvOY40DbqB7gRVhU=
github.com/crphang/orc v0.0.5 h1:Rua/VojAWCoDAiHXBtnK+5JX5YdXV3Sn8KOZB78D4gA=
github.com/crphang/orc v0.0.5/go.mod h1:+siY09J77eYa8+0+UXZxeyv8nrmhvOY40DbqB7gRVhU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
72 changes: 72 additions & 0 deletions internal/presto/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ func (b *PrestoThriftInteger) Range(from int, until int, f func(int, interface{}
return nil
}

// At returns the value stored at the given row index.
//
// It returns nil when index falls outside the stored values (including a
// negative index) or when the row is flagged as null. Rows not covered by the
// Nulls slice are treated as non-null instead of causing an out-of-range panic.
func (b *PrestoThriftInteger) At(index int) interface{} {
	if index < 0 || index >= len(b.Ints) {
		return nil
	}
	if index < len(b.Nulls) && b.Nulls[index] {
		return nil
	}

	return b.Ints[index]
}

// ------------------------------------------------------------------------------------------------------------

// Append adds a value to the block.
Expand Down Expand Up @@ -261,6 +269,14 @@ func (b *PrestoThriftBigint) Range(from int, until int, f func(int, interface{})
return nil
}

// At returns the value stored at the given row index.
//
// It returns nil when index falls outside the stored values (including a
// negative index) or when the row is flagged as null. Rows not covered by the
// Nulls slice are treated as non-null instead of causing an out-of-range panic.
func (b *PrestoThriftBigint) At(index int) interface{} {
	if index < 0 || index >= len(b.Longs) {
		return nil
	}
	if index < len(b.Nulls) && b.Nulls[index] {
		return nil
	}

	return b.Longs[index]
}

// ------------------------------------------------------------------------------------------------------------

// Append adds a value to the block.
Expand Down Expand Up @@ -368,6 +384,14 @@ func (b *PrestoThriftDouble) Range(from int, until int, f func(int, interface{})
return nil
}

// At returns the value stored at the given row index.
//
// It returns nil when index falls outside the stored values (including a
// negative index) or when the row is flagged as null. Rows not covered by the
// Nulls slice are treated as non-null instead of causing an out-of-range panic.
func (b *PrestoThriftDouble) At(index int) interface{} {
	if index < 0 || index >= len(b.Doubles) {
		return nil
	}
	if index < len(b.Nulls) && b.Nulls[index] {
		return nil
	}

	return b.Doubles[index]
}

// ------------------------------------------------------------------------------------------------------------

// Append adds a value to the block.
Expand Down Expand Up @@ -493,6 +517,22 @@ func (b *PrestoThriftVarchar) Range(from int, until int, f func(int, interface{}
return nil
}

// At returns the string stored at the given row index.
//
// It returns nil when index falls outside the stored sizes (including a
// negative index) or when the row is flagged as null. Rows not covered by the
// Nulls slice are treated as non-null instead of causing an out-of-range panic.
//
// Each call walks all preceding sizes to locate the row, so the cost grows
// linearly with index.
func (b *PrestoThriftVarchar) At(index int) interface{} {
	if index < 0 || index >= len(b.Sizes) {
		return nil
	}
	if index < len(b.Nulls) && b.Nulls[index] {
		return nil
	}

	// The start of this row within Bytes is the sum of the sizes of every
	// row that precedes it.
	var offset int32
	for _, size := range b.Sizes[:index] {
		offset += size
	}

	v := b.Bytes[offset : offset+b.Sizes[index]]
	return binaryToString(&v)
}

// ------------------------------------------------------------------------------------------------------------

// Append adds a value to the block.
Expand Down Expand Up @@ -600,6 +640,14 @@ func (b *PrestoThriftBoolean) Range(from int, until int, f func(int, interface{}
return nil
}

// At returns the value stored at the given row index.
//
// It returns nil when index falls outside the stored values (including a
// negative index) or when the row is flagged as null. Rows not covered by the
// Nulls slice are treated as non-null instead of causing an out-of-range panic.
func (b *PrestoThriftBoolean) At(index int) interface{} {
	if index < 0 || index >= len(b.Booleans) {
		return nil
	}
	if index < len(b.Nulls) && b.Nulls[index] {
		return nil
	}

	return b.Booleans[index]
}

// ------------------------------------------------------------------------------------------------------------

// Append adds a value to the block.
Expand Down Expand Up @@ -716,6 +764,14 @@ func (b *PrestoThriftTimestamp) Range(from int, until int, f func(int, interface
return nil
}

// At returns the value stored at the given row index.
//
// It returns nil when index falls outside the stored values (including a
// negative index) or when the row is flagged as null. Rows not covered by the
// Nulls slice are treated as non-null instead of causing an out-of-range panic.
func (b *PrestoThriftTimestamp) At(index int) interface{} {
	if index < 0 || index >= len(b.Timestamps) {
		return nil
	}
	if index < len(b.Nulls) && b.Nulls[index] {
		return nil
	}

	return b.Timestamps[index]
}

// ------------------------------------------------------------------------------------------------------------

// Append adds a value to the block.
Expand Down Expand Up @@ -852,6 +908,22 @@ func (b *PrestoThriftJson) Range(from int, until int, f func(int, interface{}) e
return nil
}

// At returns the JSON text stored at the given row index as a string.
//
// It returns nil when index falls outside the stored sizes (including a
// negative index) or when the row is flagged as null. Rows not covered by the
// Nulls slice are treated as non-null instead of causing an out-of-range panic.
//
// Each call walks all preceding sizes to locate the row, so the cost grows
// linearly with index.
func (b *PrestoThriftJson) At(index int) interface{} {
	if index < 0 || index >= len(b.Sizes) {
		return nil
	}
	if index < len(b.Nulls) && b.Nulls[index] {
		return nil
	}

	// The start of this row within Bytes is the sum of the sizes of every
	// row that precedes it.
	var offset int32
	for _, size := range b.Sizes[:index] {
		offset += size
	}

	v := b.Bytes[offset : offset+b.Sizes[index]]
	return binaryToString(&v)
}

// Converts binary to string in a zero-alloc manner
func binaryToString(b *[]byte) string {
return *(*string)(unsafe.Pointer(b))
Expand Down
1 change: 1 addition & 0 deletions internal/presto/presto.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Column interface {
AsThrift() *PrestoThriftBlock
AsProto() *talaria.Column
Range(from int, until int, f func(int, interface{}) error) error
At(index int) interface{}
}

// Serve creates and serves thrift RPC for presto. Context is used for cancellation purposes.
Expand Down
16 changes: 11 additions & 5 deletions internal/storage/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,20 @@ func (s *Storage) Merge(blocks []block.Block, schema typeof.Schema) ([]byte, []b

cols.FillNulls()

colIterator := []eorc.ColumnIterator{}
allCols := []column.Column{}

for _, colName := range schema.Columns() {
colIterator = append(colIterator, cols[colName])
allCols = append(allCols, cols[colName])
}

if err := writer.WriteColumns(colIterator); err != nil {
s.monitor.Error(errors.Internal("flush: error writing columns", err))
return nil, nil
for i := 0; i < allCols[0].Count(); i++ {
row := []interface{}{}
for j := 0; j < len(allCols); j++ {
row = append(row, allCols[j].At(i))
}
if err := writer.Write(row...); err != nil {
s.monitor.Error(errors.Internal("flush: error writing row", err))
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"syscall"
"time"

eorc "github.com/crphang/orc"
"github.com/gorilla/mux"
"github.com/kelindar/lua"
"github.com/kelindar/talaria/internal/config"
Expand Down Expand Up @@ -41,6 +42,7 @@ const (
)

func main() {
eorc.DefaultCompressionChunkSize = 16 * eorc.DefaultCompressionChunkSize
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down

0 comments on commit d745aee

Please sign in to comment.