
Commit f99e760
feat(ingest): better batching
1 parent 61ce12a

5 files changed: 51 additions, 23 deletions

.golangci.yaml
Lines changed: 3 additions & 0 deletions

@@ -57,6 +57,9 @@ issues:
     - linters:
         - staticcheck
       text: "SA1019: client.Datasets.QueryLegacy"
+    - linters:
+        - staticcheck
+      text: "SA1019: client.QueryLegacy"
     - linters:
         - staticcheck
       text: 'SA1019: "github.com/axiomhq/axiom-go/axiom/querylegacy"'
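staticcheck's SA1019 check flags every use of a deprecated identifier. axiom-go has moved the legacy query entry point from client.Datasets.QueryLegacy to client.QueryLegacy (see internal/cmd/stream/stream.go below), so the exclusion list gains a matching entry to keep the intentional call site passing lint. A minimal sketch of the kind of call the new rule whitelists, assuming credentials come from the environment; the dataset name and time range are illustrative:

package main

import (
    "context"
    "log"
    "time"

    "github.com/axiomhq/axiom-go/axiom"
    "github.com/axiomhq/axiom-go/axiom/querylegacy"
)

func main() {
    // NewClient reads its configuration (e.g. AXIOM_TOKEN) from the
    // environment.
    client, err := axiom.NewClient()
    if err != nil {
        log.Fatal(err)
    }

    // Deprecated upstream, hence the SA1019 report this commit
    // whitelists: the CLI still needs the legacy query API for
    // streaming.
    res, err := client.QueryLegacy(context.Background(), "my-dataset", querylegacy.Query{
        StartTime: time.Now().Add(-time.Minute),
        EndTime:   time.Now(),
    }, querylegacy.Options{})
    if err != nil {
        log.Fatal(err)
    }
    _ = res // e.g. iterate over the returned matches
}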

.goreleaser.yaml
Lines changed: 1 addition & 1 deletion

@@ -137,7 +137,7 @@ brews:
       name: axiom-automation

     skip_upload: auto
-    folder: Formula
+    directory: Formula
     install: |
       bin.install "{{ .ProjectName }}"
       man1.install Dir["man/{{ .ProjectName }}*.1"]

internal/cmd/ingest/ingest.go
Lines changed: 45 additions & 20 deletions

@@ -53,9 +53,11 @@ type options struct {
     // Delimiter that separates CSV fields.
     Delimiter string
     // FlushEvery flushes the ingestion buffer after the specified duration. It
-    // is only valid when ingesting a stream of newline delimited JSON objects
-    // of unknown length.
+    // is only valid when ingesting batchable data, e.g. newline delimited JSON
+    // and CSV (with field names explicitly set) data that is not encoded.
     FlushEvery time.Duration
+    // BatchSize to aim for when ingesting batchable data.
+    BatchSize uint
     // ContentType of the data to ingest.
     ContentType axiom.ContentType
     contentType string // for the flag value
@@ -81,7 +83,7 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
     }

     cmd := &cobra.Command{
-        Use: "ingest <dataset-name> [(-f|--file) <filename> [ ...]] [--timestamp-field <timestamp-field>] [--timestamp-format <timestamp-format>] [--flush-every <duration>] [(-t|--content-type <content-type>] [(-e|--content-encoding <content-encoding>] [(-l|--label) <key>:<value> [ ...]]",
+        Use: "ingest <dataset-name> [(-f|--file) <filename> [ ...]] [--timestamp-field <timestamp-field>] [--timestamp-format <timestamp-format>] [(-d|--delimiter <delimiter>] [--flush-every <duration>] [(-b|--batch-size <batch-size>] [(-t|--content-type <content-type>] [(-e|--content-encoding <content-encoding>] [(-l|--label) <key>:<value> [ ...]] [--csv-fields <field> [ ...]] [--continue-on-error <TRUE|FALSE>]",
         Short: "Ingest structured data",
         Long: heredoc.Doc(`
             Ingest structured data into an Axiom dataset.
@@ -193,15 +195,22 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
             if err := complete(cmd.Context(), opts); err != nil {
                 return err
             }
-            return run(cmd.Context(), opts, cmd.Flag("flush-every").Changed, cmd.Flag("csv-fields").Changed)
+            return run(
+                cmd.Context(),
+                opts,
+                cmd.Flag("flush-every").Changed,
+                cmd.Flag("batch-size").Changed,
+                cmd.Flag("csv-fields").Changed,
+            )
         },
     }

     cmd.Flags().StringSliceVarP(&opts.Filenames, "file", "f", nil, "File(s) to ingest (- to read from stdin). If stdin is a pipe the default value is -, otherwise this is a required parameter")
     cmd.Flags().StringVar(&opts.TimestampField, "timestamp-field", "", "Field to take the ingestion time from (defaults to _time)")
     cmd.Flags().StringVar(&opts.TimestampFormat, "timestamp-format", "", "Format used in the the timestamp field. Default uses a heuristic parser. Must be expressed using the reference time 'Mon Jan 2 15:04:05 -0700 MST 2006'")
     cmd.Flags().StringVarP(&opts.Delimiter, "delimiter", "d", "", "Delimiter that separates CSV fields (only valid when input is CSV")
-    cmd.Flags().DurationVar(&opts.FlushEvery, "flush-every", time.Second, "Buffer flush interval for newline delimited JSON streams of unknown length")
+    cmd.Flags().DurationVar(&opts.FlushEvery, "flush-every", time.Second*5, "Buffer flush interval for batchable data")
+    cmd.Flags().UintVarP(&opts.BatchSize, "batch-size", "b", 10_000, "Batch size to aim for")
     cmd.Flags().StringVarP(&opts.contentType, "content-type", "t", "", "Content type of the data to ingest (will auto-detect if not set, must be set if content encoding is set and content type is not identity)")
     cmd.Flags().StringVarP(&opts.contentEncoding, "content-encoding", "e", axiom.Identity.String(), "Content encoding of the data to ingest")
     cmd.Flags().StringSliceVarP(&opts.labels, "label", "l", nil, "Labels to attach to the ingested events, server side")
@@ -212,9 +221,11 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
     _ = cmd.RegisterFlagCompletionFunc("timestamp-format", cmdutil.NoCompletion)
     _ = cmd.RegisterFlagCompletionFunc("delimiter", cmdutil.NoCompletion)
     _ = cmd.RegisterFlagCompletionFunc("flush-every", cmdutil.NoCompletion)
+    _ = cmd.RegisterFlagCompletionFunc("batch-size", cmdutil.NoCompletion)
     _ = cmd.RegisterFlagCompletionFunc("content-type", contentTypeCompletion)
     _ = cmd.RegisterFlagCompletionFunc("content-encoding", contentEncodingCompletion)
     _ = cmd.RegisterFlagCompletionFunc("label", cmdutil.NoCompletion)
+    _ = cmd.RegisterFlagCompletionFunc("csv-fields", cmdutil.NoCompletion)
     _ = cmd.RegisterFlagCompletionFunc("continue-on-error", cmdutil.NoCompletion)

     if opts.IO.IsStdinTTY() {
@@ -265,7 +276,7 @@ func complete(ctx context.Context, opts *options) error {
         }, &opts.Dataset, opts.IO.SurveyIO())
     }

-func run(ctx context.Context, opts *options, flushEverySet, csvFieldsSet bool) error {
+func run(ctx context.Context, opts *options, flushEverySet, batchSizeSet, csvFieldsSet bool) error {
     client, err := opts.Client(ctx)
     if err != nil {
         return err
@@ -305,20 +316,23 @@ func run(ctx context.Context, opts *options, flushEverySet, csvFieldsSet bool) e
         typ = opts.ContentType
     }

-    if flushEverySet && typ != axiom.NDJSON {
-        return cmdutil.NewFlagErrorf("--flush-every not valid when content type is not newline delimited JSON")
-    }
     if opts.Delimiter != "" && typ != axiom.CSV {
         return cmdutil.NewFlagErrorf("--delimier/-d not valid when content type is not CSV")
     }

     var (
-        batchable = typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)
+        batchable = (typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)) &&
+            opts.ContentEncoding == axiom.Identity
         ingestRes *ingest.Status
     )
-    if filename == "stdin" && batchable && opts.ContentEncoding == axiom.Identity {
+    if batchable {
         ingestRes, err = ingestEvery(ctx, client, r, typ, opts)
     } else {
+        if flushEverySet {
+            return cmdutil.NewFlagErrorf("--flush-every not valid when data is not batchable")
+        } else if batchSizeSet {
+            return cmdutil.NewFlagErrorf("--batch-size not valid when data is not batchable")
+        }
         ingestRes, err = ingestReader(ctx, client, r, typ, opts)
     }

@@ -375,17 +389,16 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axi
     defer t.Stop()

     readers := make(chan io.Reader)
-
     go func() {
         defer close(readers)

         // Add first reader.
         pr, pw := io.Pipe()
         readers <- pr

-        // Start with a 64 byte buffer, check up until 1 MB per line.
+        // Start with a 1 KB buffer, check up until 1 MB per line.
         scanner := bufio.NewScanner(r)
-        scanner.Buffer(make([]byte, 64), 1024*1024)
+        scanner.Buffer(make([]byte, 1024), 1024*1024)
         scanner.Split(splitLinesMulti)

         // We need to scan in a go func to make sure we don't block on
@@ -413,23 +426,35 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axi
         }
     }()

+    var lineCount uint
+    flushBatch := func() {
+        if err := pw.Close(); err != nil {
+            return
+        }
+
+        pr, pw = io.Pipe()
+        readers <- pr
+
+        lineCount = 0
+        t.Reset(opts.FlushEvery)
+    }
     for {
         select {
         case <-ctx.Done():
             _ = pw.CloseWithError(ctx.Err())
             return
         case <-t.C:
-            if err := pw.Close(); err != nil {
-                return
+            flushBatch()
+        case line := <-lines:
+            if lineCount >= opts.BatchSize {
+                flushBatch()
             }

-            pr, pw = io.Pipe()
-            readers <- pr
-        case line := <-lines:
             if _, err := pw.Write(line); err != nil {
                 _ = pw.CloseWithError(err)
                 return
             }
+            lineCount++
         case <-done:
             _ = pw.Close()
             return
@@ -480,7 +505,7 @@ func ingestReader(ctx context.Context, client *axiom.Client, r io.Reader, typ ax
     ingestOptions = append(ingestOptions, opts.Labels...)
     ingestOptions = append(ingestOptions, opts.CSVFields...)

-    res, err := client.Datasets.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
+    res, err := client.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
    if err != nil {
        return nil, err
    }
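The heart of the change is the rewritten select loop in ingestEvery: batches are no longer cut only by the flush timer, but also whenever the configured batch size is reached, and the flushBatch closure resets both the line count and the timer so the two triggers stay in sync. Below is a minimal, runnable sketch of the same pattern under simplified assumptions (no context cancellation, no custom split function, illustrative names such as batchSize and flushEvery); it is not the CLI's exact code:

package main

import (
    "bufio"
    "fmt"
    "io"
    "strings"
    "time"
)

func main() {
    const batchSize = 3
    const flushEvery = 5 * time.Second

    input := strings.NewReader("a\nb\nc\nd\ne\n")

    // Each reader delivered on this channel is one batch.
    readers := make(chan io.Reader)

    // Consume batches as they become readable. In the real command each
    // reader is handed to the ingest client instead of being printed.
    done := make(chan struct{})
    go func() {
        defer close(done)
        for r := range readers {
            b, _ := io.ReadAll(r)
            fmt.Printf("batch: %q\n", b)
        }
    }()

    // First batch.
    pr, pw := io.Pipe()
    readers <- pr

    // Scan lines in the background so the select loop below never
    // blocks on input.
    lines := make(chan []byte)
    go func() {
        defer close(lines)
        scanner := bufio.NewScanner(input)
        for scanner.Scan() {
            // Copy the token: the scanner reuses its internal buffer.
            line := append([]byte(nil), scanner.Bytes()...)
            lines <- append(line, '\n')
        }
    }()

    t := time.NewTimer(flushEvery)
    defer t.Stop()

    // flushBatch closes the current batch and opens the next one,
    // resetting both flush triggers (line count and timer).
    var lineCount int
    flushBatch := func() {
        _ = pw.Close()
        pr, pw = io.Pipe()
        readers <- pr
        lineCount = 0
        t.Reset(flushEvery)
    }

    for {
        select {
        case <-t.C:
            // Time-based flush: ship whatever accumulated so far.
            flushBatch()
        case line, ok := <-lines:
            if !ok { // input exhausted: final flush and shut down
                _ = pw.Close()
                close(readers)
                <-done
                return
            }
            if lineCount >= batchSize {
                // Size-based flush: the batch is full.
                flushBatch()
            }
            if _, err := pw.Write(line); err != nil {
                return
            }
            lineCount++
        }
    }
}

With batchSize = 3, the sketch prints "a\nb\nc\n" as the first batch and "d\ne\n" as the second, which is exactly the size-then-remainder behavior the commit adds on top of the existing timer-only flushing.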

internal/cmd/query/query.go
Lines changed: 1 addition & 1 deletion

@@ -154,7 +154,7 @@ func run(ctx context.Context, opts *options) error {
     progStop := opts.IO.StartActivityIndicator()
     defer progStop()

-    res, err := client.Datasets.Query(ctx, opts.Query,
+    res, err := client.Query(ctx, opts.Query,
         query.SetStartTime(opts.startTime),
         query.SetEndTime(opts.endTime),
     )
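For context, axiom-go has flattened its client surface: the query and ingest methods that used to hang off client.Datasets now live directly on the client, which is why this commit only moves call sites. A hedged usage sketch of the new entry point, assuming the token is configured via the environment; the APL expression and time range are made up for illustration:

package main

import (
    "context"
    "log"
    "time"

    "github.com/axiomhq/axiom-go/axiom"
    "github.com/axiomhq/axiom-go/axiom/query"
)

func main() {
    // NewClient picks up AXIOM_TOKEN (and friends) from the
    // environment.
    client, err := axiom.NewClient()
    if err != nil {
        log.Fatal(err)
    }

    // Query now lives on the client root, taking an APL expression
    // plus functional options for the time range.
    res, err := client.Query(context.Background(),
        "['my-dataset'] | count",
        query.SetStartTime(time.Now().Add(-time.Hour)),
        query.SetEndTime(time.Now()),
    )
    if err != nil {
        log.Fatal(err)
    }
    _ = res // inspect the returned result
}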

internal/cmd/stream/stream.go
Lines changed: 1 addition & 1 deletion

@@ -138,7 +138,7 @@ func run(ctx context.Context, opts *options) error {
     for {
         queryCtx, queryCancel := context.WithTimeout(ctx, streamingDuration)

-        res, err := client.Datasets.QueryLegacy(queryCtx, opts.Dataset, querylegacy.Query{
+        res, err := client.QueryLegacy(queryCtx, opts.Dataset, querylegacy.Query{
            StartTime: lastRequest,
            EndTime:   time.Now(),
        }, querylegacy.Options{
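The stream command keeps its polling shape; only the call site moves. For clarity, a compact sketch of that polling pattern: each iteration queries the window since the last request under a per-request timeout, then advances the cursor. The fetch helper is a hypothetical stand-in for client.QueryLegacy:

package main

import (
    "context"
    "fmt"
    "time"
)

const streamingDuration = 30 * time.Second

// fetch stands in for the legacy query call; it would return the
// events between start and end.
func fetch(ctx context.Context, start, end time.Time) (int, error) {
    return 0, nil // stand-in
}

func main() {
    ctx := context.Background()
    lastRequest := time.Now().Add(-time.Second)

    for i := 0; i < 3; i++ { // the real command loops until cancelled
        // Bound each poll with its own timeout so one slow request
        // cannot stall the stream.
        queryCtx, queryCancel := context.WithTimeout(ctx, streamingDuration)
        n, err := fetch(queryCtx, lastRequest, time.Now())
        queryCancel()
        if err != nil {
            fmt.Println("query failed:", err)
            return
        }
        fmt.Printf("got %d new events\n", n)

        // Advance the cursor so the next poll only sees new events.
        lastRequest = time.Now()
        time.Sleep(time.Second)
    }
}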
