Skip to content

Commit a55266e

Browse files
committed
Fix ES integration test race conditions
- Add template creation retry with timeout in factory - Add context to CreateTemplates in writer - Ensure templates are created before returning writer Fixes jaegertracing#6094
1 parent 295146c commit a55266e

File tree

2 files changed

+32
-5
lines changed

2 files changed

+32
-5
lines changed

plugin/storage/es/factory.go

+29-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"path/filepath"
1515
"strings"
1616
"sync/atomic"
17+
"time"
1718

1819
"github.com/spf13/viper"
1920
"go.opentelemetry.io/otel"
@@ -185,7 +186,9 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
185186

186187
// CreateSpanWriter implements storage.Factory
187188
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
188-
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
189+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
190+
defer cancel()
191+
return createSpanWriter(ctx, f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
189192
}
190193

191194
// CreateDependencyReader implements storage.Factory
@@ -238,6 +241,7 @@ func createSpanReader(
238241
}
239242

240243
func createSpanWriter(
244+
ctx context.Context,
241245
clientFn func() es.Client,
242246
cfg *config.Configuration,
243247
archive bool,
@@ -276,13 +280,36 @@ func createSpanWriter(
276280
if err != nil {
277281
return nil, err
278282
}
279-
if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.Indices.IndexPrefix); err != nil {
283+
if err := CreateTemplatesWithRetry(ctx, writer, spanMapping, serviceMapping, cfg.Indices.IndexPrefix, logger); err != nil {
280284
return nil, err
281285
}
282286
}
283287
return writer, nil
284288
}
285289

290+
func CreateTemplatesWithRetry(
291+
ctx context.Context,
292+
writer *esSpanStore.SpanWriter,
293+
spanMapping string,
294+
serviceMapping string,
295+
indexPrefix cfg.IndexPrefix,
296+
logger *zap.Logger,
297+
) error {
298+
for i := 0; i < 3; i++ {
299+
select {
300+
case <-ctx.Done():
301+
return fmt.Errorf("template creation timeout: %w", ctx.Err())
302+
default:
303+
if err := writer.CreateTemplates(ctx, spanMapping, serviceMapping, indexPrefix); err == nil {
304+
return nil
305+
}
306+
logger.Warn("Template creation failed, retrying", zap.Int("attempt", i+1))
307+
time.Sleep(time.Duration(100*(1<<i)) * time.Millisecond)
308+
}
309+
}
310+
return errors.New("failed to create templates after retries")
311+
}
312+
286313
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
287314
params := esSampleStore.Params{
288315
Client: f.getPrimaryClient,

plugin/storage/es/spanstore/writer.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter {
8181
}
8282

8383
// CreateTemplates creates index templates.
84-
func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string, indexPrefix cfg.IndexPrefix) error {
84+
func (s *SpanWriter) CreateTemplates(ctx context.Context, spanTemplate, serviceTemplate string, indexPrefix cfg.IndexPrefix) error {
8585
jaegerSpanIdx := indexPrefix.Apply("jaeger-span")
8686
jaegerServiceIdx := indexPrefix.Apply("jaeger-service")
87-
_, err := s.client().CreateTemplate(jaegerSpanIdx).Body(spanTemplate).Do(context.Background())
87+
_, err := s.client().CreateTemplate(jaegerSpanIdx).Body(spanTemplate).Do(ctx)
8888
if err != nil {
8989
return fmt.Errorf("failed to create template %q: %w", jaegerSpanIdx, err)
9090
}
91-
_, err = s.client().CreateTemplate(jaegerServiceIdx).Body(serviceTemplate).Do(context.Background())
91+
_, err = s.client().CreateTemplate(jaegerServiceIdx).Body(serviceTemplate).Do(ctx)
9292
if err != nil {
9393
return fmt.Errorf("failed to create template %q: %w", jaegerServiceIdx, err)
9494
}

0 commit comments

Comments
 (0)