Skip to content

Commit

Permalink
Fix ES integration test race conditions
Browse files Browse the repository at this point in the history
- Add template creation retry with timeout in factory
- Add context to CreateTemplates in writer
- Ensure templates are created before returning writer

Fixes jaegertracing#6094
  • Loading branch information
madmecodes committed Nov 20, 2024
1 parent 295146c commit a19bc63
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
31 changes: 29 additions & 2 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"path/filepath"
"strings"
"sync/atomic"
"time"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -185,7 +186,9 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return createSpanWriter(ctx, f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
Expand Down Expand Up @@ -238,6 +241,7 @@ func createSpanReader(
}

func createSpanWriter(
ctx context.Context,
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
Expand Down Expand Up @@ -276,13 +280,36 @@ func createSpanWriter(
if err != nil {
return nil, err
}
if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.Indices.IndexPrefix); err != nil {
if err := CreateTemplatesWithRetry(ctx, writer, spanMapping, serviceMapping, cfg.Indices.IndexPrefix, logger); err != nil {
return nil, err
}
}
return writer, nil
}

func CreateTemplatesWithRetry(
ctx context.Context,
writer *esSpanStore.SpanWriter,
spanMapping string,
serviceMapping string,
indexPrefix cfg.IndexPrefix,
logger *zap.Logger,
) error {
for i := 0; i < 3; i++ {
select {
case <-ctx.Done():
return fmt.Errorf("template creation timeout: %w", ctx.Err())
default:
if err := writer.CreateTemplates(ctx, spanMapping, serviceMapping, indexPrefix); err == nil {
return nil
}
logger.Warn("Template creation failed, retrying", zap.Int("attempt", i+1))
time.Sleep(time.Duration(100*(1<<i)) * time.Millisecond)
}
}
return errors.New("failed to create templates after retries")
}

func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
params := esSampleStore.Params{
Client: f.getPrimaryClient,
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter {
}

// CreateTemplates creates index templates.
func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string, indexPrefix cfg.IndexPrefix) error {
func (s *SpanWriter) CreateTemplates(ctx context.Context, spanTemplate, serviceTemplate string, indexPrefix cfg.IndexPrefix) error {
jaegerSpanIdx := indexPrefix.Apply("jaeger-span")
jaegerServiceIdx := indexPrefix.Apply("jaeger-service")
_, err := s.client().CreateTemplate(jaegerSpanIdx).Body(spanTemplate).Do(context.Background())
_, err := s.client().CreateTemplate(jaegerSpanIdx).Body(spanTemplate).Do(ctx)
if err != nil {
return fmt.Errorf("failed to create template %q: %w", jaegerSpanIdx, err)
}
_, err = s.client().CreateTemplate(jaegerServiceIdx).Body(serviceTemplate).Do(context.Background())
_, err = s.client().CreateTemplate(jaegerServiceIdx).Body(serviceTemplate).Do(ctx)
if err != nil {
return fmt.Errorf("failed to create template %q: %w", jaegerServiceIdx, err)
}
Expand Down

0 comments on commit a19bc63

Please sign in to comment.