Open
Description
I am trying to register a mapper whose logic depends on the application.
The minimal reproducible example I came up with:
package bla_test
import (
"io"
"testing"
"github.com/chrislusf/gleam/flow"
"github.com/chrislusf/gleam/gio"
"github.com/chrislusf/gleam/util"
)
func Test(t *testing.T) {
gio.Init()
ds := flow.New("").Source("a", util.Range(0, 100))
// function to be used in Output. Stores the result in `result`
var result []interface{}
fn := func(r io.Reader) error {
err := util.ProcessMessage(r, func(encodedBytes []byte) error {
result = append(result, len(encodedBytes))
return nil
})
return err
}
ds.Output(fn).Run()
t.Log(result)
// register mapper, note that it uses "result" from above
mapper := func(row []interface{}) error {
row = append(row, result)
gio.Emit(row...)
return nil
}
addColumn := gio.RegisterMapper(mapper)
ds1 := ds.Map("id", addColumn)
ds1.Output(fn).Run()
t.Log(result) // should contain the result from the first and second execution
}
this code hangs, not with outputting an error: Missing mapper function m1
.
Note that this code is not interesting per-se, I am exemplifying a use-case on which the mapper may depend on app-specific logic.
Is this error expected behavior? I imagine that it is in theory, because spark allows this kind of functionality.
Metadata
Metadata
Assignees
Labels
No labels