feat(go/adbc): add IngestStream helper for one-call ingestion and add TestIngestStream #3150
Would this make more sense as a freestanding function? It can be implemented entirely in terms of existing APIs anyway, and that way we don't have to make a breaking change to the core API.
I was thinking the same as what @lidavidm suggested: a free function would be better in this case, to avoid a breaking change to the interface.
Wrap the ingestion steps into a single, freestanding `adbc.IngestStream` function for easier Arrow data ingestion. Add `TestIngestStream` in drivermgr_test.go to verify end-to-end functionality. Closes apache#3142
go/adbc/adbc.go
Outdated
// the five-step boilerplate of NewStatement, SetOption, Bind,
// Execute, and Close.
//
// This is not part of the ADBC API specification.
Can we move this to ext.go? https://github.com/apache/arrow-adbc/blob/main/go/adbc/ext.go
// 1) Create the target table
st, err := dm.conn.NewStatement()
dm.Require().NoError(err)
defer validation.CheckedClose(dm.T(), st)

dm.NoError(st.SetSqlQuery(`
    CREATE TABLE IF NOT EXISTS ingest_test (
        col1 INTEGER,
        col2 TEXT
    )
`))
n, err := st.ExecuteUpdate(dm.ctx)
dm.NoError(err)
dm.Equal(int64(0), n, "CREATE TABLE should report 0 rows affected")
Ingest can create the table itself, so this step isn't strictly necessary.
defer b.Release()

// first batch: 3 rows
b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
ArrayFromJSON might make this easier
go/adbc/adbc.go
Outdated
// Execute, and Close.
//
// This is not part of the ADBC API specification.
func IngestStream(ctx context.Context, cnxn Connection, reader array.RecordReader, opts map[string]string) (int64, error) {
Maybe we should accept things like the target table directly as parameters? Or have an explicit parameters struct? (We can keep the map for any other options, but I think the idea ought to be that we make the common parameters into formal parameters instead of requiring the option.)
go/adbc/adbc.go
Outdated
if err != nil {
    return -1, fmt.Errorf("IngestStream: NewStatement: %w", err)
}
defer stmt.Close()
Close is fallible, so you have to handle the error. You need something like this (with a named error return value):
    defer func() {
        err = errors.Join(err, stmt.Close())
    }()
Wrap the ingestion steps into a single, freestanding `adbc.IngestStream` function for easier Arrow data ingestion. Add `TestIngestStream` in drivermgr_test to verify end-to-end functionality. Closes apache#3142
go/adbc/ext.go
Outdated
// 1) Create a new statement
stmt, err := cnxn.NewStatement()
if err != nil {
    return -1, fmt.Errorf("IngestStream: NewStatement: %w", err)
The convention for Go error strings is that they do not start with a capital letter. Perhaps something more like:
fmt.Errorf("error during ingestion: NewStatement: %w", err)
go/adbc/ext.go
Outdated
}

// Set required options
if err := stmt.SetOption(OptionKeyIngestTargetTable, opt.TargetTable); err != nil {
Remove the `:`; you're shadowing the `err` from before, so the `errors.Join` isn't going to work this way.
go/adbc/ext.go
Outdated
}

// 4) Execute the update
count, err := stmt.ExecuteUpdate(ctx)
Same comment as before: you're shadowing the `err` var here. Usually not a problem, but it will be an issue with the defer from the top.
go/adbc/ext.go
Outdated
type IngestStreamOption struct {
    TargetTable string            // required
    IngestMode  string            // required, e.g. adbc.OptionValueIngestModeCreateAppend, or OptionValueIngestModeReplace
    Extra       map[string]string // any other stmt.SetOption(...) args
Since these are required, should we make them explicit parameters instead of putting them behind this struct? I.e.:
func IngestStream(ctx context.Context, cnxn Connection, reader array.RecordReader, target, mode string, extra map[string]string) (int64, error)
What do you think? @lidavidm, thoughts?
I could go either way, but the struct may make it easier to add new parameters in the future. That said, I don't mind table/mode being just formal parameters, with struct fields for things like temporary/catalog/schema, and keeping Extra for any driver-specific options.
Overall this looks really good, just a couple of comments.
go/adbc/ext.go
Outdated
// IngestStreamOption bundles the IngestStream options.
// Driver specific options can go into Extra.
type IngestStreamOption struct {
I would pluralize this to be `IngestStreamOptions`, personally.
Alternatively, we could potentially do something like:
    type IngestStreamOption func(adbc.Statement) error

    // ingestSetOption builds an option that sets a single key/value pair.
    func ingestSetOption(name, value string) IngestStreamOption {
        return func(st adbc.Statement) error {
            return st.SetOption(name, value)
        }
    }

    func WithIngestCatalog(cat string) IngestStreamOption {
        return ingestSetOption(adbc.OptionValueIngestTargetCatalog, cat)
    }

    func WithIngestTemp() IngestStreamOption {
        return ingestSetOption(adbc.OptionValueIngestTemporary, adbc.OptionValueEnabled)
    }
    ...
    func IngestStream(ctx context.Context, cnxn Connection, reader array.RecordReader, targetTable, ingestMode string, opts ...IngestStreamOption) (count int64, err error) {
        ...
        // Apply each option in order, stopping at the first failure.
        for _, o := range opts {
            if err = o(stmt); err != nil {
                err = fmt.Errorf("error during ingestion: %w", err)
                return
            }
        }
        ....
    }
This would not only allow for the options we provide, but also make it fairly easy for users to create their own options if they want to, while still keeping this extensible.
I'm not saying we definitely should do this, just putting it out as a suggestion. Thoughts?
I think it's fine as-is in terms of extensibility
go/adbc/ext.go
Outdated
// Set required options
if err = stmt.SetOption(OptionKeyIngestTargetTable, targetTable); err != nil {
    return 0, fmt.Errorf("error during ingestion: SetOption(target_table=%s): %w", targetTable, err)
nit: why do we go from returning -1 to 0 as the sentinel value? (Not that it matters either way, but I think we usually use -1)
Fixed it. I've switched the sentinel back to `-1`. My pair programmer (AI assistant) must have accidentally used 0, and I overlooked it. Thanks for catching that!
Solves #3142
Changes: