Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][v2][storage] Create v2 query service to operate on otlp data model #6343

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
14 changes: 0 additions & 14 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,13 @@ import (
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var errNoArchiveSpanStorage = errors.New("archive span storage was not configured")

const (
defaultMaxClockSkewAdjust = time.Second
)

// QueryServiceOptions has optional members of QueryService
type QueryServiceOptions struct {
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
Adjuster adjuster.Adjuster
}

// StorageCapabilities is a feature flag for query service
type StorageCapabilities struct {
ArchiveStorage bool `json:"archiveStorage"`
// TODO: Maybe add metrics Storage here
// SupportRegex bool
// SupportTagFilter bool
}

// QueryService contains span utils required by the query-service.
type QueryService struct {
traceReader tracestore.Reader
Expand Down
180 changes: 180 additions & 0 deletions cmd/query/app/querysvc/query_service_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package querysvc

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

var errNoArchiveSpanStorage = errors.New("archive span storage was not configured")

const (
defaultMaxClockSkewAdjust = time.Second
)

// QueryServiceOptions has optional members of QueryService
type QueryServiceOptionsV2 struct {
ArchiveTraceReader tracestore.Reader
ArchiveTraceWriter tracestore.Writer
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
Adjuster adjuster.Adjuster
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// StorageCapabilities is a feature flag for query service
type StorageCapabilities struct {
ArchiveStorage bool `json:"archiveStorage"`
// TODO: Maybe add metrics Storage here
// SupportRegex bool
// SupportTagFilter bool
}

// QueryService contains span utils required by the query-service.
type QueryServiceV2 struct {
traceReader tracestore.Reader
dependencyReader depstore.Reader
options QueryServiceOptionsV2
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

// NewQueryService returns a new QueryService.
func NewQueryServiceV2(
traceReader tracestore.Reader,
dependencyReader depstore.Reader,
options QueryServiceOptionsV2,
) *QueryServiceV2 {
qsvc := &QueryServiceV2{
traceReader: traceReader,
dependencyReader: dependencyReader,
options: options,
}

if qsvc.options.Adjuster == nil {
qsvc.options.Adjuster = adjuster.Sequence(
adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...)
}
return qsvc

Check warning on line 65 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L54-L65

Added lines #L54 - L65 were not covered by tests
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// GetTraces retrieves traces with given trace IDs from the primary reader,
// and if any of them are not found it then queries the archive reader.
// The iterator is single-use: once consumed, it cannot be used again.
func (qs QueryServiceV2) GetTraces(
ctx context.Context,
traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] {
traceIDs ...tracestore.GetTraceParams,
) iter.Seq2[[]ptrace.Traces, error] {

getTracesIter := qs.traceReader.GetTraces(ctx, traceIDs...)
return func(yield func([]ptrace.Traces, error) bool) {
foundTraceIDs := make(map[pcommon.TraceID]struct{})
getTracesIter(func(traces []ptrace.Traces, err error) bool {
if err != nil {
return yield(nil, err)
}
for _, trace := range traces {
resources := trace.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
for j := 0; j < scopes.Len(); j++ {
spans := scopes.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
foundTraceIDs[span.TraceID()] = struct{}{}
}

Check warning on line 90 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L73-L90

Added lines #L73 - L90 were not covered by tests
}
}
}
return yield(traces, nil)

Check warning on line 94 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L94

Added line #L94 was not covered by tests
})
if qs.options.ArchiveTraceReader != nil {
var missingTraceIDs []tracestore.GetTraceParams
for _, id := range traceIDs {
if _, found := foundTraceIDs[id.TraceID]; !found {
missingTraceIDs = append(missingTraceIDs, id)
}

Check warning on line 101 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L96-L101

Added lines #L96 - L101 were not covered by tests
}
if len(missingTraceIDs) > 0 {
qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...)(yield)
}

Check warning on line 105 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L103-L105

Added lines #L103 - L105 were not covered by tests
}
}
}

// GetServices is the queryService implementation of tracestore.Reader.GetServices
func (qs QueryServiceV2) GetServices(ctx context.Context) ([]string, error) {
return qs.traceReader.GetServices(ctx)

Check warning on line 112 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L111-L112

Added lines #L111 - L112 were not covered by tests
}

// GetOperations is the queryService implementation of tracestore.Reader.GetOperations
func (qs QueryServiceV2) GetOperations(
ctx context.Context,
query tracestore.OperationQueryParameters,
) ([]tracestore.Operation, error) {
return qs.traceReader.GetOperations(ctx, query)

Check warning on line 120 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L119-L120

Added lines #L119 - L120 were not covered by tests
}

// FindTraces is the queryService implementation of tracestore.Reader.FindTraces
func (qs QueryServiceV2) FindTraces(ctx context.Context, query tracestore.TraceQueryParams) iter.Seq2[[]ptrace.Traces, error] {
return qs.traceReader.FindTraces(ctx, query)

Check warning on line 125 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L124-L125

Added lines #L124 - L125 were not covered by tests
}

// ArchiveTrace is the queryService utility to archive traces.
func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.TraceID) error {
if qs.options.ArchiveTraceWriter == nil {
return errNoArchiveSpanStorage
}
getTracesIter := qs.GetTraces(ctx, tracestore.GetTraceParams{TraceID: traceID})
var archiveErr error
getTracesIter(func(traces []ptrace.Traces, err error) bool {
if err != nil {
archiveErr = err
return false
}
for _, trace := range traces {
err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace)
if err != nil {
archiveErr = err
return false
}

Check warning on line 145 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L129-L145

Added lines #L129 - L145 were not covered by tests
}
return true

Check warning on line 147 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L147

Added line #L147 was not covered by tests
})
return archiveErr

Check warning on line 149 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L149

Added line #L149 was not covered by tests
}

// Adjust applies adjusters to the trace.
func (qs QueryServiceV2) Adjust(tracesIter iter.Seq[[]ptrace.Traces]) {
tracesIter(func(traces []ptrace.Traces) bool {
for _, trace := range traces {
qs.options.Adjuster.Adjust(trace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you cannot assume here that trace is a full trace, you need to clump consecutive chunks if they are for the same trace ID. We can implement a helper function for that that will take Seq[[]ptrace.Traces] and return Seq[ptrace.Traces] where each item is a full trace. Similar to what I was suggesting in https://github.com/jaegertracing/jaeger/pull/6388/files#r1894703201

}
return true

Check warning on line 158 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L153-L158

Added lines #L153 - L158 were not covered by tests
})
}
Comment on lines +153 to +160
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro Did I understand correctly in that this is what we wanted here? Or did you mean that we should change the underlying adjusters themselves to work on Seq?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you understood correctly. However, because adjusting needs to re-arrange the data I think this func should return Seq[ptrace.Traces] where each item is fully adjusted trace.


// GetDependencies implements depstore.Reader.GetDependencies
func (qs QueryServiceV2) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{
StartTime: endTs.Add(-lookback),
EndTime: endTs,
})

Check warning on line 167 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L163-L167

Added lines #L163 - L167 were not covered by tests
}

// GetCapabilities returns the features supported by the query service.
func (qs QueryServiceV2) GetCapabilities() StorageCapabilities {
return StorageCapabilities{
ArchiveStorage: qs.options.hasArchiveStorage(),
}

Check warning on line 174 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L171-L174

Added lines #L171 - L174 were not covered by tests
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
}

// hasArchiveStorage returns true if archive storage reader/writer are initialized.
func (opts *QueryServiceOptionsV2) hasArchiveStorage() bool {
return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil

Check warning on line 179 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L178-L179

Added lines #L178 - L179 were not covered by tests
}
Loading