diff --git a/bulk_indexer.go b/bulk_indexer.go index 80044ba..cf1e34d 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -279,13 +279,14 @@ func (b *BulkIndexer) BytesUncompressedFlushed() int { type BulkIndexerItem struct { Index string DocumentID string + Pipeline string Body io.WriterTo DynamicTemplates map[string]string } // Add encodes an item in the buffer. func (b *BulkIndexer) Add(item BulkIndexerItem) error { - b.writeMeta(item.Index, item.DocumentID, item.DynamicTemplates) + b.writeMeta(item.Index, item.DocumentID, item.Pipeline, item.DynamicTemplates) if _, err := item.Body.WriteTo(b.writer); err != nil { return fmt.Errorf("failed to write bulk indexer item: %w", err) } @@ -296,7 +297,7 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error { return nil } -func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[string]string) { +func (b *BulkIndexer) writeMeta(index, documentID, pipeline string, dynamicTemplates map[string]string) { b.jsonw.RawString(`{"create":{`) first := true if documentID != "" { @@ -312,6 +313,14 @@ func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[s b.jsonw.String(index) first = false } + if pipeline != "" { + if !first { + b.jsonw.RawByte(',') + } + b.jsonw.RawString(`"pipeline":`) + b.jsonw.String(pipeline) + first = false + } if len(dynamicTemplates) > 0 { if !first { b.jsonw.RawByte(',') diff --git a/bulk_indexer_test.go b/bulk_indexer_test.go index d8497cc..7c52483 100644 --- a/bulk_indexer_test.go +++ b/bulk_indexer_test.go @@ -165,3 +165,41 @@ func TestDynamicTemplates(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), stat.Indexed) } + +func TestPipeline(t *testing.T) { + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + _, result, _, _, pipelines := docappendertest.DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r) + err := json.NewEncoder(w).Encode(result) + require.NoError(t, err) + for _, p := range pipelines { + require.Contains(t, p, "test-pipeline", "test-pipeline should have been present") + } + require.Equal(t, 2, len(pipelines), "2 pipelines should have been returned") + }) + indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ + Client: client, + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Pipeline: "test-pipeline1", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Pipeline: "test-pipeline2", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + require.NoError(t, err) + + stat, err := indexer.Flush(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(2), stat.Indexed) +} diff --git a/docappendertest/docappendertest.go b/docappendertest/docappendertest.go index ee224cb..39fb6fe 100644 --- a/docappendertest/docappendertest.go +++ b/docappendertest/docappendertest.go @@ -67,6 +67,19 @@ func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) ( res esutil.BulkIndexerResponse, stats RequestStats, dynamicTemplates []map[string]string) { + + indexed, result, stats, dynamicTemplates, _ := DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r) + return indexed, result, stats, dynamicTemplates +} + +// DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines decodes a /_bulk request's body, +// returning the decoded documents and a response body and stats about request, per-request dynamic templates and pipelines specified in the event. +func DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r *http.Request) ( + docs [][]byte, + res esutil.BulkIndexerResponse, + stats RequestStats, + dynamicTemplates []map[string]string, + pipelines []string) { body := r.Body switch r.Header.Get("Content-Encoding") { case "gzip": @@ -89,6 +102,7 @@ func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) ( action := make(map[string]struct { Index string `json:"_index"` DynamicTemplates map[string]string `json:"dynamic_templates"` + Pipeline string `json:"pipeline"` }) if err := json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&action); err != nil { panic(err) @@ -109,8 +123,9 @@ func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) ( item := esutil.BulkIndexerResponseItem{Status: http.StatusCreated, Index: action[actionType].Index} result.Items = append(result.Items, map[string]esutil.BulkIndexerResponseItem{actionType: item}) dynamicTemplates = append(dynamicTemplates, action[actionType].DynamicTemplates) + pipelines = append(pipelines, action[actionType].Pipeline) } - return indexed, result, RequestStats{int64(cr.bytesRead)}, dynamicTemplates + return indexed, result, RequestStats{int64(cr.bytesRead)}, dynamicTemplates, pipelines } // NewMockElasticsearchClient returns an elasticsearch.Client which sends /_bulk requests to bulkHandler.