feat(ingest): support setting fields for csv without header row + csv batching #221

Merged
merged 2 commits on Aug 20, 2024
3 changes: 3 additions & 0 deletions .golangci.yaml
@@ -57,6 +57,9 @@ issues:
- linters:
- staticcheck
text: "SA1019: client.Datasets.QueryLegacy"
- linters:
- staticcheck
text: "SA1019: client.QueryLegacy"
- linters:
- staticcheck
text: 'SA1019: "github.com/axiomhq/axiom-go/axiom/querylegacy"'
90 changes: 67 additions & 23 deletions internal/cmd/ingest/ingest.go
@@ -53,9 +53,11 @@ type options struct {
// Delimiter that separates CSV fields.
Delimiter string
// FlushEvery flushes the ingestion buffer after the specified duration. It
// is only valid when ingesting a stream of newline delimited JSON objects
// of unknown length.
// is only valid when ingesting batchable data, e.g. newline delimited JSON
// and CSV (with field names explicitly set) data that is not encoded.
FlushEvery time.Duration
// BatchSize to aim for when ingesting batchable data.
BatchSize uint
// ContentType of the data to ingest.
ContentType axiom.ContentType
contentType string // for the flag value
@@ -65,6 +67,10 @@ type options struct {
// Labels attached to every event, server-side.
Labels []ingest.Option
labels []string // for the flag value
// CSVFields are the field names for the CSV data. This is handy if the data
// to ingest does not have a header row.
CSVFields []ingest.Option
csvFields []string
// ContinueOnError will continue ingesting, even if an error is returned
// from the server.
ContinueOnError bool
@@ -77,7 +83,7 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
}

cmd := &cobra.Command{
Use: "ingest <dataset-name> [(-f|--file) <filename> [ ...]] [--timestamp-field <timestamp-field>] [--timestamp-format <timestamp-format>] [--flush-every <duration>] [(-t|--content-type <content-type>] [(-e|--content-encoding <content-encoding>] [(-l|--label) <key>:<value> [ ...]]",
Use: "ingest <dataset-name> [(-f|--file) <filename> [ ...]] [--timestamp-field <timestamp-field>] [--timestamp-format <timestamp-format>] [(-d|--delimiter <delimiter>] [--flush-every <duration>] [(-b|--batch-size <batch-size>] [(-t|--content-type <content-type>] [(-e|--content-encoding <content-encoding>] [(-l|--label) <key>:<value> [ ...]] [--csv-fields <field> [ ...]] [--continue-on-error <TRUE|FALSE>]",
Short: "Ingest structured data",
Long: heredoc.Doc(`
Ingest structured data into an Axiom dataset.
@@ -135,6 +141,11 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
# to every event, so there is no need to add them to the data,
# locally:
$ cat log*.json.gz | axiom ingest http-logs -t=json -e=gzip -l=env:prod -l=app:webserver

# Send a CSV file to a dataset called "sec-logs". The CSV file does
# not have a header row, so the field names are set manually. This
# also comes in handy as the file is now automatically batched.
$ axiom ingest sec-logs -f sec-logs.csv -t=csv --csv-fields=timestamp,source,severity,message
`),

Annotations: map[string]string{
@@ -176,30 +187,45 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
opts.Labels = append(opts.Labels, ingest.SetEventLabel(splits[0], splits[1]))
}

// Populate the CSV fields.
for _, field := range opts.csvFields {
opts.CSVFields = append(opts.CSVFields, ingest.AddCSVField(field))
}

if err := complete(cmd.Context(), opts); err != nil {
return err
}
return run(cmd.Context(), opts, cmd.Flag("flush-every").Changed)
return run(
cmd.Context(),
opts,
cmd.Flag("flush-every").Changed,
cmd.Flag("batch-size").Changed,
cmd.Flag("csv-fields").Changed,
)
},
}

cmd.Flags().StringSliceVarP(&opts.Filenames, "file", "f", nil, "File(s) to ingest (- to read from stdin). If stdin is a pipe the default value is -, otherwise this is a required parameter")
cmd.Flags().StringVar(&opts.TimestampField, "timestamp-field", "", "Field to take the ingestion time from (defaults to _time)")
cmd.Flags().StringVar(&opts.TimestampFormat, "timestamp-format", "", "Format used in the timestamp field. Default uses a heuristic parser. Must be expressed using the reference time 'Mon Jan 2 15:04:05 -0700 MST 2006'")
cmd.Flags().StringVarP(&opts.Delimiter, "delimiter", "d", "", "Delimiter that separates CSV fields (only valid when input is CSV)")
cmd.Flags().DurationVar(&opts.FlushEvery, "flush-every", time.Second, "Buffer flush interval for newline delimited JSON streams of unknown length")
cmd.Flags().DurationVar(&opts.FlushEvery, "flush-every", time.Second*5, "Buffer flush interval for batchable data")
cmd.Flags().UintVarP(&opts.BatchSize, "batch-size", "b", 10_000, "Batch size to aim for")
cmd.Flags().StringVarP(&opts.contentType, "content-type", "t", "", "Content type of the data to ingest (will auto-detect if not set, must be set if content encoding is set and content type is not identity)")
cmd.Flags().StringVarP(&opts.contentEncoding, "content-encoding", "e", axiom.Identity.String(), "Content encoding of the data to ingest")
cmd.Flags().StringSliceVarP(&opts.labels, "label", "l", nil, "Labels to attach to the ingested events, server side")
cmd.Flags().StringSliceVar(&opts.csvFields, "csv-fields", nil, "CSV header fields to use as event field names, server side (e.g. if there is no header row)")
cmd.Flags().BoolVar(&opts.ContinueOnError, "continue-on-error", false, "Don't fail on ingest errors (use with care!)")

_ = cmd.RegisterFlagCompletionFunc("timestamp-field", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("timestamp-format", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("delimiter", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("flush-every", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("batch-size", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("content-type", contentTypeCompletion)
_ = cmd.RegisterFlagCompletionFunc("content-encoding", contentEncodingCompletion)
_ = cmd.RegisterFlagCompletionFunc("label", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("csv-fields", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("continue-on-error", cmdutil.NoCompletion)

if opts.IO.IsStdinTTY() {
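Note that run receives cmd.Flag("flush-every").Changed, cmd.Flag("batch-size").Changed and cmd.Flag("csv-fields").Changed so it can tell an explicitly set flag apart from its default value and reject it for non-batchable input. A minimal, self-contained sketch of that cobra pattern (the demo command and its output are hypothetical, only the Changed mechanism is the point):

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var batchSize uint

	cmd := &cobra.Command{
		Use: "demo",
		RunE: func(cmd *cobra.Command, _ []string) error {
			// Changed is true only when the user passed the flag explicitly,
			// so the default of 10_000 never triggers validation errors.
			fmt.Println("batch size:", batchSize, "set explicitly:", cmd.Flag("batch-size").Changed)
			return nil
		},
	}
	cmd.Flags().UintVarP(&batchSize, "batch-size", "b", 10_000, "Batch size to aim for")

	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}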
@@ -250,7 +276,7 @@ func complete(ctx context.Context, opts *options) error {
}, &opts.Dataset, opts.IO.SurveyIO())
}

func run(ctx context.Context, opts *options, flushEverySet bool) error {
func run(ctx context.Context, opts *options, flushEverySet, batchSizeSet, csvFieldsSet bool) error {
client, err := opts.Client(ctx)
if err != nil {
return err
@@ -290,17 +316,23 @@ func run(ctx context.Context, opts *options, flushEverySet bool) error {
typ = opts.ContentType
}

if flushEverySet && typ != axiom.NDJSON {
return cmdutil.NewFlagErrorf("--flush-every not valid when content type is not newline delimited JSON")
}
if opts.Delimiter != "" && typ != axiom.CSV {
return cmdutil.NewFlagErrorf("--delimier/-d not valid when content type is not CSV")
}

var ingestRes *ingest.Status
if filename == "stdin" && typ == axiom.NDJSON && opts.ContentEncoding == axiom.Identity {
ingestRes, err = ingestEvery(ctx, client, r, opts)
var (
batchable = (typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)) &&
opts.ContentEncoding == axiom.Identity
ingestRes *ingest.Status
)
if batchable {
ingestRes, err = ingestEvery(ctx, client, r, typ, opts)
} else {
if flushEverySet {
return cmdutil.NewFlagErrorf("--flush-every not valid when data is not batchable")
} else if batchSizeSet {
return cmdutil.NewFlagErrorf("--batch-size not valid when data is not batchable")
}
ingestRes, err = ingestReader(ctx, client, r, typ, opts)
}
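For reference, the batchability rule this hunk introduces can be stated as a small predicate. A sketch assuming the axiom-go content type and encoding constants used above (batchable is a hypothetical helper, not part of this change):

package main

import (
	"fmt"

	"github.com/axiomhq/axiom-go/axiom"
)

// batchable reports whether input can be split into time- or size-bounded
// batches: newline delimited JSON always can, CSV only when field names are
// set explicitly (so every batch carries the same schema), and compressed
// input never can because it cannot be cut on line boundaries.
func batchable(typ axiom.ContentType, enc axiom.ContentEncoding, csvFieldsSet bool) bool {
	if enc != axiom.Identity {
		return false
	}
	return typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)
}

func main() {
	fmt.Println(batchable(axiom.NDJSON, axiom.Identity, false)) // true
	fmt.Println(batchable(axiom.CSV, axiom.Identity, false))    // false: no field names set
	fmt.Println(batchable(axiom.CSV, axiom.Identity, true))     // true
	fmt.Println(batchable(axiom.CSV, axiom.Gzip, true))         // false: encoded input
}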

@@ -352,22 +384,21 @@ func run(ctx context.Context, opts *options, flushEverySet bool) error {
return lastErr
}

func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, opts *options) (*ingest.Status, error) {
func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axiom.ContentType, opts *options) (*ingest.Status, error) {
t := time.NewTicker(opts.FlushEvery)
defer t.Stop()

readers := make(chan io.Reader)

go func() {
defer close(readers)

// Add first reader.
pr, pw := io.Pipe()
readers <- pr

// Start with a 64 byte buffer, check up until 1 MB per line.
// Start with a 1 KB buffer, check up until 1 MB per line.
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 64), 1024*1024)
scanner.Buffer(make([]byte, 1024), 1024*1024)
scanner.Split(splitLinesMulti)

// We need to scan in a go func to make sure we don't block on
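The scanner change above only adjusts buffer sizing: Scanner.Buffer takes an initial buffer and a maximum per-line size, and longer lines make Scan stop with bufio.ErrTooLong. A small standalone illustration reading stdin, with the sizes copied from the diff:

package main

import (
	"bufio"
	"errors"
	"fmt"
	"os"
)

func main() {
	sc := bufio.NewScanner(os.Stdin)
	// Start with a 1 KB buffer; the scanner grows it as needed up to 1 MB
	// per line, after which Scan fails with bufio.ErrTooLong.
	sc.Buffer(make([]byte, 1024), 1024*1024)
	for sc.Scan() {
		fmt.Println("line bytes:", len(sc.Bytes()))
	}
	if err := sc.Err(); errors.Is(err, bufio.ErrTooLong) {
		fmt.Fprintln(os.Stderr, "line exceeded 1 MB")
	} else if err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}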
@@ -396,23 +427,35 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, opts *o
}
}()

var lineCount uint
flushBatch := func() {
if err := pw.Close(); err != nil {
return
}

pr, pw = io.Pipe()
readers <- pr

lineCount = 0
t.Reset(opts.FlushEvery)
}
for {
select {
case <-ctx.Done():
_ = pw.CloseWithError(ctx.Err())
return
case <-t.C:
if err := pw.Close(); err != nil {
return
flushBatch()
case line := <-lines:
if lineCount >= opts.BatchSize {
flushBatch()
}

pr, pw = io.Pipe()
readers <- pr
case line := <-lines:
if _, err := pw.Write(line); err != nil {
_ = pw.CloseWithError(err)
return
}
lineCount++
case <-done:
_ = pw.Close()
return
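The select loop above cuts a batch either when the ticker fires or when lineCount reaches --batch-size, whichever comes first. A reduced sketch of that pattern with hypothetical names (it collects lines into a slice instead of streaming through an io.Pipe, and only emits non-empty batches):

package main

import (
	"fmt"
	"time"
)

// batchLines groups incoming lines into batches that are emitted either when
// flushEvery elapses or when batchSize lines have accumulated.
func batchLines(lines <-chan []byte, flushEvery time.Duration, batchSize int, emit func([][]byte)) {
	t := time.NewTicker(flushEvery)
	defer t.Stop()

	var batch [][]byte
	flush := func() {
		if len(batch) > 0 {
			emit(batch)
			batch = nil
		}
		t.Reset(flushEvery)
	}

	for {
		select {
		case <-t.C:
			flush()
		case line, ok := <-lines:
			if !ok { // input finished, emit what is left
				flush()
				return
			}
			batch = append(batch, line)
			if len(batch) >= batchSize {
				flush()
			}
		}
	}
}

func main() {
	lines := make(chan []byte)
	go func() {
		defer close(lines)
		for i := 0; i < 25; i++ {
			lines <- []byte(fmt.Sprintf(`{"i":%d}`+"\n", i))
		}
	}()
	batchLines(lines, time.Second, 10, func(b [][]byte) {
		fmt.Println("flushing", len(b), "lines")
	})
}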
@@ -424,7 +467,7 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, opts *o

var res ingest.Status
for r := range readers {
ingestRes, err := ingestReader(ctx, client, r, axiom.NDJSON, opts)
ingestRes, err := ingestReader(ctx, client, r, typ, opts)
if err != nil {
if opts.ContinueOnError {
fmt.Fprintf(opts.IO.ErrOut(), "%s Failed to ingest: %v, continuing...\n",
@@ -463,8 +506,9 @@ func ingestReader(ctx context.Context, client *axiom.Client, r io.Reader, typ ax
ingestOptions = append(ingestOptions, ingest.SetCSVDelimiter(v))
}
ingestOptions = append(ingestOptions, opts.Labels...)
ingestOptions = append(ingestOptions, opts.CSVFields...)

res, err := client.Datasets.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
res, err := client.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
if err != nil {
return nil, err
}
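For comparison, the same server-side CSV field handling is available straight from axiom-go. A minimal sketch assuming the API used in this diff (client.Ingest and ingest.AddCSVField) and a hypothetical header-less file sec-logs.csv:

package main

import (
	"context"
	"log"
	"os"

	"github.com/axiomhq/axiom-go/axiom"
	"github.com/axiomhq/axiom-go/axiom/ingest"
)

func main() {
	// Credentials are read from the environment (AXIOM_TOKEN, and AXIOM_ORG_ID
	// where applicable) by axiom.NewClient.
	client, err := axiom.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	f, err := os.Open("sec-logs.csv") // hypothetical file without a header row
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Field names are supplied explicitly because the file has no header row.
	res, err := client.Ingest(context.Background(), "sec-logs", f, axiom.CSV, axiom.Identity,
		ingest.AddCSVField("timestamp"),
		ingest.AddCSVField("source"),
		ingest.AddCSVField("severity"),
		ingest.AddCSVField("message"),
	)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("ingested %d events", res.Ingested)
}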
2 changes: 1 addition & 1 deletion internal/cmd/query/query.go
@@ -154,7 +154,7 @@ func run(ctx context.Context, opts *options) error {
progStop := opts.IO.StartActivityIndicator()
defer progStop()

res, err := client.Datasets.Query(ctx, opts.Query,
res, err := client.Query(ctx, opts.Query,
query.SetStartTime(opts.startTime),
query.SetEndTime(opts.endTime),
)
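The query command only follows the same client API flattening (client.Datasets.Query becomes client.Query). A minimal APL query sketch assuming that method and a hypothetical "http-logs" dataset:

package main

import (
	"context"
	"log"
	"time"

	"github.com/axiomhq/axiom-go/axiom"
	"github.com/axiomhq/axiom-go/axiom/query"
)

func main() {
	client, err := axiom.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	// Query the last hour of the hypothetical "http-logs" dataset.
	res, err := client.Query(context.Background(), "['http-logs'] | count",
		query.SetStartTime(time.Now().Add(-time.Hour)),
		query.SetEndTime(time.Now()),
	)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("query result: %+v", res)
}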