From edc2d8eedc4f737d9d7f4c927f69e0cfe4415e95 Mon Sep 17 00:00:00 2001 From: atlas-booker Date: Fri, 6 May 2022 21:19:28 +0800 Subject: [PATCH] adding go plugin as a new method in Computed (#87) * update * newpatch * Abstract loader. * Use computed for all * Update with minor fix. * Update code , and test all * Support empty filter. * Delete comment * Update so, will complie it before test. * add missing dir. * Add go version info and sync stdout * Upgrade CI go version * Skip plugin test in CI Co-authored-by: YongHao Hu --- .github/workflows/test.yml | 2 +- .gitignore | 2 + go.mod | 7 +- go.sum | 4 +- internal/column/{ => computed}/computed.go | 108 +++++------------- .../column/{ => computed}/computed_test.go | 19 ++- internal/column/computed/lua.go | 24 ++++ internal/config/config.go | 7 +- internal/config/env/configurer.go | 7 +- internal/config/store.go | 5 +- internal/encoding/block/from_batch_test.go | 9 +- internal/encoding/block/from_csv.go | 3 + internal/encoding/block/transform.go | 4 +- internal/encoding/block/transform_test.go | 2 + internal/encoding/merge/orc.go | 3 + internal/encoding/merge/orc_test.go | 4 + internal/encoding/merge/parquet_test.go | 7 +- internal/encoding/parquet/parquet.go | 14 ++- internal/scripting/loader.go | 86 ++++++++++++++ internal/scripting/lualoader.go | 99 ++++++++++++++++ internal/scripting/plugin.go | 78 +++++++++++++ internal/scripting/plugin_test.go | 101 ++++++++++++++++ internal/scripting/plugin_test/main.go | 28 +++++ internal/scripting/script.go | 80 ------------- internal/scripting/script_test.go | 13 +-- internal/server/server.go | 9 +- internal/storage/flush/flush_test.go | 8 +- internal/storage/writer/base/base.go | 37 +++--- internal/storage/writer/base/base_test.go | 23 ++-- internal/storage/writer/pubsub/pubsub.go | 17 ++- internal/storage/writer/pubsub/pubsub_test.go | 12 +- internal/storage/writer/writer.go | 21 ++-- internal/storage/writer/writer_test.go | 3 - internal/table/log/log.go | 2 +- internal/table/timeseries/timeseries_test.go | 4 +- main.go | 22 +--- test/bench_test.go | 5 +- test/local.go | 5 +- 38 files changed, 590 insertions(+), 294 deletions(-) rename internal/column/{ => computed}/computed.go (54%) rename internal/column/{ => computed}/computed_test.go (76%) create mode 100644 internal/column/computed/lua.go create mode 100644 internal/scripting/loader.go create mode 100644 internal/scripting/lualoader.go create mode 100644 internal/scripting/plugin.go create mode 100644 internal/scripting/plugin_test.go create mode 100644 internal/scripting/plugin_test/main.go delete mode 100644 internal/scripting/script.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 78a241b2..53ee8a46 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,7 +12,7 @@ jobs: - name: Install Go uses: actions/setup-go@v1 with: - go-version: 1.16 + go-version: 1.17 id: go - name: Check out code into the Go module directory diff --git a/.gitignore b/.gitignore index 9afa6d9f..70bdfb73 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ testdata-* .envrc logs/ *.venv +talaria +*.so diff --git a/go.mod b/go.mod index d0ae10b9..8c4db046 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,6 @@ require ( github.com/fraugster/parquet-go v0.3.0 github.com/golang/snappy v0.0.3 github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d // indirect - github.com/gorilla/mux v1.7.4 github.com/grab/async v0.0.5 github.com/hako/durafmt v0.0.0-20191009132224-3f39dc1ed9f4 
github.com/hashicorp/go-immutable-radix v1.2.0 // indirect @@ -61,10 +60,14 @@ require ( require github.com/Azure/go-autorest/autorest/adal v0.9.11 +require ( + github.com/Azure/go-autorest/autorest v0.11.17 + github.com/gorilla/mux v1.8.0 +) + require ( github.com/Azure/azure-pipeline-go v0.2.3 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect - github.com/Azure/go-autorest/autorest v0.11.17 // indirect github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.0 // indirect diff --git a/go.sum b/go.sum index de05df4d..f6d1003c 100644 --- a/go.sum +++ b/go.sum @@ -226,8 +226,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d h1:vr95xIx8Eg3vCzZPxY3rCwTfkjqNDt/FgVqTOk0WByk= github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= -github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= -github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/grab/async v0.0.5 h1:QESBFAKiq5ocTtIcRh8gdha5Xgvu0yStGUefZsOWLPc= github.com/grab/async v0.0.5/go.mod h1:8zY9m1tryEmU2px8GLmWrHt7QXSQOhyurytRQ3LrzjQ= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= diff --git a/internal/column/computed.go b/internal/column/computed/computed.go similarity index 54% rename from internal/column/computed.go rename to internal/column/computed/computed.go index 310d66a9..a536de75 100644 --- a/internal/column/computed.go +++ b/internal/column/computed/computed.go @@ -1,117 +1,63 @@ // Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved. // Use of this source code is governed by an MIT-style license that can be found in the LICENSE file -package column +package computed import ( - "context" "crypto/rand" "encoding/binary" "encoding/hex" - "fmt" "sync/atomic" "time" - "github.com/kelindar/loader" "github.com/kelindar/lua" "github.com/kelindar/talaria/internal/encoding/typeof" - "github.com/kelindar/talaria/internal/scripting" + "github.com/kelindar/talaria/internal/monitor" + script "github.com/kelindar/talaria/internal/scripting" + mlog "github.com/kelindar/talaria/internal/scripting/log" + mnet "github.com/kelindar/talaria/internal/scripting/net" + mstats "github.com/kelindar/talaria/internal/scripting/stats" ) -// Loader is the default loader to use for loading computed columns. 
-var Loader = loader.New()
-
-// Default empty script
-const emptyScript = `function main(row)
-	return null
-end`
+const (
+	LuaLoaderTyp    = "lua"
+	PluginLoaderTyp = "plugin"
+)
 
 // Computed represents a computed column
 type Computed interface {
-	Name() string
+	Name() string // returns the column's name
 	Type() typeof.Type
 	Value(map[string]interface{}) (interface{}, error)
 }
 
 // NewComputed creates a new script from a string
-func NewComputed(name string, typ typeof.Type, uriOrCode string, loader *script.Loader) (Computed, error) {
+func NewComputed(columnName, functionName string, outputTyp typeof.Type, uriOrCode string, monitor monitor.Monitor) (Computed, error) {
 	switch uriOrCode {
 	case "make://identifier":
-		return newIdentifier(name), nil
+		return newIdentifier(columnName), nil
 	case "make://timestamp":
-		return newTimestamp(name), nil
+		return newTimestamp(columnName), nil
 	}
 
-	s, err := loader.Load(name, uriOrCode)
-	if err != nil {
-		return nil, err
-	}
+	pluginLoader := script.NewPluginLoader(functionName)
+	luaLoader := script.NewLuaLoader([]lua.Module{
+		mlog.New(monitor),
+		mstats.New(monitor),
+		mnet.New(monitor),
+	}, outputTyp)
+	l := script.NewHandlerLoader(pluginLoader, luaLoader)
 
-	return &scripted{
-		code: s,
-		typ:  typ,
-	}, nil
-}
-
-// ------------------------------------------------------------------------------------------------------------
-
-// scripted represents a computed column computed through a lua script
-type scripted struct {
-	code *lua.Script // The script associated with the column
-	typ  typeof.Type // The type of the column
-}
-
-// Name returns the name of the column
-func (c *scripted) Name() string {
-	return c.code.Name()
-}
-
-// Type returns the type of the column
-func (c *scripted) Type() typeof.Type {
-	return c.typ
-}
-
-// Value computes the column value for the row
-func (c *scripted) Value(row map[string]interface{}) (interface{}, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
-	defer cancel()
-
-	// Run the script
-	out, err := c.code.Run(ctx, row)
+	h, err := l.LoadHandler(uriOrCode)
 	if err != nil {
 		return nil, err
 	}
 
-	// If there's no new row generated, return nil
-	if out.Type() == lua.TypeNil {
-		return nil, nil
-	}
-
-	switch c.typ {
-	case typeof.Bool:
-		if v, ok := out.(lua.Bool); ok {
-			return bool(v), nil
-		}
-	case typeof.Int32:
-		if v, ok := out.(lua.Number); ok {
-			return int32(v), nil
-		}
-	case typeof.Int64, typeof.Timestamp:
-		if v, ok := out.(lua.Number); ok {
-			return int64(v), nil
-		}
-	case typeof.Float64:
-		if v, ok := out.(lua.Number); ok {
-			return float64(v), nil
-		}
-	case typeof.String, typeof.JSON:
-		if v, ok := out.(lua.String); ok {
-			return string(v), nil
-		}
-	}
-
-	// Type mismatch
-	return nil, fmt.Errorf("script expects %s type but got %T", c.typ.String(), out)
+	return &loadComputed{
+		name:   columnName,
+		loader: h,
+		typ:    outputTyp,
+	}, nil
 }
 
 // ------------------------------------------------------------------------------------------------------------
diff --git a/internal/column/computed_test.go b/internal/column/computed/computed_test.go
similarity index 76%
rename from internal/column/computed_test.go
rename to internal/column/computed/computed_test.go
index 1d93b6df..f12ba1ed 100644
--- a/internal/column/computed_test.go
+++ b/internal/column/computed/computed_test.go
@@ -1,13 +1,12 @@
 // Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file -package column +package computed import ( "testing" "github.com/kelindar/talaria/internal/encoding/typeof" - "github.com/kelindar/talaria/internal/scripting" "github.com/stretchr/testify/assert" ) @@ -24,7 +23,7 @@ func Test_Computed(t *testing.T) { } func Test_Identifier(t *testing.T) { - c, err := NewComputed("id", typeof.String, "make://identifier", nil) + c, err := NewComputed("id", "", typeof.String, "make://identifier", nil) assert.NoError(t, err) out, err := c.Value(map[string]interface{}{ "a": 1, @@ -39,7 +38,7 @@ func Test_Identifier(t *testing.T) { } func Test_Timestamp(t *testing.T) { - c, err := NewComputed("ts", typeof.String, "make://timestamp", nil) + c, err := NewComputed("ts", "", typeof.String, "make://timestamp", nil) assert.NoError(t, err) out, err := c.Value(map[string]interface{}{ "a": 1, @@ -54,8 +53,8 @@ func Test_Timestamp(t *testing.T) { } func Test_Download(t *testing.T) { - l := script.NewLoader(nil) - c, err := NewComputed("data", typeof.JSON, "https://raw.githubusercontent.com/kelindar/lua/master/fixtures/json.lua", l) + c, err := NewComputed("data", "main", typeof.JSON, "https://raw.githubusercontent.com/kelindar/lua/master/fixtures/json.lua", nil) + assert.NoError(t, err) out, err := c.Value(map[string]interface{}{ "a": 1, "b": "hello", @@ -67,13 +66,13 @@ func Test_Download(t *testing.T) { } func newDataColumn(t *testing.T) Computed { - l := script.NewLoader(nil) - c, err := NewComputed("data", typeof.JSON, ` + c, err := NewComputed("data", "main", typeof.JSON, ` local json = require("json") - function main(row) + function main(row) return json.encode(row) - end`, l) + end`, nil) + assert.NotNil(t, c) assert.NoError(t, err) return c } diff --git a/internal/column/computed/lua.go b/internal/column/computed/lua.go new file mode 100644 index 00000000..f5ecb0d9 --- /dev/null +++ b/internal/column/computed/lua.go @@ -0,0 +1,24 @@ +package computed + +import ( + "github.com/kelindar/talaria/internal/encoding/typeof" + script "github.com/kelindar/talaria/internal/scripting" +) + +type loadComputed struct { + name string // Name of the column + loader script.Handler + typ typeof.Type +} + +func (l *loadComputed) Name() string { + return l.name +} + +func (l *loadComputed) Type() typeof.Type { + return l.typ +} + +func (l *loadComputed) Value(row map[string]interface{}) (interface{}, error) { + return l.loader.Value(row) +} diff --git a/internal/config/config.go b/internal/config/config.go index d200be89..c5d1c882 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -114,9 +114,10 @@ type StatsD struct { // Computed represents a computed column type Computed struct { - Name string `json:"name"` - Type typeof.Type `json:"type"` - Func string `json:"func"` + Name string `json:"name"` + Type typeof.Type `json:"type"` + Func string `json:"func"` + FuncName string `json:"funcname"` } // Compaction represents a configuration for compaction sinks diff --git a/internal/config/env/configurer.go b/internal/config/env/configurer.go index 9bdebe1b..6617e0f6 100644 --- a/internal/config/env/configurer.go +++ b/internal/config/env/configurer.go @@ -5,6 +5,7 @@ package env import ( "errors" + "io/ioutil" "os" "reflect" "strconv" @@ -34,7 +35,11 @@ func New(key string) *Configurer { // Configure fetches the values of the env variable for file name and sets that in the config func (e *Configurer) Configure(c *config.Config) error { if v, ok := 
os.LookupEnv(e.key); ok { - return yaml.Unmarshal([]byte(v), c) + yamlFile, err := ioutil.ReadFile(v) + if err != nil { + return yaml.Unmarshal([]byte(v), c) + } + return yaml.Unmarshal([]byte(yamlFile), c) } populate(c, e.key) diff --git a/internal/config/store.go b/internal/config/store.go index 40e21aea..15df7d24 100644 --- a/internal/config/store.go +++ b/internal/config/store.go @@ -5,6 +5,7 @@ package config import ( "context" + "fmt" "log" "sync/atomic" "time" @@ -27,7 +28,7 @@ func newStore(li time.Duration, co []Configurer) *store { c, err := s.value() if err != nil { - panic("unable to load config") + panic(fmt.Sprintf("unable to load config, err is %v", err)) } s.config.Store(c) return s @@ -56,7 +57,7 @@ func (cs *store) value() (*Config, error) { // Iterate through all the loaders to fill this config object for _, p := range cs.configurers { if err := p.Configure(c); err != nil { - log.Printf("%s : error in loadig config %s", p, err) + log.Printf("%+v : error in loadig config %s", p, err) return nil, err } } diff --git a/internal/encoding/block/from_batch_test.go b/internal/encoding/block/from_batch_test.go index 5a9fff9b..f27e8dad 100644 --- a/internal/encoding/block/from_batch_test.go +++ b/internal/encoding/block/from_batch_test.go @@ -7,9 +7,8 @@ import ( "encoding/json" "testing" - "github.com/kelindar/talaria/internal/column" + "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/encoding/typeof" - script "github.com/kelindar/talaria/internal/scripting" talaria "github.com/kelindar/talaria/proto" "github.com/stretchr/testify/assert" ) @@ -108,11 +107,11 @@ func TestBlock_FromBatch(t *testing.T) { assert.Contains(t, string(row["data"].(json.RawMessage)), "event3") } -func newDataColumn() (column.Computed, error) { - return column.NewComputed("data", typeof.JSON, ` +func newDataColumn() (computed.Computed, error) { + return computed.NewComputed("data", "main", typeof.JSON, ` local json = require("json") function main(input) return json.encode(input) - end`, script.NewLoader(nil)) + end`, nil) } diff --git a/internal/encoding/block/from_csv.go b/internal/encoding/block/from_csv.go index 46a99abe..c65894c0 100644 --- a/internal/encoding/block/from_csv.go +++ b/internal/encoding/block/from_csv.go @@ -20,6 +20,9 @@ func FromCSVBy(input []byte, partitionBy string, filter *typeof.Schema, apply ap // Read the header first r, err := rdr.Read() + if err != nil { + return nil, err + } header := r // Find the partition index diff --git a/internal/encoding/block/transform.go b/internal/encoding/block/transform.go index 88208c04..20ca02ad 100644 --- a/internal/encoding/block/transform.go +++ b/internal/encoding/block/transform.go @@ -4,12 +4,12 @@ package block import ( - "github.com/kelindar/talaria/internal/column" + "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/encoding/typeof" ) // Transform runs the computed Values and overwrites/appends them to the set. 
-func Transform(filter *typeof.Schema, computed ...column.Computed) applyFunc { +func Transform(filter *typeof.Schema, computed ...computed.Computed) applyFunc { return func(r Row) (Row, error) { // Create a new output row and copy the column values from the input schema := make(typeof.Schema, len(r.Schema)) diff --git a/internal/encoding/block/transform_test.go b/internal/encoding/block/transform_test.go index 718386e3..3acb5756 100644 --- a/internal/encoding/block/transform_test.go +++ b/internal/encoding/block/transform_test.go @@ -38,6 +38,7 @@ func TestTransform(t *testing.T) { // Run a transformation apply := Transform(&filter, dataColumn) out, err := apply(in) + assert.NoError(t, err) assert.NotNil(t, out) // Make sure input is not changed @@ -69,6 +70,7 @@ func TestTransform_NoFilter(t *testing.T) { // Run a transformation apply := Transform(nil, dataColumn) out, err := apply(in) + assert.NoError(t, err) assert.NotNil(t, out) // Make sure input is not changed diff --git a/internal/encoding/merge/orc.go b/internal/encoding/merge/orc.go index f70bbe1c..b1ee90a9 100644 --- a/internal/encoding/merge/orc.go +++ b/internal/encoding/merge/orc.go @@ -26,6 +26,9 @@ func ToOrc(blocks []block.Block, schema typeof.Schema) ([]byte, error) { writer, err := eorc.NewWriter(buffer, eorc.SetSchema(orcSchema), eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression})) + if err != nil { + return nil, err + } for _, blk := range blocks { rows, err := blk.Select(blk.Schema()) diff --git a/internal/encoding/merge/orc_test.go b/internal/encoding/merge/orc_test.go index 58ed8a08..5691cf60 100644 --- a/internal/encoding/merge/orc_test.go +++ b/internal/encoding/merge/orc_test.go @@ -44,7 +44,9 @@ func TestToOrc(t *testing.T) { apply := block.Transform(nil) block1, err := block.FromOrcBy(orcBuffer1.Bytes(), "col0", nil, apply) + assert.NoError(t, err) block2, err := block.FromOrcBy(orcBuffer2.Bytes(), "col0", nil, apply) + assert.NoError(t, err) mergedBlocks := []block.Block{} for _, blk := range block1 { @@ -107,7 +109,9 @@ func TestMerge_DifferentSchema(t *testing.T) { apply := block.Transform(nil) block1, err := block.FromOrcBy(orcBuffer1.Bytes(), "col0", nil, apply) + assert.NoError(t, err) block2, err := block.FromOrcBy(orcBuffer2.Bytes(), "col0", nil, apply) + assert.NoError(t, err) mergedBlocks := []block.Block{} for _, blk := range block1 { diff --git a/internal/encoding/merge/parquet_test.go b/internal/encoding/merge/parquet_test.go index 7d8d6f4e..3e0c74c0 100644 --- a/internal/encoding/merge/parquet_test.go +++ b/internal/encoding/merge/parquet_test.go @@ -86,7 +86,9 @@ func TestToParquet(t *testing.T) { apply := block.Transform(nil) block1, err := block.FromParquetBy(parquetBuffer1.Bytes(), "col1", nil, apply) + assert.NoError(t, err) block2, err := block.FromParquetBy(parquetBuffer2.Bytes(), "col1", nil, apply) + assert.NoError(t, err) mergedBlocks := []block.Block{} for _, blk := range block1 { @@ -122,7 +124,7 @@ func TestMergeParquet_DifferentSchema(t *testing.T) { "col1": typeof.Int64, "col2": typeof.Float64, } - parquetSchema, fieldHandlers, err := deriveSchema(schema) + parquetSchema, _, err := deriveSchema(schema) if err != nil { t.Fatal(err) @@ -179,7 +181,9 @@ func TestMergeParquet_DifferentSchema(t *testing.T) { apply := block.Transform(nil) block1, err := block.FromParquetBy(parquetBuffer1.Bytes(), "col1", nil, apply) + assert.NoError(t, err) block2, err := block.FromParquetBy(parquetBuffer2.Bytes(), "col1", nil, apply) + assert.NoError(t, err) mergedBlocks := 
[]block.Block{} for _, blk := range block1 { @@ -214,4 +218,3 @@ func TestMergeParquet_DifferentSchema(t *testing.T) { t.Fatal("Merged parquet value differ") } } - diff --git a/internal/encoding/parquet/parquet.go b/internal/encoding/parquet/parquet.go index e91d5c09..bf0dbbcb 100644 --- a/internal/encoding/parquet/parquet.go +++ b/internal/encoding/parquet/parquet.go @@ -2,12 +2,13 @@ package parquet import ( "bytes" + "io" + "os" + goparquet "github.com/fraugster/parquet-go" "github.com/fraugster/parquet-go/parquet" "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/monitor/errors" - "io" - "os" ) var errNoWriter = errors.New("unable to create Parquet writer") @@ -28,6 +29,9 @@ func FromFile(filename string) (Iterator, error) { } r, err := goparquet.NewFileReader(rf) + if err != nil { + return nil, err + } return &iterator{reader: r}, nil } @@ -79,10 +83,10 @@ func (i *iterator) Range(f func(int, []interface{}) bool, columns ...string) (in index++ // Prepare the row slice - for i, columnName := range columns{ + for i, columnName := range columns { if v, ok := row[columnName]; ok { arr[i] = v - }else{ + } else { arr[i] = nil } } @@ -115,7 +119,7 @@ func parquetTypeOf(c *goparquet.Column) parquet.Type { k := c.Element().GetLogicalType() - switch { + switch { case k.IsSetSTRING(): return parquet.Type_BYTE_ARRAY case k.IsSetJSON(): diff --git a/internal/scripting/loader.go b/internal/scripting/loader.go new file mode 100644 index 00000000..097f1da6 --- /dev/null +++ b/internal/scripting/loader.go @@ -0,0 +1,86 @@ +package script + +import ( + "bytes" + "context" + "io" + "net/url" + "strings" + "time" + + "github.com/grab/async" + "github.com/kelindar/loader" +) + +const ( + luaType = "lua" + pluginType = "plugin" +) + +type Loader struct { + loader *loader.Loader // The loader to use to load and watch for code updates +} + +type Handler interface { + Load(uriOrCode string) (Handler, error) + String() string + Value(map[string]interface{}) (interface{}, error) +} + +type HandlerLoader struct { + hs []Handler +} + +func NewHandlerLoader(handlers ...Handler) *HandlerLoader { + s := HandlerLoader{} + for _, h := range handlers { + s.hs = append(s.hs, h) + } + return &s +} + +func parseHandler(uriOrCode string) string { + if strings.HasSuffix(uriOrCode, ".so") { + return pluginType + } + + return luaType +} + +func (l *HandlerLoader) LoadHandler(uriOrCode string) (Handler, error) { + h := parseHandler(uriOrCode) + for _, d := range l.hs { + if d.String() == h { + return d.Load(uriOrCode) + } + } + + return nil, nil +} + +// Wtch starts watching for script updates +func (l *Loader) watch(uriOrCode string, onUpdate func(io.Reader) error) error { + if _, err := url.Parse(uriOrCode); err != nil { + return onUpdate(strings.NewReader(uriOrCode)) // Assume it's the actual lua code + } + + // Start watching on the URL + updates := l.loader.Watch(context.Background(), uriOrCode, 5*time.Minute) + u := <-updates + if u.Err != nil { + return u.Err + } + + // Read the updates asynchronously + async.Invoke(context.Background(), func(ctx context.Context) (interface{}, error) { + for u := range updates { + if u.Err == nil { + _ = onUpdate(bytes.NewReader(u.Data)) + } + } + return nil, nil + }) + + // Perform a first update + return onUpdate(bytes.NewReader(u.Data)) +} diff --git a/internal/scripting/lualoader.go b/internal/scripting/lualoader.go new file mode 100644 index 00000000..ec0bf885 --- /dev/null +++ b/internal/scripting/lualoader.go @@ -0,0 +1,99 @@ +package 
script + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + loaderpkg "github.com/kelindar/loader" + "github.com/kelindar/lua" + "github.com/kelindar/talaria/internal/encoding/typeof" +) + +type LuaLoader struct { + modules []lua.Module // The modules for the scripting environment + Loader + code *lua.Script // The script associated with the column + typ typeof.Type // The type of the column +} + +// NewLuaLoader creates a new loader that can be used to load scripts +func NewLuaLoader(luaModules []lua.Module, typ typeof.Type) *LuaLoader { + return &LuaLoader{ + modules: luaModules, + Loader: Loader{loaderpkg.New()}, + typ: typ, + } +} +func (l *LuaLoader) String() string { return luaType } + +// Load creates a new script from code or URL and starts a watching if needed +func (l *LuaLoader) Load(uriOrCode string) (Handler, error) { + log.Println("LuaLoader loadLua: ", uriOrCode) + + // Default empty script + const emptyScript = `function main(row) + return null + end` + + // Create an empty script, we'll update it right away + var err error + l.code, err = lua.FromString("luaScript", emptyScript, l.modules...) + if err != nil { + return nil, err + } + + // If the string is actually a URL, try to download it + if err := l.watch(uriOrCode, l.code.Update); err != nil { + return nil, err + } + + return l, nil +} + +func (l *LuaLoader) Value(row map[string]interface{}) (interface{}, error) { + if l.code == nil { + return nil, errors.New("LuaLoader's code is not loaded, nil") + } + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + // Run the script + out, err := l.code.Run(ctx, row) + if err != nil { + return nil, err + } + + // If there's no new row generated, return nil + if out.Type() == lua.TypeNil { + return nil, nil + } + + switch l.typ { + case typeof.Bool: + if v, ok := out.(lua.Bool); ok { + return bool(v), nil + } + case typeof.Int32: + if v, ok := out.(lua.Number); ok { + return int32(v), nil + } + case typeof.Int64, typeof.Timestamp: + if v, ok := out.(lua.Number); ok { + return int64(v), nil + } + case typeof.Float64: + if v, ok := out.(lua.Number); ok { + return float64(v), nil + } + case typeof.String, typeof.JSON: + if v, ok := out.(lua.String); ok { + return string(v), nil + } + } + + // Type mismatch + return nil, fmt.Errorf("script expects %s type but got %T", l.typ.String(), out) +} diff --git a/internal/scripting/plugin.go b/internal/scripting/plugin.go new file mode 100644 index 00000000..295a7c81 --- /dev/null +++ b/internal/scripting/plugin.go @@ -0,0 +1,78 @@ +package script + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "log" + "plugin" + "time" + + loaderpkg "github.com/kelindar/loader" +) + +type mainFunc = func(map[string]interface{}) (interface{}, error) + +type PluginLoader struct { + Loader + main mainFunc + functionName string +} + +func NewPluginLoader(functionName string) *PluginLoader { + return &PluginLoader{ + Loader: Loader{loaderpkg.New()}, + functionName: functionName, + } +} + +func (h *PluginLoader) Load(uriOrCode string) (Handler, error) { + log.Println("LoadGoPlugin: ", uriOrCode) + // try to download it + if err := h.watch(uriOrCode, h.updateGoPlugin); err != nil { + return nil, err + } + + return h, nil +} + +func (h *PluginLoader) String() string { return pluginType } + +func (h *PluginLoader) Value(row map[string]interface{}) (interface{}, error) { + return h.main(row) +} + +func (h *PluginLoader) updateGoPlugin(r io.Reader) error { + tmpFileName := fmt.Sprintf("%s.so", 
time.Now().Format("20060102150405"))
+	tmpFile, err := ioutil.TempFile("", tmpFileName)
+	if err != nil {
+		return err
+	}
+
+	written, err := io.Copy(tmpFile, r)
+	if err != nil {
+		return err
+	}
+	if written == 0 {
+		return errors.New("PluginLoader: failed to load plugin, content was empty")
+	}
+
+	log.Printf("updateGoPlugin: wrote to file %s, opening %s", tmpFileName, tmpFile.Name())
+	p, err := plugin.Open(tmpFile.Name())
+	if err != nil {
+		return err
+	}
+
+	f, err := p.Lookup(h.functionName)
+	if err != nil {
+		return err
+	}
+
+	ok := false
+	h.main, ok = f.(mainFunc)
+	if !ok {
+		return errors.New("type assertion on plugin function failed")
+	}
+	return nil
+}
diff --git a/internal/scripting/plugin_test.go b/internal/scripting/plugin_test.go
new file mode 100644
index 00000000..1b1b71d7
--- /dev/null
+++ b/internal/scripting/plugin_test.go
@@ -0,0 +1,101 @@
+package script
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"os/exec"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func execCommand(ctx context.Context, cli string, params []string, dir string) error {
+	cmd := exec.CommandContext(ctx, cli, params...)
+	cmd.Dir = dir
+	var stdout, stderr io.ReadCloser
+	var err error
+
+	if stdout, err = cmd.StdoutPipe(); err != nil {
+		return err
+	}
+	defer stdout.Close()
+
+	if stderr, err = cmd.StderrPipe(); err != nil {
+		return err
+	}
+	defer stderr.Close()
+
+	fmt.Printf("[execCommand] Start command: %s, params: %s\n", cli, params)
+	if err = cmd.Start(); err != nil {
+		return err
+	}
+	multireader := io.MultiReader(stdout, stderr)
+	_, err = io.Copy(os.Stdout, multireader)
+	if err != nil {
+		fmt.Printf("[execCommand] copying command output to stdout failed: %v\n", err)
+	}
+
+	return cmd.Wait()
+}
+
+func compilePlugin() {
+	os.Remove("./talaria_plugin.so")
+
+	err := execCommand(context.Background(), "pwd", []string{"-P"}, "./plugin_test")
+	if err != nil {
+		panic(err)
+	}
+
+	err = execCommand(context.Background(), "go", []string{"version"}, "./plugin_test")
+	if err != nil {
+		panic(err)
+	}
+
+	params := []string{
+		"build", "-buildmode=plugin",
+		"-o", "../talaria_plugin.so",
+	}
+	err = execCommand(context.Background(), "go", params, "./plugin_test")
+	if err != nil {
+		panic(err)
+	}
+}
+
+func TestLoadGoPlugin(t *testing.T) {
+	if os.Getenv("CI") == "" {
+		compilePlugin()
+		l := NewPluginLoader("ComputeRow")
+		s, err := l.Load("file:///talaria_plugin.so")
+		assert.NotNil(t, s)
+		assert.NoError(t, err)
+		require.NoError(t, err)
+		out, err := s.Value(map[string]interface{}{
+			"customKeyA": 12,
+			"time":       12999,
+			"uuid":       "uuidValue",
+			"id":         "idValue",
+			"customKeyB": "testCustomKeyB",
+			"customKeyC": "testCustomKeyC",
+		})
+		require.NotNil(t, out)
+		require.NoError(t, err)
+		require.Equal(t, `{"customKeyA":12,"customKeyB":"testCustomKeyB","customKeyC":"testCustomKeyC"}`, out)
+
+	}
+}
+
+// func TestLoadS3GoPlugin(t *testing.T) {
+// 	l := NewPluginLoader()
+// 	s, err := l.LoadGoPlugin("data", "s3://mydata/talaria_go_function.so")
+// 	require.NotNil(t, s)
+// 	require.NoError(t, err)
+// 	out, err := s.Value(map[string]interface{}{
+// 		"customKeyA": 12, "time": 12999, "uuid": "uuidValue", "id": "idValue", "customKeyB": "testCustomKeyB",
+// 	})
+// 	require.NotNil(t, out)
+// 	require.NoError(t, err)
+// 	require.Equal(t, `{"customKeyA":12,"customKeyB":"testCustomKeyB"}`, out)
+// }
diff --git a/internal/scripting/plugin_test/main.go b/internal/scripting/plugin_test/main.go
new file mode 100644
index 00000000..f0b06c55
--- /dev/null +++ b/internal/scripting/plugin_test/main.go @@ -0,0 +1,28 @@ +package main + +import "encoding/json" + +var fieldsDefined = []string{ + "time", "scope", "event", "update_at", "location", "city", "weather", + "uuid", "id", +} + +func isKeyInCommonfield(key string) bool { + for _, field := range fieldsDefined { + if key == field { + return true + } + } + return false +} + +func ComputeRow(rowArg map[string]interface{}) (interface{}, error) { + data := make(map[string]interface{}) + for key, val := range rowArg { + if !isKeyInCommonfield(key) { + data[key] = val + } + } + dataJSONStr, err := json.Marshal(data) + return string(dataJSONStr), err +} diff --git a/internal/scripting/script.go b/internal/scripting/script.go deleted file mode 100644 index 3319901b..00000000 --- a/internal/scripting/script.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved. -// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file - -package script - -import ( - "bytes" - "context" - "io" - "net/url" - "strings" - "time" - - "github.com/grab/async" - "github.com/kelindar/loader" - "github.com/kelindar/lua" -) - -// Default empty script -const emptyScript = `function main(row) - return null -end` - -// Loader represents a script loader -type Loader struct { - modules []lua.Module // The modules for the scripting environment - loader *loader.Loader // The loader to use to load and watch for code updates -} - -// NewLoader creates a new loader that can be used to load scripts -func NewLoader(modules []lua.Module) *Loader { - return &Loader{ - modules: modules, - loader: loader.New(), - } -} - -// Load creates a new script from code or URL and starts a watching if needed -func (l *Loader) Load(name string, uriOrCode string) (*lua.Script, error) { - - // Create an empty script, we'll update it right away - s, err := lua.FromString(name, emptyScript, l.modules...) 
- if err != nil { - return nil, err - } - - // If the string is actually a URL, try to download it - if err := l.watch(uriOrCode, s.Update); err != nil { - return nil, err - } - - return s, nil -} - -// Wtch starts watching for script updates -func (l *Loader) watch(uriOrCode string, onUpdate func(io.Reader) error) error { - if _, err := url.Parse(uriOrCode); err != nil { - return onUpdate(strings.NewReader(uriOrCode)) // Assume it's the actual lua code - } - - // Start watching on the URL - updates := l.loader.Watch(context.Background(), uriOrCode, 5*time.Minute) - u := <-updates - if u.Err != nil { - return u.Err - } - - // Read the updates asynchronously - async.Invoke(context.Background(), func(ctx context.Context) (interface{}, error) { - for u := range updates { - if u.Err == nil { - _ = onUpdate(bytes.NewReader(u.Data)) - } - } - return nil, nil - }) - - // Perform a first update - return onUpdate(bytes.NewReader(u.Data)) -} diff --git a/internal/scripting/script_test.go b/internal/scripting/script_test.go index 7be2aae3..3b0e02aa 100644 --- a/internal/scripting/script_test.go +++ b/internal/scripting/script_test.go @@ -4,25 +4,24 @@ package script import ( - "context" "testing" - "github.com/kelindar/lua" + "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/stretchr/testify/assert" ) -func Test_Download(t *testing.T) { - l := NewLoader(nil) - s, err := l.Load("data", "https://raw.githubusercontent.com/kelindar/lua/master/fixtures/json.lua") +func TestLoadLua(t *testing.T) { + l := NewLuaLoader(nil, typeof.String) + s, err := l.Load("https://raw.githubusercontent.com/kelindar/lua/master/fixtures/json.lua") assert.NotNil(t, s) assert.NoError(t, err) - out, err := s.Run(context.Background(), map[string]interface{}{ + out, err := s.Value(map[string]interface{}{ "a": 1, "b": "hello", }) assert.NotNil(t, out) assert.NoError(t, err) - assert.Equal(t, lua.String(`{"a":1,"b":"hello"}`), out) + assert.Equal(t, `{"a":1,"b":"hello"}`, out) } diff --git a/internal/server/server.go b/internal/server/server.go index 7197f8c3..299f0e7b 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -12,13 +12,12 @@ import ( "time" "github.com/grab/async" - "github.com/kelindar/talaria/internal/column" + "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/ingress/s3sqs" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" "github.com/kelindar/talaria/internal/presto" - script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/server/thriftlog" "github.com/kelindar/talaria/internal/table" talaria "github.com/kelindar/talaria/proto" @@ -46,7 +45,7 @@ type Storage interface { // ------------------------------------------------------------------------------------------------------------ // New creates a new talaria server. 
-func New(conf config.Func, monitor monitor.Monitor, loader *script.Loader, tables ...table.Table) *Server { +func New(conf config.Func, monitor monitor.Monitor, tables ...table.Table) *Server { const maxMessageSize = 32 * 1024 * 1024 // 32 MB server := &Server{ server: grpc.NewServer(grpc.MaxRecvMsgSize(maxMessageSize)), @@ -57,7 +56,7 @@ func New(conf config.Func, monitor monitor.Monitor, loader *script.Loader, table // Load computed columns for _, c := range conf().Computed { - col, err := column.NewComputed(c.Name, c.Type, c.Func, loader) + col, err := computed.NewComputed(c.Name, c.FuncName, c.Type, c.Func, monitor) if err != nil { monitor.Error(err) continue @@ -86,7 +85,7 @@ type Server struct { monitor monitor.Monitor // The monitoring layer cancel context.CancelFunc // The cancellation function for the server tables map[string]table.Table // The list of tables - computed []column.Computed // The set of computed columns + computed []computed.Computed // The set of computed columns s3sqs *s3sqs.Ingress // The S3SQS Ingress (optional) } diff --git a/internal/storage/flush/flush_test.go b/internal/storage/flush/flush_test.go index 02e7fe69..ed852093 100644 --- a/internal/storage/flush/flush_test.go +++ b/internal/storage/flush/flush_test.go @@ -8,19 +8,18 @@ import ( "testing" eorc "github.com/crphang/orc" - "github.com/kelindar/talaria/internal/column" + "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/orc" "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/monitor" - script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/storage/writer/noop" "github.com/stretchr/testify/assert" ) func TestNameFunc(t *testing.T) { fileNameFunc := func(row map[string]interface{}) (string, error) { - lua, _ := column.NewComputed("fileName", typeof.String, ` + lua, _ := computed.NewComputed("fileName", "main", typeof.String, ` function main(row) -- Convert the time to a lua date @@ -46,7 +45,7 @@ func TestNameFunc(t *testing.T) { --localdate.isdst = false return os.difftime(os.time(localdate), os.time(utcdate)) end - `, script.NewLoader(nil)) + `, nil) output, err := lua.Value(row) return output.(string), err @@ -73,6 +72,7 @@ func TestNameFunc(t *testing.T) { apply := block.Transform(nil) blocks, err := block.FromOrcBy(orcBuffer.Bytes(), "col0", nil, apply) + assert.NoError(t, err) fileName := flusher.generateFileName(blocks[0]) assert.Equal(t, "year=46970/month=3/day=29/ns=eventName/0-0-0-127.0.0.1.orc", string(fileName)) diff --git a/internal/storage/writer/base/base.go b/internal/storage/writer/base/base.go index 237500bf..41ccf85e 100644 --- a/internal/storage/writer/base/base.go +++ b/internal/storage/writer/base/base.go @@ -4,32 +4,37 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/grab/async" - "github.com/kelindar/lua" "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/monitor/errors" - script "github.com/kelindar/talaria/internal/scripting" ) // Func encodes the payload type Func func(interface{}) ([]byte, error) +// FilterFunc used for filter +type FilterFunc func(map[string]interface{}) (interface{}, error) + // Writer is to filter and encode row of events type Writer struct { task async.Task Process func(context.Context) error - filter *lua.Script + filter FilterFunc name string encode Func } // New creates a new encoder -func New(filter, 
encoderFunc string, loader *script.Loader) (*Writer, error) { +func New(encoderFunc string, filter FilterFunc) (*Writer, error) { if encoderFunc == "" { encoderFunc = "json" } + if filter == nil { + filter = func(map[string]interface{}) (interface{}, error) { + return true, nil + } + } // Extendable encoder functions var encoder Func @@ -40,22 +45,11 @@ func New(filter, encoderFunc string, loader *script.Loader) (*Writer, error) { return nil, errors.Newf("encoder: unsupported encoder '%s'", encoderFunc) } - // If no filter was specified, create a base writer without a filter - if filter == "" { - return newWithEncoder(encoderFunc, nil, encoder) - } - - // Load the filter script if required - script, err := loader.Load(filter, filter) - if err != nil { - return nil, err - } - - return newWithEncoder(encoderFunc, script, encoder) + return newWithEncoder(encoderFunc, filter, encoder) } // newWithEncoder will generate a new encoder for a writer -func newWithEncoder(name string, filter *lua.Script, encoder Func) (*Writer, error) { +func newWithEncoder(name string, filter FilterFunc, encoder Func) (*Writer, error) { if encoder == nil { encoder = Func(json.Marshal) } @@ -112,12 +106,9 @@ func (w *Writer) applyFilter(row *block.Row) bool { return true } - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - // Runs the lua script - out, err := w.filter.Run(ctx, row.Values) - if err != nil || !out.(lua.Bool) { + out, err := w.filter(row.Values) + if err != nil || !out.(bool) { return false } return true diff --git a/internal/storage/writer/base/base_test.go b/internal/storage/writer/base/base_test.go index 9a3ee42e..8fe1bd19 100644 --- a/internal/storage/writer/base/base_test.go +++ b/internal/storage/writer/base/base_test.go @@ -6,8 +6,9 @@ import ( "time" "github.com/grab/async" + "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/encoding/block" - script "github.com/kelindar/talaria/internal/scripting" + "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/stretchr/testify/assert" ) @@ -22,8 +23,9 @@ func TestFilter(t *testing.T) { filter := `function main(row) return row['age'] > 10 end` - loader := script.NewLoader(nil) - enc1, _ := New(filter, "json", loader) + computedFilter, err := computed.NewComputed("", "", typeof.Bool, filter, nil) + assert.NoError(t, err) + enc1, _ := New("json", computedFilter.Value) data, err := enc1.Encode(row) assert.Equal(t, `{"age":30,"test":"Hello Talaria"}`, string(data)) @@ -32,17 +34,18 @@ func TestFilter(t *testing.T) { filter2 := `function main(row) return row['age'] < 10 end` - loader2 := script.NewLoader(nil) - enc2, _ := New(filter2, "json", loader2) + computedFilter, err = computed.NewComputed("", "", typeof.Bool, filter2, nil) + assert.NoError(t, err) + enc2, _ := New("json", computedFilter.Value) data2, err := enc2.Encode(row) + assert.NoError(t, err) assert.Nil(t, data2) } func TestRun(t *testing.T) { ctx := context.Background() - loader := script.NewLoader(nil) - w, _ := New("", "json", loader) + w, _ := New("json", nil) w.Process = func(context.Context) error { return nil } @@ -53,8 +56,7 @@ func TestRun(t *testing.T) { func TestCancel(t *testing.T) { ctx := context.Background() - loader := script.NewLoader(nil) - w, _ := New("", "json", loader) + w, _ := New("json", nil) w.Process = func(context.Context) error { time.Sleep(1 * time.Second) return nil @@ -66,8 +68,7 @@ func TestCancel(t *testing.T) { } func TestEmptyFilter(t *testing.T) { - loader 
:= script.NewLoader(nil) - enc1, err := New("", "json", loader) + enc1, err := New("json", nil) assert.NotNil(t, enc1) assert.NoError(t, err) } diff --git a/internal/storage/writer/pubsub/pubsub.go b/internal/storage/writer/pubsub/pubsub.go index 0886ed68..fcc1fa2f 100644 --- a/internal/storage/writer/pubsub/pubsub.go +++ b/internal/storage/writer/pubsub/pubsub.go @@ -2,16 +2,18 @@ package pubsub import ( "context" + "log" "runtime" "time" "cloud.google.com/go/pubsub" "github.com/grab/async" + "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" + "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" - script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/storage/writer/base" "google.golang.org/api/option" ) @@ -28,7 +30,7 @@ type Writer struct { } // New creates a new writer -func New(project, topic, encoding, filter string, loader *script.Loader, monitor monitor.Monitor, opts ...option.ClientOption) (*Writer, error) { +func New(project, topic, encoding, filter string, monitor monitor.Monitor, opts ...option.ClientOption) (*Writer, error) { ctx := context.Background() client, err := pubsub.NewClient(ctx, project, opts...) @@ -36,8 +38,16 @@ func New(project, topic, encoding, filter string, loader *script.Loader, monitor return nil, errors.Newf("pubsub: %v", err) } + var filterF base.FilterFunc = nil + if filter != "" { + computed, err := computed.NewComputed("", "", typeof.Bool, filter, monitor) + if err != nil { + return nil, err + } + filterF = computed.Value + } // Load encoder - encoderWriter, err := base.New(filter, encoding, loader) + encoderWriter, err := base.New(encoding, filterF) if err != nil { return nil, err } @@ -66,6 +76,7 @@ func New(project, topic, encoding, filter string, loader *script.Loader, monitor } w.Process = w.process + log.Println("pubsub: New writter successfully, topic ", topic) return w, nil } diff --git a/internal/storage/writer/pubsub/pubsub_test.go b/internal/storage/writer/pubsub/pubsub_test.go index 06501882..e7b8d57b 100644 --- a/internal/storage/writer/pubsub/pubsub_test.go +++ b/internal/storage/writer/pubsub/pubsub_test.go @@ -34,7 +34,7 @@ func TestNew(t *testing.T) { conn := setup() // Fail because topic doesn't exist - c, err := New("gcp-project", "talaria", "", "", nil, monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), option.WithGRPCConn(conn)) + c, err := New("gcp-project", "talaria", "", "", monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), option.WithGRPCConn(conn)) assert.Nil(t, c) assert.Error(t, err) @@ -42,10 +42,12 @@ func TestNew(t *testing.T) { setup2(conn) // Doesn't fail after creating topic - c, err = New("gcp-project", "talaria", "", "", nil, monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), option.WithGRPCConn(conn)) + c, err = New("gcp-project", "talaria", "", "", monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), option.WithGRPCConn(conn)) - assert.IsType(t, &Writer{}, c) assert.NoError(t, err) + assert.IsType(t, &Writer{}, c) + assert.NotNil(t, c) + assert.NotNil(t, c.Writer) row := block.Row{ Values: map[string]interface{}{ @@ -78,7 +80,7 @@ func TestFullMessageBuffer(t *testing.T) { setup2(conn) // Doesn't fail after creating topic - c, err := New("gcp-project", "talaria", "", "", nil, monitor.New(logging.NewStandard(), 
statsd.NewNoop(), "x", "x"), option.WithGRPCConn(conn)) + c, err := New("gcp-project", "talaria", "", "", monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), option.WithGRPCConn(conn)) assert.IsType(t, &Writer{}, c) assert.NoError(t, err) @@ -104,7 +106,7 @@ func TestWrite(t *testing.T) { conn := setup() setup2(conn) - c, _ := New("gcp-project", "talaria", "", "", nil, nil, option.WithGRPCConn(conn)) + c, _ := New("gcp-project", "talaria", "", "", nil, option.WithGRPCConn(conn)) res := c.Write(nil, nil) assert.Nil(t, res) } diff --git a/internal/storage/writer/writer.go b/internal/storage/writer/writer.go index 61ba3ac1..29eb2476 100644 --- a/internal/storage/writer/writer.go +++ b/internal/storage/writer/writer.go @@ -7,12 +7,11 @@ import ( "sort" "time" - "github.com/kelindar/talaria/internal/column" + "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" - script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/storage" "github.com/kelindar/talaria/internal/storage/compact" "github.com/kelindar/talaria/internal/storage/flush" @@ -30,8 +29,8 @@ import ( var seed = maphash.MakeSeed() // ForStreaming creates a streaming writer -func ForStreaming(config config.Streams, monitor monitor.Monitor, loader *script.Loader) (storage.Streamer, error) { - writer, err := newStreamer(config, monitor, loader) +func ForStreaming(config config.Streams, monitor monitor.Monitor) (storage.Streamer, error) { + writer, err := newStreamer(config, monitor) if err != nil { monitor.Error(err) } @@ -40,8 +39,8 @@ func ForStreaming(config config.Streams, monitor monitor.Monitor, loader *script } // ForCompaction creates a compaction writer -func ForCompaction(config *config.Compaction, monitor monitor.Monitor, store storage.Storage, loader *script.Loader) (*compact.Storage, error) { - writer, err := newWriter(config.Sinks, monitor, loader) +func ForCompaction(config *config.Compaction, monitor monitor.Monitor, store storage.Storage) (*compact.Storage, error) { + writer, err := newWriter(config.Sinks, monitor) if err != nil { return nil, err } @@ -55,7 +54,7 @@ func ForCompaction(config *config.Compaction, monitor monitor.Monitor, store sto // If name function was specified, use it nameFunc := defaultNameFunc if config.NameFunc != "" { - if fn, err := column.NewComputed("nameFunc", typeof.String, config.NameFunc, loader); err == nil { + if fn, err := computed.NewComputed("nameFunc", "main", typeof.String, config.NameFunc, monitor); err == nil { nameFunc = func(row map[string]interface{}) (s string, e error) { val, err := fn.Value(row) if err != nil { @@ -81,7 +80,7 @@ func ForCompaction(config *config.Compaction, monitor monitor.Monitor, store sto } // NewWriter creates a new writer from the configuration. 
-func newWriter(config config.Sinks, monitor monitor.Monitor, loader *script.Loader) (flush.Writer, error) { +func newWriter(config config.Sinks, monitor monitor.Monitor) (flush.Writer, error) { var writers []multi.SubWriter // Configure S3 writer if present @@ -149,7 +148,7 @@ func newWriter(config config.Sinks, monitor monitor.Monitor, loader *script.Load // Configure Google Pub/Sub writer if present if config.PubSub != nil { - w, err := pubsub.New(config.PubSub.Project, config.PubSub.Topic, config.PubSub.Encoder, config.PubSub.Filter, loader, nil) + w, err := pubsub.New(config.PubSub.Project, config.PubSub.Topic, config.PubSub.Encoder, config.PubSub.Filter, monitor) if err != nil { return nil, err } @@ -166,7 +165,7 @@ func newWriter(config config.Sinks, monitor monitor.Monitor, loader *script.Load } // newStreamer creates a new streamer from the configuration. -func newStreamer(config config.Streams, monitor monitor.Monitor, loader *script.Loader) (flush.Writer, error) { +func newStreamer(config config.Streams, monitor monitor.Monitor) (flush.Writer, error) { var writers []multi.SubWriter // If no streams were configured, error out @@ -175,7 +174,7 @@ func newStreamer(config config.Streams, monitor monitor.Monitor, loader *script. } for _, v := range config { - w, err := newWriter(v, monitor, loader) + w, err := newWriter(v, monitor) if err != nil { return noop.New(), err } diff --git a/internal/storage/writer/writer_test.go b/internal/storage/writer/writer_test.go index 88096331..35e2480b 100644 --- a/internal/storage/writer/writer_test.go +++ b/internal/storage/writer/writer_test.go @@ -7,7 +7,6 @@ import ( "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/logging" "github.com/kelindar/talaria/internal/monitor/statsd" - script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/storage/disk" "github.com/stretchr/testify/assert" ) @@ -24,7 +23,6 @@ func TestForCompaction(t *testing.T) { compact, err := ForCompaction(cfg, monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), disk.New(monitor.NewNoop()), - script.NewLoader(nil), ) assert.NoError(t, err) assert.NotNil(t, compact) @@ -34,7 +32,6 @@ func TestForStreaming(t *testing.T) { cfg := config.Streams{} compact, err := ForStreaming(cfg, monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), - script.NewLoader(nil), ) assert.Nil(t, err) diff --git a/internal/table/log/log.go b/internal/table/log/log.go index 8152245b..e84b3273 100644 --- a/internal/table/log/log.go +++ b/internal/table/log/log.go @@ -42,7 +42,7 @@ func New(cfg config.Func, cluster Membership, monitor monitor.Monitor) *Table { store := disk.Open(cfg().Storage.Directory, name, monitor, cfg().Storage.Badger) // Create a noop streamer - streams, _ := writer.ForStreaming(config.Streams{}, monitor, nil) + streams, _ := writer.ForStreaming(config.Streams{}, monitor) base := timeseries.New(name, cluster, monitor, store, &config.Table{ TTL: 24 * 3600, // 1 day diff --git a/internal/table/timeseries/timeseries_test.go b/internal/table/timeseries/timeseries_test.go index 4e96bd4e..26c7480d 100644 --- a/internal/table/timeseries/timeseries_test.go +++ b/internal/table/timeseries/timeseries_test.go @@ -56,7 +56,7 @@ func TestTimeseries_DynamicSchema(t *testing.T) { monitor := monitor2.NewNoop() store := disk.Open(dir, name, monitor, config.Badger{}) - streams, _ := writer.ForStreaming(config.Streams{}, monitor, nil) + streams, _ := writer.ForStreaming(config.Streams{}, monitor) // 
Start the server and open the database eventlog := timeseries.New(name, new(noopMembership), monitor, store, &tableConf, streams) @@ -150,7 +150,7 @@ int1: int64 monitor := monitor2.NewNoop() store := disk.Open(dir, name, monitor, config.Badger{}) - streams, _ := writer.ForStreaming(config.Streams{}, monitor, nil) + streams, _ := writer.ForStreaming(config.Streams{}, monitor) // Start the server and open the database eventlog := timeseries.New(name, new(noopMembership), monitor, store, &tableConf, streams) diff --git a/main.go b/main.go index c5cb50ae..20c3de86 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,6 @@ import ( eorc "github.com/crphang/orc" "github.com/gorilla/mux" - "github.com/kelindar/lua" "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/config/env" "github.com/kelindar/talaria/internal/config/s3" @@ -23,10 +22,6 @@ import ( "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/logging" "github.com/kelindar/talaria/internal/monitor/statsd" - script "github.com/kelindar/talaria/internal/scripting" - mlog "github.com/kelindar/talaria/internal/scripting/log" - mnet "github.com/kelindar/talaria/internal/scripting/net" - mstats "github.com/kelindar/talaria/internal/scripting/stats" "github.com/kelindar/talaria/internal/server" "github.com/kelindar/talaria/internal/server/cluster" "github.com/kelindar/talaria/internal/storage" @@ -67,21 +62,14 @@ func main() { // Updating the logger to use the composite logger. This is to make sure the logs from the config is sent to log table as well as stdout s3Configurer.SetLogger(logger) - // Create a script loader - loader := script.NewLoader([]lua.Module{ - mlog.New(monitor), - mstats.New(monitor), - mnet.New(monitor), - }) - // Open every table configured tables := []table.Table{nodes.New(gossip), logTable} for name, tableConf := range conf.Tables { - tables = append(tables, openTable(name, conf.Storage, tableConf, gossip, monitor, loader)) + tables = append(tables, openTable(name, conf.Storage, tableConf, gossip, monitor)) } // Start the new server - server := server.New(configure, monitor, loader, tables...) + server := server.New(configure, monitor, tables...) // onSignal will be called when a OS-level signal is received. 
onSignal(func(_ os.Signal) { @@ -108,21 +96,21 @@ func main() { } // openTable creates a new table with storage & optional compaction fully configured -func openTable(name string, storageConf config.Storage, tableConf config.Table, cluster cluster.Membership, monitor monitor.Monitor, loader *script.Loader) table.Table { +func openTable(name string, storageConf config.Storage, tableConf config.Table, cluster cluster.Membership, monitor monitor.Monitor) table.Table { monitor.Info("server: opening table %s...", name) // Create a new storage layer and optional compaction store := storage.Storage(disk.Open(storageConf.Directory, name, monitor, storageConf.Badger)) if tableConf.Compact != nil { var err error - store, err = writer.ForCompaction(tableConf.Compact, monitor, store, loader) + store, err = writer.ForCompaction(tableConf.Compact, monitor, store) if err != nil { panic(err) } } // Returns noop streamer if array is empty - streams, err := writer.ForStreaming(tableConf.Streams, monitor, loader) + streams, err := writer.ForStreaming(tableConf.Streams, monitor) if err != nil { panic(err) } diff --git a/test/bench_test.go b/test/bench_test.go index a26c9c43..9eee4516 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -13,7 +13,6 @@ import ( "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/presto" - script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/server" "github.com/kelindar/talaria/internal/storage/disk" "github.com/kelindar/talaria/internal/storage/writer" @@ -61,7 +60,7 @@ func BenchmarkQuery(b *testing.B) { // create monitor monitor := monitor.NewNoop() store := disk.Open(cfg().Storage.Directory, tableName, monitor, cfg().Storage.Badger) - streams, _ := writer.ForStreaming(config.Streams{}, monitor, script.NewLoader(nil)) + streams, _ := writer.ForStreaming(config.Streams{}, monitor) // Start the server and open the database eventlog := timeseries.New(tableName, new(noopMembership), monitor, store, &config.Table{ @@ -71,7 +70,7 @@ func BenchmarkQuery(b *testing.B) { Schema: "", }, streams) - server := server.New(cfg, monitor, script.NewLoader(nil), eventlog) + server := server.New(cfg, monitor, nil, eventlog) defer server.Close() // Append some files diff --git a/test/local.go b/test/local.go index c24a9a71..931da5e3 100644 --- a/test/local.go +++ b/test/local.go @@ -15,7 +15,6 @@ import ( "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/logging" "github.com/kelindar/talaria/internal/monitor/statsd" - script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/server" "github.com/kelindar/talaria/internal/server/cluster" "github.com/kelindar/talaria/internal/storage/disk" @@ -69,7 +68,7 @@ func main() { gossip.JoinHostname("localhost") store := disk.Open(cfg().Storage.Directory, tableName, monitor, config.Badger{}) - streams, _ := writer.ForStreaming(config.Streams{}, monitor, script.NewLoader(nil)) + streams, _ := writer.ForStreaming(config.Streams{}, monitor) // Start the server and open the database eventlog := timeseries.New(tableName, gossip, monitor, store, &config.Table{ @@ -79,7 +78,7 @@ func main() { Schema: "", }, streams) - server := server.New(cfg, monitor, script.NewLoader(nil), + server := server.New(cfg, monitor, nil, eventlog, nodes.New(gossip), )