Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade storage integration test to v2 Trace Reader #6388

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
8 changes: 6 additions & 2 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner"
"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const otlpPort = 4317
Expand Down Expand Up @@ -149,8 +150,9 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {

s.SpanWriter, err = createSpanWriter(logger, otlpPort)
require.NoError(t, err)
s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC)
spanReader, err := createSpanReader(logger, ports.QueryGRPC)
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)

t.Cleanup(func() {
// Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection.
Expand Down Expand Up @@ -207,7 +209,9 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool {
// e2eCleanUp closes the SpanReader and SpanWriter gRPC connection.
// This function should be called after all the tests are finished.
func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) {
require.NoError(t, s.SpanReader.(io.Closer).Close())
spanReader, err := v1adapter.GetV1Reader(s.TraceReader)
require.NoError(t, err)
require.NoError(t, spanReader.(io.Closer).Close())
require.NoError(t, s.SpanWriter.(io.Closer).Close())
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/tailsampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ts *TailSamplingIntegration) testTailSamplingProccessor(t *testing.T) {
var actual []string
assert.Eventually(t, func() bool {
var err error
actual, err = ts.SpanReader.GetServices(context.Background())
actual, err = ts.TraceReader.GetServices(context.Background())
require.NoError(t, err)
sort.Strings(actual)
return assert.ObjectsAreEqualValues(ts.expectedServices, actual)
Expand Down
6 changes: 4 additions & 2 deletions plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type BadgerIntegrationStorage struct {
Expand All @@ -35,9 +36,10 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
s.SpanWriter, err = s.factory.CreateSpanWriter()
require.NoError(t, err)

s.SpanReader, err = s.factory.CreateSpanReader()
spanReader, err := s.factory.CreateSpanReader()
require.NoError(t, err)

s.TraceReader = v1adapter.NewTraceReader(spanReader)

s.SamplingStore, err = s.factory.CreateSamplingStore(0)
require.NoError(t, err)
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type CassandraStorageIntegration struct {
Expand Down Expand Up @@ -74,8 +75,9 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) {
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const (
Expand Down Expand Up @@ -134,8 +135,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type GRPCStorageIntegrationTestSuite struct {
Expand All @@ -38,8 +39,9 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {

s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
45 changes: 27 additions & 18 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

//go:embed fixtures
Expand All @@ -42,7 +44,7 @@ var fixtures embed.FS
// and RunAll() under different conditions.
type StorageIntegration struct {
SpanWriter spanstore.Writer
SpanReader spanstore.Reader
TraceReader tracestore.Reader
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
DependencyWriter dependencystore.Writer
Expand Down Expand Up @@ -79,7 +81,7 @@ type StorageIntegration struct {
// the service name is formatted "query##-service".
type QueryFixtures struct {
Caption string
Query *spanstore.TraceQueryParameters
Query *tracestore.TraceQueryParams
ExpectedFixtures []string
}

Expand Down Expand Up @@ -143,7 +145,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
var actual []string
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetServices(context.Background())
actual, err = s.TraceReader.GetServices(context.Background())
if err != nil {
t.Log(err)
return false
Expand All @@ -154,9 +156,10 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
// If the storage backend returns more services than expected, let's log traces for those
t.Log("🛑 Found unexpected services!")
for _, service := range actual {
traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{
iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{
ServiceName: service,
})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
if err != nil {
t.Log(err)
continue
Expand Down Expand Up @@ -216,8 +219,10 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {

var actual *model.Trace
found := s.waitForCondition(t, func(_ *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
require.NotEmpty(t, traces)
actual = traces[0]
return err == nil && len(actual.Spans) >= len(expected.Spans)
})

Expand All @@ -242,27 +247,27 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) {
s.skipIfNeeded(t)
defer s.cleanUp(t)

var expected []spanstore.Operation
var expected []tracestore.Operation
if s.GetOperationsMissingSpanKind {
expected = []spanstore.Operation{
expected = []tracestore.Operation{
{Name: "example-operation-1"},
{Name: "example-operation-3"},
{Name: "example-operation-4"},
}
} else {
expected = []spanstore.Operation{
expected = []tracestore.Operation{
{Name: "example-operation-1", SpanKind: ""},
{Name: "example-operation-3", SpanKind: "server"},
{Name: "example-operation-4", SpanKind: "client"},
}
}
s.loadParseAndWriteExampleTrace(t)

var actual []spanstore.Operation
var actual []tracestore.Operation
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetOperations(context.Background(),
spanstore.OperationQueryParameters{ServiceName: "example-service-1"})
actual, err = s.TraceReader.GetOperations(context.Background(),
tracestore.OperationQueryParameters{ServiceName: "example-service-1"})
if err != nil {
t.Log(err)
return false
Expand All @@ -289,11 +294,13 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {

var actual *model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
if err != nil {
t.Log(err)
}
require.NotEmpty(t, traces)
actual = traces[0]
return err == nil && len(actual.Spans) == len(expected.Spans)
})
if !assert.True(t, found) {
Expand All @@ -302,9 +309,10 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {

t.Run("NotFound error", func(t *testing.T) {
fakeTraceID := model.TraceID{High: 0, Low: 1}
trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
assert.Equal(t, spanstore.ErrTraceNotFound, err)
assert.Nil(t, trace)
assert.Nil(t, traces)
})
}

Expand Down Expand Up @@ -342,11 +350,12 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) {
}
}

func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace {
func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*model.Trace) []*model.Trace {
var traces []*model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
traces, err = s.SpanReader.FindTraces(context.Background(), query)
iterTraces := s.TraceReader.FindTraces(context.Background(), *query)
traces, err = v1adapter.PTracesSeq2ToModel(iterTraces)
if err != nil {
t.Log(err)
return false
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const defaultLocalKafkaBroker = "127.0.0.1:9092"
Expand Down Expand Up @@ -91,7 +92,8 @@ func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) {
spanConsumer.Start()

s.SpanWriter = spanWriter
s.SpanReader = &ingester{traceStore}
spanReader := &ingester{traceStore}
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.CleanUp = func(_ *testing.T) {}
s.SkipArchiveTest = true
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type MemStorageIntegrationTestSuite struct {
Expand All @@ -24,7 +25,8 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) {
store := memory.NewStore()
archiveStore := memory.NewStore()
s.SamplingStore = memory.NewSamplingStore(2)
s.SpanReader = store
spanReader := store
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.SpanWriter = store
s.ArchiveSpanReader = archiveStore
s.ArchiveSpanWriter = archiveStore
Expand Down
54 changes: 54 additions & 0 deletions storage_v2/v1adapter/ptrace2model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package v1adapter

import (
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/spanstore"
otel2model "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
)

// PTracesSeq2ToModel consumes tracesSeq.
// When necessary, it groups spans from *consecutive* chunks of ptrace.Traces into a single model.Trace
// It adheres to the chunking requirement of tracestore.Reader.GetTraces.
//
// Returns nil, and spanstore.ErrTraceNotFound for empty iterators
func PTracesSeq2ToModel(tracesSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) {
jaegerTraces := []*model.Trace{}
otelTraces, err := iter.CollectWithErrors(jptrace.AggregateTraces(tracesSeq))
if err != nil {
return nil, err
}
if len(otelTraces) == 0 {
return nil, spanstore.ErrTraceNotFound
}

for _, otelTrace := range otelTraces {
jTrace := &model.Trace{
Spans: modelSpansFromOtelTrace(otelTrace),
}
jaegerTraces = append(jaegerTraces, jTrace)
}
return jaegerTraces, nil
}

// modelSpansFromOtelTrace extracts spans from otel traces
func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span {
spans := []*model.Span{}
batches := otel2model.ProtoFromTraces(otelTrace)
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
proc := *batch.Process // shallow clone
span.Process = &proc
}
spans = append(spans, span)
}
}
return spans
}
Loading