-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[v2][storage] Create v2 query service to operate on otlp data model #6343
Merged
mahadzaryab1
merged 42 commits into
jaegertracing:main
from
mahadzaryab1:query-service-v2
Dec 31, 2024
Merged
Changes from all commits
Commits
Show all changes
42 commits
Select commit
Hold shift + click to select a range
ad0de07
Create Query Service V2 To Operate on OTEL Data Model
mahadzaryab1 24278bb
Merge branch 'main' into query-service-v2
mahadzaryab1 41f1d51
Merge branch 'main' into query-service-v2
mahadzaryab1 a33ab6c
Address Build Failures And Feedback
mahadzaryab1 ffad629
Fix ArchiveTrace And GetTraces To Operate On Iterators
mahadzaryab1 1d721ac
Change Adjuster To Work On Seq
mahadzaryab1 a6df503
Fix Error Capture
mahadzaryab1 60a875a
Address Feedback From PR Review
mahadzaryab1 d31a5e5
Merge branch 'main' into query-service-v2
mahadzaryab1 2c7a848
Merge branch 'main' into query-service-v2
mahadzaryab1 d32ba48
Run Linter
mahadzaryab1 ecab080
Update Query Service To Perform Adjustments
mahadzaryab1 9ee2d38
Fix Signature of ArchiveTrace Function
mahadzaryab1 57ef9e7
Add Some Unit Tests
mahadzaryab1 40d9a68
Add Missing License
mahadzaryab1 32491f5
Adjust Archive Trace
mahadzaryab1 3d6af5f
Aggregate Traces
mahadzaryab1 ab95ab7
Remove TODO Comment
mahadzaryab1 6737ca9
Merge branch 'main' into query-service-v2
mahadzaryab1 005d8d7
Use SpanIter
mahadzaryab1 7171785
Add Proceed Check
mahadzaryab1 e5a4b1b
Address Feedback From PR Review
mahadzaryab1 f887d09
Create Receive Traces Helper Function
mahadzaryab1 959d2d4
Add Unit Tests For Get Traces
mahadzaryab1 d468249
Fix Lint
mahadzaryab1 f6d157e
Add Unit Tests For Find Traces
mahadzaryab1 d2f1c95
Add Test For Archive Trace Writer Error
mahadzaryab1 6f69d63
Add Test For Archive Trace Writer Success
mahadzaryab1 9250285
Write Test For Get Traces In Archive Storage
mahadzaryab1 1e0c5b3
Move Tests To Separate Package To Simplify Naming
mahadzaryab1 c14fc4f
Fix Comment
mahadzaryab1 73fbb19
Add Test For Error In Get Trace
mahadzaryab1 5020f89
Use Flatten With Errors
mahadzaryab1 e29eafe
Add Test For Error In Get Trace Reader
mahadzaryab1 2c716bf
Cleanup Tests
mahadzaryab1 28e5fd0
Combine Tests For Brevity
mahadzaryab1 5ae23f2
Fix Lint
mahadzaryab1 a79bcac
Merge branch 'main' into query-service-v2
mahadzaryab1 5636427
Move Query Service To V2 Package
mahadzaryab1 529bcda
Fix Lint
mahadzaryab1 806a265
Drop V2 Suffix
mahadzaryab1 f163d55
Merge branch 'main' into query-service-v2
mahadzaryab1 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package querysvc | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/jaegertracing/jaeger/pkg/testutils" | ||
) | ||
|
||
func TestMain(m *testing.M) { | ||
testutils.VerifyGoLeaks(m) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package querysvc | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/pdata/pcommon" | ||
"go.opentelemetry.io/collector/pdata/ptrace" | ||
|
||
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster" | ||
"github.com/jaegertracing/jaeger/internal/jptrace" | ||
"github.com/jaegertracing/jaeger/model" | ||
"github.com/jaegertracing/jaeger/pkg/iter" | ||
"github.com/jaegertracing/jaeger/storage_v2/depstore" | ||
"github.com/jaegertracing/jaeger/storage_v2/tracestore" | ||
) | ||
|
||
var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") | ||
|
||
const ( | ||
defaultMaxClockSkewAdjust = time.Second | ||
) | ||
|
||
// QueryServiceOptions holds the configuration options for the query service. | ||
type QueryServiceOptions struct { | ||
// ArchiveTraceReader is used to read archived traces from the storage. | ||
ArchiveTraceReader tracestore.Reader | ||
// ArchiveTraceWriter is used to write traces to the archive storage. | ||
ArchiveTraceWriter tracestore.Writer | ||
// Adjuster is used to adjust traces before they are returned to the client. | ||
// If not set, the default adjuster will be used. | ||
Adjuster adjuster.Adjuster | ||
} | ||
|
||
// StorageCapabilities is a feature flag for query service | ||
type StorageCapabilities struct { | ||
ArchiveStorage bool `json:"archiveStorage"` | ||
// TODO: Maybe add metrics Storage here | ||
// SupportRegex bool | ||
// SupportTagFilter bool | ||
} | ||
|
||
// QueryService provides methods to query data from the storage. | ||
type QueryService struct { | ||
traceReader tracestore.Reader | ||
dependencyReader depstore.Reader | ||
options QueryServiceOptions | ||
} | ||
|
||
// GetTraceParams defines the parameters for retrieving traces using the GetTraces function. | ||
type GetTraceParams struct { | ||
// TraceIDs is a slice of trace identifiers to fetch. | ||
TraceIDs []tracestore.GetTraceParams | ||
// RawTraces indicates whether to retrieve raw traces. | ||
// If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster. | ||
RawTraces bool | ||
} | ||
|
||
// TraceQueryParams represents the parameters for querying a batch of traces. | ||
type TraceQueryParams struct { | ||
tracestore.TraceQueryParams | ||
// RawTraces indicates whether to retrieve raw traces. | ||
// If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster. | ||
RawTraces bool | ||
} | ||
|
||
func NewQueryService( | ||
traceReader tracestore.Reader, | ||
dependencyReader depstore.Reader, | ||
options QueryServiceOptions, | ||
) *QueryService { | ||
qsvc := &QueryService{ | ||
traceReader: traceReader, | ||
dependencyReader: dependencyReader, | ||
options: options, | ||
} | ||
|
||
if qsvc.options.Adjuster == nil { | ||
qsvc.options.Adjuster = adjuster.Sequence( | ||
adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...) | ||
} | ||
return qsvc | ||
} | ||
|
||
// GetTraces retrieves traces with given trace IDs from the primary reader, | ||
// and if any of them are not found it then queries the archive reader. | ||
// The iterator is single-use: once consumed, it cannot be used again. | ||
func (qs QueryService) GetTraces( | ||
ctx context.Context, | ||
params GetTraceParams, | ||
) iter.Seq2[[]ptrace.Traces, error] { | ||
getTracesIter := qs.traceReader.GetTraces(ctx, params.TraceIDs...) | ||
return func(yield func([]ptrace.Traces, error) bool) { | ||
foundTraceIDs, proceed := qs.receiveTraces(getTracesIter, yield, params.RawTraces) | ||
if proceed && qs.options.ArchiveTraceReader != nil { | ||
var missingTraceIDs []tracestore.GetTraceParams | ||
for _, id := range params.TraceIDs { | ||
if _, found := foundTraceIDs[id.TraceID]; !found { | ||
missingTraceIDs = append(missingTraceIDs, id) | ||
} | ||
} | ||
if len(missingTraceIDs) > 0 { | ||
getArchiveTracesIter := qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...) | ||
qs.receiveTraces(getArchiveTracesIter, yield, params.RawTraces) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (qs QueryService) GetServices(ctx context.Context) ([]string, error) { | ||
return qs.traceReader.GetServices(ctx) | ||
} | ||
|
||
func (qs QueryService) GetOperations( | ||
ctx context.Context, | ||
query tracestore.OperationQueryParams, | ||
) ([]tracestore.Operation, error) { | ||
return qs.traceReader.GetOperations(ctx, query) | ||
} | ||
|
||
func (qs QueryService) FindTraces( | ||
ctx context.Context, | ||
query TraceQueryParams, | ||
) iter.Seq2[[]ptrace.Traces, error] { | ||
return func(yield func([]ptrace.Traces, error) bool) { | ||
tracesIter := qs.traceReader.FindTraces(ctx, query.TraceQueryParams) | ||
qs.receiveTraces(tracesIter, yield, query.RawTraces) | ||
} | ||
} | ||
|
||
// ArchiveTrace archives a trace specified by the given query parameters. | ||
// If the ArchiveTraceWriter is not configured, it returns | ||
// an error indicating that there is no archive span storage available. | ||
func (qs QueryService) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error { | ||
if qs.options.ArchiveTraceWriter == nil { | ||
return errNoArchiveSpanStorage | ||
} | ||
getTracesIter := qs.GetTraces( | ||
ctx, GetTraceParams{TraceIDs: []tracestore.GetTraceParams{query}}, | ||
) | ||
var archiveErr error | ||
getTracesIter(func(traces []ptrace.Traces, err error) bool { | ||
if err != nil { | ||
archiveErr = err | ||
return false | ||
} | ||
for _, trace := range traces { | ||
err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace) | ||
if err != nil { | ||
archiveErr = errors.Join(archiveErr, err) | ||
} | ||
} | ||
return true | ||
}) | ||
return archiveErr | ||
} | ||
|
||
func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { | ||
return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{ | ||
StartTime: endTs.Add(-lookback), | ||
EndTime: endTs, | ||
}) | ||
} | ||
|
||
func (qs QueryService) GetCapabilities() StorageCapabilities { | ||
return StorageCapabilities{ | ||
ArchiveStorage: qs.options.hasArchiveStorage(), | ||
} | ||
} | ||
|
||
func (opts *QueryServiceOptions) hasArchiveStorage() bool { | ||
return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil | ||
} | ||
|
||
func (qs QueryService) receiveTraces( | ||
seq iter.Seq2[[]ptrace.Traces, error], | ||
yield func([]ptrace.Traces, error) bool, | ||
rawTraces bool, | ||
) (map[pcommon.TraceID]struct{}, bool) { | ||
aggregatedTraces := jptrace.AggregateTraces(seq) | ||
foundTraceIDs := make(map[pcommon.TraceID]struct{}) | ||
proceed := true | ||
aggregatedTraces(func(trace ptrace.Traces, err error) bool { | ||
if err != nil { | ||
proceed = yield(nil, err) | ||
return proceed | ||
} | ||
if !rawTraces { | ||
qs.options.Adjuster.Adjust(trace) | ||
} | ||
jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool { | ||
foundTraceIDs[span.TraceID()] = struct{}{} | ||
return true | ||
}) | ||
proceed = yield([]ptrace.Traces{trace}, nil) | ||
return proceed | ||
}) | ||
return foundTraceIDs, proceed | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yurishkuro is this file path good? currently the adjusters are in
querysvc/v2/adjuster/
- should we also move it underquerysvc/v2/querysvc/adjuster?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
V2/adjuster is good path. But now that you mentioned, I think we could move the whole thing out of query/app into query/internal, as part of the overall cleanup I'm pushing for. But we can do it in a follow up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good - will do in a follow-up PR