Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][v2][storage] Create v2 query service to operate on otlp data model #6343

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
141 changes: 141 additions & 0 deletions cmd/query/app/querysvc/query_service_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// // Copyright (c) 2019 The Jaeger Authors.
// // SPDX-License-Identifier: Apache-2.0

package querysvc

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

// TODO: Remove query_service.go and rename query_service_v2.go
// to query_service.go once all components have been migrated to
// operate on the OTEL data model.
var errNoArchiveSpanStorageV2 = errors.New("archive span storage was not configured")

const (
defaultMaxClockSkewAdjustV2 = time.Second
)
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved

// QueryServiceOptions has optional members of QueryService
type QueryServiceOptionsV2 struct {
ArchiveTraceReader tracestore.Reader
ArchiveTraceWriter tracestore.Writer
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
Adjuster adjuster.Adjuster
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// StorageCapabilities is a feature flag for query service
type StorageCapabilitiesV2 struct {
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
ArchiveStorage bool `json:"archiveStorage"`
// TODO: Maybe add metrics Storage here
// SupportRegex bool
// SupportTagFilter bool
}

// QueryService contains span utils required by the query-service.
type QueryServiceV2 struct {
traceReader tracestore.Reader
dependencyReader depstore.Reader
options QueryServiceOptionsV2
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

// NewQueryService returns a new QueryService.
func NewQueryServiceV2(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService {
qsvc := &QueryService{
traceReader: traceReader,
dependencyReader: dependencyReader,
options: options,
}

if qsvc.options.Adjuster == nil {
qsvc.options.Adjuster = adjuster.Sequence(StandardAdjusters(defaultMaxClockSkewAdjustV2)...)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}
return qsvc

Check warning on line 64 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L54-L64

Added lines #L54 - L64 were not covered by tests
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// GetTrace is the queryService implementation of tracestore.Reader.GetTrace
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
func (qs QueryServiceV2) GetTrace(ctx context.Context, traceID pcommon.TraceID) (ptrace.Traces, error) {
trace, err := qs.traceReader.GetTrace(ctx, traceID)
if errors.Is(err, spanstore.ErrTraceNotFound) {
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
if qs.options.ArchiveTraceReader == nil {
return ptrace.NewTraces(), err
}
trace, err = qs.options.ArchiveTraceReader.GetTrace(ctx, traceID)

Check warning on line 74 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L68-L74

Added lines #L68 - L74 were not covered by tests
}
return trace, err

Check warning on line 76 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L76

Added line #L76 was not covered by tests
}

// GetServices is the queryService implementation of tracestore.Reader.GetServices
func (qs QueryServiceV2) GetServices(ctx context.Context) ([]string, error) {
return qs.traceReader.GetServices(ctx)

Check warning on line 81 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L80-L81

Added lines #L80 - L81 were not covered by tests
}

// GetOperations is the queryService implementation of tracestore.Reader.GetOperations
func (qs QueryServiceV2) GetOperations(
ctx context.Context,
query tracestore.OperationQueryParameters,
) ([]tracestore.Operation, error) {
return qs.traceReader.GetOperations(ctx, query)

Check warning on line 89 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L88-L89

Added lines #L88 - L89 were not covered by tests
}

// FindTraces is the queryService implementation of tracestore.Reader.FindTraces
func (qs QueryServiceV2) FindTraces(ctx context.Context, query tracestore.TraceQueryParameters) ([]ptrace.Traces, error) {
return qs.traceReader.FindTraces(ctx, query)

Check warning on line 94 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L93-L94

Added lines #L93 - L94 were not covered by tests
}

// ArchiveTrace is the queryService utility to archive traces.
func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.TraceID) error {
if qs.options.ArchiveTraceWriter == nil {
return errNoArchiveSpanStorageV2
}
trace, err := qs.GetTrace(ctx, traceID)
if err != nil {
return err
}
return qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace)

Check warning on line 106 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L98-L106

Added lines #L98 - L106 were not covered by tests
}

// Adjust applies adjusters to the trace.
func (qs QueryServiceV2) Adjust(trace *model.Trace) (*model.Trace, error) {
return qs.options.Adjuster.Adjust(trace)

Check warning on line 111 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L110-L111

Added lines #L110 - L111 were not covered by tests
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// GetDependencies implements depstore.Reader.GetDependencies
func (qs QueryServiceV2) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{
StartTime: endTs.Add(-lookback),
EndTime: endTs,
})

Check warning on line 119 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L115-L119

Added lines #L115 - L119 were not covered by tests
}

// GetCapabilities returns the features supported by the query service.
func (qs QueryServiceV2) GetCapabilities() StorageCapabilities {
return StorageCapabilities{
ArchiveStorage: qs.options.hasArchiveStorage(),
}

Check warning on line 126 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L123-L126

Added lines #L123 - L126 were not covered by tests
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
func (opts *QueryServiceOptionsV2) InitArchiveStorage(
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
archiveReader tracestore.Reader,
archiveWriter tracestore.Writer,
logger *zap.Logger) {
opts.ArchiveTraceReader = archiveReader
opts.ArchiveTraceWriter = archiveWriter

Check warning on line 135 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L133-L135

Added lines #L133 - L135 were not covered by tests
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// hasArchiveStorage returns true if archive storage reader/writer are initialized.
func (opts *QueryServiceOptionsV2) hasArchiveStorage() bool {
return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil

Check warning on line 140 in cmd/query/app/querysvc/query_service_v2.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service_v2.go#L139-L140

Added lines #L139 - L140 were not covered by tests
}
Loading