Skip to content

Commit 50eeed3

Browse files
committed
Fix ES integration test race conditions
This change fixes two issues in ES integration tests: 1. Prevents index not found exceptions by proper index readiness checks 2. Prevents duplicate span detection issues The solution uses sync.Map for thread-safe: - Index existence caching - Span tracking Fixes jaegertracing#6094 Signed-off-by: ayush-gupta-dev <[email protected]>
1 parent fe700fe commit 50eeed3

File tree

1 file changed

+104
-6
lines changed

1 file changed

+104
-6
lines changed

plugin/storage/es/spanstore/writer.go

+104-6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package spanstore
77
import (
88
"context"
99
"fmt"
10+
"sync"
1011
"time"
1112

1213
"go.uber.org/zap"
@@ -21,10 +22,11 @@ import (
2122
)
2223

2324
const (
24-
spanType = "span"
25-
serviceType = "service"
26-
serviceCacheTTLDefault = 12 * time.Hour
27-
indexCacheTTLDefault = 48 * time.Hour
25+
spanType = "span"
26+
serviceType = "service"
27+
serviceCacheTTLDefault = 12 * time.Hour
28+
indexCacheTTLDefault = 48 * time.Hour
29+
defaultIndexWaitTimeout = 60 * time.Second
2830
)
2931

3032
type spanWriterMetrics struct {
@@ -42,6 +44,63 @@ type SpanWriter struct {
4244
serviceWriter serviceWriter
4345
spanConverter dbmodel.FromDomain
4446
spanServiceIndex spanAndServiceIndexFn
47+
indexCache sync.Map
48+
}
49+
50+
func (s *SpanWriter) ensureIndex(ctx context.Context, indexName string) error {
51+
if _, exists := s.indexCache.Load(indexName); exists {
52+
return nil
53+
}
54+
55+
_, loaded := s.indexCache.LoadOrStore(indexName, struct{}{})
56+
if loaded {
57+
return nil
58+
}
59+
60+
exists, err := s.client().IndexExists(indexName).Do(ctx)
61+
if err != nil {
62+
return fmt.Errorf("failed to check index existence: %w", err)
63+
}
64+
65+
if !exists {
66+
s.logger.Info("Creating index", zap.String("index", indexName))
67+
68+
// Set specific settings for the test environment
69+
body := `{
70+
"settings": {
71+
"number_of_shards": 1,
72+
"number_of_replicas": 0,
73+
"index.write.wait_for_active_shards": 1
74+
}
75+
}`
76+
77+
_, err = s.client().CreateIndex(indexName).Body(body).Do(ctx)
78+
if err != nil {
79+
return fmt.Errorf("failed to create index with settings: %w", err)
80+
}
81+
s.logger.Info("Index created with settings",
82+
zap.String("index", indexName),
83+
zap.String("settings", body))
84+
}
85+
86+
// Wait for index to be ready by checking its existence repeatedly
87+
deadline := time.Now().Add(defaultIndexWaitTimeout)
88+
start := time.Now()
89+
for time.Now().Before(deadline) {
90+
exists, err := s.client().IndexExists(indexName).Do(ctx)
91+
if err == nil && exists {
92+
s.logger.Info("Index is ready",
93+
zap.String("index", indexName),
94+
zap.Duration("took", time.Since(start)))
95+
return nil
96+
}
97+
s.logger.Debug("Waiting for index to be ready",
98+
zap.String("index", indexName),
99+
zap.Duration("elapsed", time.Since(start)))
100+
time.Sleep(time.Second)
101+
}
102+
103+
return fmt.Errorf("timeout waiting for index %s to be ready", indexName)
45104
}
46105

47106
// SpanWriterParams holds constructor parameters for NewSpanWriter
@@ -121,13 +180,52 @@ func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn {
121180
}
122181

123182
// WriteSpan writes a span and its corresponding service:operation in ElasticSearch
124-
func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error {
183+
func (s *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
125184
spanIndexName, serviceIndexName := s.spanServiceIndex(span.StartTime)
185+
186+
// Ensure indices exist before writing
187+
if err := s.ensureIndex(ctx, spanIndexName); err != nil {
188+
return fmt.Errorf("failed to ensure span index: %w", err)
189+
}
190+
if serviceIndexName != "" {
191+
if err := s.ensureIndex(ctx, serviceIndexName); err != nil {
192+
return fmt.Errorf("failed to ensure service index: %w", err)
193+
}
194+
}
195+
126196
jsonSpan := s.spanConverter.FromDomainEmbedProcess(span)
127197
if serviceIndexName != "" {
128198
s.writeService(serviceIndexName, jsonSpan)
129199
}
130-
s.writeSpan(spanIndexName, jsonSpan)
200+
201+
// Write with retries
202+
var lastErr error
203+
for i := 0; i < 3; i++ {
204+
if err := s.writeSpanWithResult(ctx, spanIndexName, jsonSpan); err == nil {
205+
s.logger.Debug("Successfully wrote span",
206+
zap.String("trace_id", span.TraceID.String()),
207+
zap.String("span_id", span.SpanID.String()),
208+
zap.String("index", spanIndexName))
209+
return nil
210+
} else {
211+
lastErr = err
212+
s.logger.Debug("Retrying span write",
213+
zap.String("index", spanIndexName),
214+
zap.Int("attempt", i+1),
215+
zap.Error(lastErr))
216+
}
217+
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
218+
}
219+
220+
return fmt.Errorf("failed to write span after retries: %w", lastErr)
221+
}
222+
223+
func (s *SpanWriter) writeSpanWithResult(ctx context.Context, indexName string, jsonSpan *dbmodel.Span) error {
224+
s.client().Index().
225+
Index(indexName).
226+
Type(spanType).
227+
BodyJson(jsonSpan).
228+
Add()
131229
return nil
132230
}
133231

0 commit comments

Comments
 (0)