Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][v2][storage] Create v2 query service to operate on otlp data model #6343

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
6 changes: 0 additions & 6 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ import (
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var errNoArchiveSpanStorage = errors.New("archive span storage was not configured")

const (
defaultMaxClockSkewAdjust = time.Second
)

// QueryServiceOptions has optional members of QueryService
type QueryServiceOptions struct {
ArchiveSpanReader spanstore.Reader
Expand Down
140 changes: 140 additions & 0 deletions cmd/query/app/querysvc/query_service_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package querysvc

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

var errNoArchiveSpanStorage = errors.New("archive span storage was not configured")

const (
defaultMaxClockSkewAdjust = time.Second
)

// QueryServiceOptions has optional members of QueryService
type QueryServiceOptionsV2 struct {
ArchiveTraceReader tracestore.Reader
ArchiveTraceWriter tracestore.Writer
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
Adjuster adjuster.Adjuster
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// StorageCapabilities is a feature flag for query service
type StorageCapabilitiesV2 struct {
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
ArchiveStorage bool `json:"archiveStorage"`
// TODO: Maybe add metrics Storage here
// SupportRegex bool
// SupportTagFilter bool
}

// QueryService contains span utils required by the query-service.
type QueryServiceV2 struct {
traceReader tracestore.Reader
dependencyReader depstore.Reader
options QueryServiceOptionsV2
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

// NewQueryService returns a new QueryService.
func NewQueryServiceV2(
traceReader tracestore.Reader,
dependencyReader depstore.Reader,
options QueryServiceOptionsV2,
) *QueryServiceV2 {
qsvc := &QueryServiceV2{
traceReader: traceReader,
dependencyReader: dependencyReader,
options: options,
}

if qsvc.options.Adjuster == nil {
qsvc.options.Adjuster = adjuster.Sequence(adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...)
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
}
return qsvc

Check warning on line 65 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L55-L65

Added lines #L55 - L65 were not covered by tests
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// GetTrace is the queryService implementation of tracestore.Reader.GetTrace
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
func (qs QueryServiceV2) GetTraces(ctx context.Context, traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] {
traceIter := qs.traceReader.GetTraces(ctx, traceIDs...)
_, err := iter.FlattenWithErrors(traceIter)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if errors.Is(err, spanstore.ErrTraceNotFound) {
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
if qs.options.ArchiveTraceReader == nil {
return func(yield func([]ptrace.Traces, error) bool) {
yield(nil, err)
}

Check warning on line 76 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L69-L76

Added lines #L69 - L76 were not covered by tests
}
traceIter = qs.options.ArchiveTraceReader.GetTraces(ctx, traceIDs...)

Check warning on line 78 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L78

Added line #L78 was not covered by tests
}
return traceIter

Check warning on line 80 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L80

Added line #L80 was not covered by tests
}

// GetServices is the queryService implementation of tracestore.Reader.GetServices
func (qs QueryServiceV2) GetServices(ctx context.Context) ([]string, error) {
return qs.traceReader.GetServices(ctx)

Check warning on line 85 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}

// GetOperations is the queryService implementation of tracestore.Reader.GetOperations
func (qs QueryServiceV2) GetOperations(
ctx context.Context,
query tracestore.OperationQueryParameters,
) ([]tracestore.Operation, error) {
return qs.traceReader.GetOperations(ctx, query)

Check warning on line 93 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}

// FindTraces is the queryService implementation of tracestore.Reader.FindTraces
func (qs QueryServiceV2) FindTraces(ctx context.Context, query tracestore.TraceQueryParams) iter.Seq2[[]ptrace.Traces, error] {
return qs.traceReader.FindTraces(ctx, query)

Check warning on line 98 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L97-L98

Added lines #L97 - L98 were not covered by tests
}

// ArchiveTrace is the queryService utility to archive traces.
func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.TraceID) error {
if qs.options.ArchiveTraceWriter == nil {
return errNoArchiveSpanStorage
}
var err error
traces, err := iter.FlattenWithErrors(qs.GetTraces(ctx, tracestore.GetTraceParams{TraceID: traceID}))
if err != nil {
return err
}
for _, trace := range traces {
err = errors.Join(err, qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace))
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return err

Check warning on line 114 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L102-L114

Added lines #L102 - L114 were not covered by tests
}

// Adjust applies adjusters to the trace.
func (qs QueryServiceV2) Adjust(trace ptrace.Traces) {
qs.options.Adjuster.Adjust(trace)

Check warning on line 119 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L118-L119

Added lines #L118 - L119 were not covered by tests
}

// GetDependencies implements depstore.Reader.GetDependencies
func (qs QueryServiceV2) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{
StartTime: endTs.Add(-lookback),
EndTime: endTs,
})

Check warning on line 127 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L123-L127

Added lines #L123 - L127 were not covered by tests
}

// GetCapabilities returns the features supported by the query service.
func (qs QueryServiceV2) GetCapabilities() StorageCapabilities {
return StorageCapabilities{
ArchiveStorage: qs.options.hasArchiveStorage(),
}

Check warning on line 134 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L131-L134

Added lines #L131 - L134 were not covered by tests
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
}

// hasArchiveStorage returns true if archive storage reader/writer are initialized.
func (opts *QueryServiceOptionsV2) hasArchiveStorage() bool {
return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil

Check warning on line 139 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L138-L139

Added lines #L138 - L139 were not covered by tests
}
Loading