@@ -162,8 +162,7 @@ func TestExporter_PushEvent(t *testing.T) {
162162 })
163163
164164 exporter := newTestExporter (t , server .URL )
165- mustSend (t , exporter , `{"message": "test1"}` )
166- mustSend (t , exporter , `{"message": "test2"}` )
165+ mustSend (t , exporter , `{"message": "test1"}` , `{"message": "test2"}` )
167166
168167 rec .WaitItems (2 )
169168 })
@@ -392,7 +391,6 @@ func TestExporter_PushEvent(t *testing.T) {
392391 exporter := newTestExporter (t , server .URL , func (cfg * Config ) { * cfg = * testConfig })
393392 mustSend (t , exporter , `{"message": "test1"}` )
394393
395- time .Sleep (200 * time .Millisecond )
396394 assert .Equal (t , int64 (1 ), attempts .Load ())
397395 })
398396 }
@@ -408,9 +406,9 @@ func TestExporter_PushEvent(t *testing.T) {
408406 })
409407
410408 exporter := newTestExporter (t , server .URL )
411- mustSend (t , exporter , `{"message": "test1"}` )
409+ err := send (t , exporter , `{"message": "test1"}` )
410+ assert .ErrorContains (t , err , "flush failed" )
412411
413- time .Sleep (200 * time .Millisecond )
414412 assert .Equal (t , int64 (1 ), attempts .Load ())
415413 })
416414
@@ -444,7 +442,6 @@ func TestExporter_PushEvent(t *testing.T) {
444442 exporter := newTestExporter (t , server .URL )
445443 mustSend (t , exporter , `{"message": "test1"}` )
446444
447- time .Sleep (200 * time .Millisecond )
448445 assert .Equal (t , int64 (1 ), attempts .Load ())
449446 })
450447
@@ -482,9 +479,8 @@ func TestExporter_PushEvent(t *testing.T) {
482479 cfg .Retry .InitialInterval = 1 * time .Millisecond
483480 cfg .Retry .MaxInterval = 10 * time .Millisecond
484481 })
485- mustSend (t , exporter , `{"message": "test1", "idx": 0}` )
486- mustSend (t , exporter , `{"message": "test2", "idx": 1}` )
487- mustSend (t , exporter , `{"message": "test3", "idx": 2}` )
482+ mustSend (t , exporter , `{"message": "test1", "idx": 0}` ,
483+ `{"message": "test2", "idx": 1}` , `{"message": "test3", "idx": 2}` )
488484
489485 wg .Wait () // <- this blocks forever if the event is not retried
490486
@@ -515,8 +511,22 @@ func withTestExporterConfig(fns ...func(*Config)) func(string) *Config {
515511 }
516512}
517513
518- func mustSend (t * testing.T , exporter * elasticsearchLogsExporter , contents string ) {
519- err := pushDocuments (context .TODO (), exporter .index , []byte (contents ), exporter .bulkIndexer )
514+ func send (t * testing.T , exporter * elasticsearchLogsExporter , contents ... string ) error {
515+ req := request {
516+ bulkIndexer : exporter .bulkIndexer ,
517+ Items : nil ,
518+ }
519+ for _ , body := range contents {
520+ req .Add (bulkIndexerItem {
521+ Index : exporter .index ,
522+ Body : []byte (body ),
523+ })
524+ }
525+ return req .Export (context .TODO ())
526+ }
527+
528+ func mustSend (t * testing.T , exporter * elasticsearchLogsExporter , contents ... string ) {
529+ err := send (t , exporter , contents ... )
520530 require .NoError (t , err )
521531}
522532
@@ -528,6 +538,13 @@ func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporte
528538 logRecords := scopeLog .LogRecords ().At (0 )
529539 logRecords .Body ().SetStr (body )
530540
531- err := exporter .pushLogRecord (context .TODO (), resSpans .Resource (), logRecords , scopeLog .Scope ())
541+ req := request {
542+ bulkIndexer : exporter .bulkIndexer ,
543+ Items : nil ,
544+ }
545+ item , err := exporter .logRecordToItem (context .TODO (), resSpans .Resource (), logRecords , scopeLog .Scope ())
546+ require .NoError (t , err )
547+ req .Add (item )
548+ err = req .Export (context .TODO ())
532549 require .NoError (t , err )
533550}
0 commit comments