Skip to content

Commit

Permalink
feat(ingest): support setting fields for csv without header row + csv batching
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmalkmus committed Apr 3, 2024
1 parent 073cda7 commit 553727b
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/AlecAivazis/survey/v2 v2.3.7
github.com/MakeNowJust/heredoc v1.0.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/axiomhq/axiom-go v0.17.3
github.com/axiomhq/axiom-go v0.17.4
github.com/axiomhq/pkg v0.6.0
github.com/briandowns/spinner v1.23.0
github.com/cli/cli v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/awslabs/amazon-ecr-credential-helper/ecr-login v0.0.0-20231024185945-8841054dbdb8 h1:SoFYaT9UyGkR0+nogNyD/Lj+bsixB+SNuAS4ABlEs6M=
github.com/awslabs/amazon-ecr-credential-helper/ecr-login v0.0.0-20231024185945-8841054dbdb8/go.mod h1:2JF49jcDOrLStIXN/j/K1EKRq8a8R2qRnlZA6/o/c7c=
github.com/axiomhq/axiom-go v0.17.3 h1:FklpZQneBc0BydD04GAup3J3ukDXvuWWJW71rgOdKok=
github.com/axiomhq/axiom-go v0.17.3/go.mod h1:giSWQNKS2AQr0KD0QpdqXsYHWPh1BBgNDW3jA4cTRcM=
github.com/axiomhq/axiom-go v0.17.4 h1:2abUAONJhmraTWgtjm0U8pGZ+8NpWv2nniitS1MvJzo=
github.com/axiomhq/axiom-go v0.17.4/go.mod h1:giSWQNKS2AQr0KD0QpdqXsYHWPh1BBgNDW3jA4cTRcM=
github.com/axiomhq/pkg v0.6.0 h1:elslidLCtDB+GPOx+1VzXijf+BT28+HnPUir8vSaEkA=
github.com/axiomhq/pkg v0.6.0/go.mod h1:0Vu7mQOXdsxpYYyGIm5QfHwafGboyyyoz8FIhSvA30M=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
Expand Down
33 changes: 26 additions & 7 deletions internal/cmd/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type options struct {
// Labels attached to every event, server-side.
Labels []ingest.Option
labels []string // for the flag value
// CSVFields are the field names for the CSV data. This is handy if the data
// to ingest does not have a header row.
CSVFields []ingest.Option
csvFields []string
// ContinueOnError will continue ingesting, even if an error is returned
// from the server.
ContinueOnError bool
Expand Down Expand Up @@ -135,6 +139,11 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
# to every events, so there is no need to add them to the data,
# locally:
$ cat log*.json.gz | axiom ingest http-logs -t=json -e=gzip -l=env:prod -l=app:webserver
# Send a CSV file to a dataset called "sec-logs". The CSV file does
# not have a header row, so the field names are set manually. This
# also comes in handy as the file is now automatically batched.
$ axiom ingest sec-logs -f sec-logs.csv -t=csv --csv-fields=timestamp,source,severity,message
`),

Annotations: map[string]string{
Expand Down Expand Up @@ -176,10 +185,15 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
opts.Labels = append(opts.Labels, ingest.SetEventLabel(splits[0], splits[1]))
}

// Populate the CSV fields.
for _, field := range opts.csvFields {
opts.CSVFields = append(opts.CSVFields, ingest.AddCSVField(field))
}

if err := complete(cmd.Context(), opts); err != nil {
return err
}
return run(cmd.Context(), opts, cmd.Flag("flush-every").Changed)
return run(cmd.Context(), opts, cmd.Flag("flush-every").Changed, cmd.Flag("csv-fields").Changed)
},
}

Expand All @@ -191,6 +205,7 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
cmd.Flags().StringVarP(&opts.contentType, "content-type", "t", "", "Content type of the data to ingest (will auto-detect if not set, must be set if content encoding is set and content type is not identity)")
cmd.Flags().StringVarP(&opts.contentEncoding, "content-encoding", "e", axiom.Identity.String(), "Content encoding of the data to ingest")
cmd.Flags().StringSliceVarP(&opts.labels, "label", "l", nil, "Labels to attach to the ingested events, server side")
cmd.Flags().StringSliceVar(&opts.csvFields, "csv-fields", nil, "CSV header fields to use as event field names, server side (e.g. if there is no header row)")
cmd.Flags().BoolVar(&opts.ContinueOnError, "continue-on-error", false, "Don't fail on ingest errors (use with care!)")

_ = cmd.RegisterFlagCompletionFunc("timestamp-field", cmdutil.NoCompletion)
Expand Down Expand Up @@ -250,7 +265,7 @@ func complete(ctx context.Context, opts *options) error {
}, &opts.Dataset, opts.IO.SurveyIO())
}

func run(ctx context.Context, opts *options, flushEverySet bool) error {
func run(ctx context.Context, opts *options, flushEverySet, csvFieldsSet bool) error {
client, err := opts.Client(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -297,9 +312,12 @@ func run(ctx context.Context, opts *options, flushEverySet bool) error {
return cmdutil.NewFlagErrorf("--delimier/-d not valid when content type is not CSV")
}

var ingestRes *ingest.Status
if filename == "stdin" && typ == axiom.NDJSON && opts.ContentEncoding == axiom.Identity {
ingestRes, err = ingestEvery(ctx, client, r, opts)
var (
batchable = typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)
ingestRes *ingest.Status
)
if filename == "stdin" && batchable && opts.ContentEncoding == axiom.Identity {
ingestRes, err = ingestEvery(ctx, client, r, typ, opts)
} else {
ingestRes, err = ingestReader(ctx, client, r, typ, opts)
}
Expand Down Expand Up @@ -352,7 +370,7 @@ func run(ctx context.Context, opts *options, flushEverySet bool) error {
return lastErr
}

func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, opts *options) (*ingest.Status, error) {
func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axiom.ContentType, opts *options) (*ingest.Status, error) {
t := time.NewTicker(opts.FlushEvery)
defer t.Stop()

Expand Down Expand Up @@ -421,7 +439,7 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, opts *o

var res ingest.Status
for r := range readers {
ingestRes, err := ingestReader(ctx, client, r, axiom.NDJSON, opts)
ingestRes, err := ingestReader(ctx, client, r, typ, opts)
if err != nil {
if opts.ContinueOnError {
fmt.Fprintf(opts.IO.ErrOut(), "%s Failed to ingest: %v, continuing...\n",
Expand Down Expand Up @@ -460,6 +478,7 @@ func ingestReader(ctx context.Context, client *axiom.Client, r io.Reader, typ ax
ingestOptions = append(ingestOptions, ingest.SetCSVDelimiter(v))
}
ingestOptions = append(ingestOptions, opts.Labels...)
ingestOptions = append(ingestOptions, opts.CSVFields...)

res, err := client.Datasets.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
if err != nil {
Expand Down

0 comments on commit 553727b

Please sign in to comment.