Skip to content
Open
4 changes: 3 additions & 1 deletion cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ func (s *server) Start(ctx context.Context, host component.Host) error {

opts := querysvc.QueryServiceOptions{
MaxClockSkewAdjust: s.config.MaxClockSkewAdjust,
MaxTraceSize: s.config.MaxTraceSize,
}
v2opts := v2querysvc.QueryServiceOptions{
MaxClockSkewAdjust: s.config.MaxClockSkewAdjust,
MaxTraceSize: s.config.MaxTraceSize,
}
if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil {
return err
Expand Down Expand Up @@ -164,7 +166,7 @@ func (s *server) addArchiveStorage(
v2opts.ArchiveTraceReader = traceReader
v2opts.ArchiveTraceWriter = traceWriter

spanReader := v1adapter.GetV1Reader(traceReader)
spanReader := v1adapter.GetV1ReaderWithLimit(traceReader, v2opts.MaxTraceSize)
spanWriter := v1adapter.GetV1Writer(traceWriter)

opts.ArchiveSpanReader = spanReader
Expand Down
11 changes: 11 additions & 0 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ const (
queryAdditionalHeaders = "query.additional-headers"
queryMaxClockSkewAdjust = "query.max-clock-skew-adjustment"
queryEnableTracing = "query.enable-tracing"
queryMaxTraceSize = "query.max-trace-size"
)

const (
defaultMaxClockSkewAdjust = 0 * time.Second
defaultMaxTraceSize = 0 // 0 means no limit
)

var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{
Expand Down Expand Up @@ -78,6 +80,8 @@ type QueryOptions struct {
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
// MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span.
MaxClockSkewAdjust time.Duration `mapstructure:"max_clock_skew_adjust" valid:"optional"`
// MaxTraceSize is the maximum number of spans per trace (0 = no limit).
MaxTraceSize int `mapstructure:"max_trace_size" valid:"optional"`
// EnableTracing determines whether traces will be emitted by jaeger-query.
EnableTracing bool `mapstructure:"enable_tracing"`
// HTTP holds the HTTP configuration that the query service uses to serve requests.
Expand All @@ -98,6 +102,7 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.Bool(queryTokenPropagation, false, "Allow propagation of bearer token to be used by storage plugins")
flagSet.Duration(queryMaxClockSkewAdjust, defaultMaxClockSkewAdjust, "The maximum delta by which span timestamps may be adjusted in the UI due to clock skew; set to 0s to disable clock skew adjustments")
flagSet.Bool(queryEnableTracing, false, "Enables emitting jaeger-query traces")
flagSet.Int(queryMaxTraceSize, defaultMaxTraceSize, "Maximum number of spans per trace (0 = no limit)")
tlsGRPCFlagsConfig.AddFlags(flagSet)
tlsHTTPFlagsConfig.AddFlags(flagSet)
}
Expand Down Expand Up @@ -137,6 +142,10 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q
}
qOpts.Tenancy = tenancy.InitFromViper(v)
qOpts.EnableTracing = v.GetBool(queryEnableTracing)
qOpts.MaxTraceSize = v.GetInt(queryMaxTraceSize)
if qOpts.MaxTraceSize < 0 {
return qOpts, fmt.Errorf("%s must be non-negative", queryMaxTraceSize)
}
return qOpts, nil
}

Expand All @@ -149,9 +158,11 @@ func (qOpts *QueryOptions) BuildQueryServiceOptions(
) (*querysvc.QueryServiceOptions, *v2querysvc.QueryServiceOptions) {
opts := &querysvc.QueryServiceOptions{
MaxClockSkewAdjust: qOpts.MaxClockSkewAdjust,
MaxTraceSize: qOpts.MaxTraceSize,
}
v2Opts := &v2querysvc.QueryServiceOptions{
MaxClockSkewAdjust: qOpts.MaxClockSkewAdjust,
MaxTraceSize: qOpts.MaxTraceSize,
}
as, err := initArchiveStorageFn()
if err != nil {
Expand Down
50 changes: 50 additions & 0 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,53 @@ func TestDefaultQueryOptions(t *testing.T) {
require.Equal(t, ":16685", qo.GRPC.NetAddr.Endpoint)
require.EqualValues(t, "tcp", qo.GRPC.NetAddr.Transport)
}

func TestQueryOptionsMaxTraceSizeValidation(t *testing.T) {
tests := []struct {
name string
flags []string
expectError bool
expected int
}{
{
name: "default value",
flags: []string{},
expectError: false,
expected: 0,
},
{
name: "valid positive value",
flags: []string{"--query.max-trace-size=1000"},
expectError: false,
expected: 1000,
},
{
name: "valid zero value",
flags: []string{"--query.max-trace-size=0"},
expectError: false,
expected: 0,
},
{
name: "negative value should error",
flags: []string{"--query.max-trace-size=-1"},
expectError: true,
expected: 0,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags(test.flags)
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())

if test.expectError {
require.Error(t, err)
require.Contains(t, err.Error(), "query.max-trace-size must be non-negative")
} else {
require.NoError(t, err)
require.Equal(t, test.expected, qOpts.MaxTraceSize)
}
})
}
}
3 changes: 2 additions & 1 deletion cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type QueryServiceOptions struct {
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
MaxClockSkewAdjust time.Duration
MaxTraceSize int
}

// StorageCapabilities is a feature flag for query service
Expand Down Expand Up @@ -55,7 +56,7 @@ type TraceQueryParameters struct {

// NewQueryService returns a new QueryService.
func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService {
spanReader := v1adapter.GetV1Reader(traceReader)
spanReader := v1adapter.GetV1ReaderWithLimit(traceReader, options.MaxTraceSize)
qsvc := &QueryService{
spanReader: spanReader,
dependencyReader: dependencyReader,
Expand Down
2 changes: 2 additions & 0 deletions cmd/query/app/querysvc/v2/querysvc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type QueryServiceOptions struct {
ArchiveTraceWriter tracestore.Writer
// MaxClockSkewAdjust is the maximum duration by which to adjust a span.
MaxClockSkewAdjust time.Duration
// MaxTraceSize is the maximum number of spans per trace (0 = no limit).
MaxTraceSize int
}

// StorageCapabilities is a feature flag for query service
Expand Down
7 changes: 4 additions & 3 deletions internal/storage/v2/v1adapter/spanreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var errTooManyTracesFound = errors.New("too many traces found")
// SpanReader wraps a tracestore.Reader so that it can be downgraded to implement
// the v1 spanstore.Reader interface.
type SpanReader struct {
traceReader tracestore.Reader
traceReader tracestore.Reader
maxTraceSize int
}

func (sr *SpanReader) GetTrace(ctx context.Context, query spanstore.GetTraceParameters) (*model.Trace, error) {
Expand All @@ -29,7 +30,7 @@ func (sr *SpanReader) GetTrace(ctx context.Context, query spanstore.GetTracePara
Start: query.StartTime,
End: query.EndTime,
})
traces, err := V1TracesFromSeq2(getTracesIter)
traces, err := V1TracesFromSeq2WithLimit(getTracesIter, sr.maxTraceSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -80,7 +81,7 @@ func (sr *SpanReader) FindTraces(
DurationMax: query.DurationMax,
SearchDepth: query.NumTraces,
})
return V1TracesFromSeq2(getTracesIter)
return V1TracesFromSeq2WithLimit(getTracesIter, sr.maxTraceSize)
}

func (sr *SpanReader) FindTraceIDs(
Expand Down
7 changes: 6 additions & 1 deletion internal/storage/v2/v1adapter/tracereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ type TraceReader struct {
}

func GetV1Reader(reader tracestore.Reader) spanstore.Reader {
return GetV1ReaderWithLimit(reader, 0)
}

func GetV1ReaderWithLimit(reader tracestore.Reader, maxTraceSize int) spanstore.Reader {
if tr, ok := reader.(*TraceReader); ok {
return tr.spanReader
}
return &SpanReader{
traceReader: reader,
traceReader: reader,
maxTraceSize: maxTraceSize,
}
}

Expand Down
128 changes: 128 additions & 0 deletions internal/storage/v2/v1adapter/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,134 @@ func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace
return jaegerTraces, nil
}

// V1TracesFromSeq2WithLimit converts an interator of ptrace.Traces chunks into v1 traces.
// If maxTraceSize > 0, traces exceeding that number of spans will be truncated while consuming the sequence.
func V1TracesFromSeq2WithLimit(otelSeq iter.Seq2[[]ptrace.Traces, error], maxTraceSize int) ([]*model.Trace, error) {
var (
jaegerTraces []*model.Trace
iterErr error
)
limitedSeq := applyTraceSizeLimit(otelSeq, maxTraceSize)
jptrace.AggregateTraces(limitedSeq)(func(otelTrace ptrace.Traces, err error) bool {
if err != nil {
iterErr = err
return false
}
jaegerTraces = append(jaegerTraces, modelTraceFromOtelTrace(otelTrace))
return true
})
if iterErr != nil {
return nil, iterErr
}
return jaegerTraces, nil
}

// applyTraceSizeLimit applies a per-trace span budget while consuming the iterator.
// Once the budget is reached for a logical trace, remaining chunks of that trace are not yielded,
// and if the current chunk exceeds the remaining budget it is truncated to the exact number of remaining spans.
func applyTraceSizeLimit(otelSeq iter.Seq2[[]ptrace.Traces, error], maxTraceSize int) iter.Seq2[[]ptrace.Traces, error] {
if maxTraceSize <= 0 {
return otelSeq
}

return func(yield func([]ptrace.Traces, error) bool) {
spansProcessed := 0
currentTraceID := pcommon.NewTraceIDEmpty()
truncated := false

otelSeq(func(traces []ptrace.Traces, err error) bool {
if err != nil {
return yield(nil, err)
}

limited := make([]ptrace.Traces, 0, len(traces))
for _, tr := range traces {
// detect logical trace boundary by the first span's TraceID
rs := tr.ResourceSpans()
if rs.Len() > 0 {
ss := rs.At(0).ScopeSpans()
if ss.Len() > 0 {
spans := ss.At(0).Spans()
if spans.Len() > 0 {
tid := spans.At(0).TraceID()
if tid != currentTraceID {
currentTraceID = tid
spansProcessed = 0
truncated = false
}
}
}
}

if truncated {
// already reached limit for this trace; skip this trace
continue
}

spanCount := countSpansInTrace(tr)
if spansProcessed+spanCount > maxTraceSize {
remaining := maxTraceSize - spansProcessed
if remaining > 0 {
limited = append(limited, truncateTraceToLimit(tr, remaining))
spansProcessed += remaining
}
truncated = true
// Continue to next trace in the same batch
continue
}

limited = append(limited, tr)
spansProcessed += spanCount
}

if len(limited) > 0 {
if !yield(limited, nil) {
return false
}
}
return true // Always continue to process more batches
})
}
}

func countSpansInTrace(tr ptrace.Traces) int {
total := 0
rs := tr.ResourceSpans()
for i := 0; i < rs.Len(); i++ {
ss := rs.At(i).ScopeSpans()
for j := 0; j < ss.Len(); j++ {
total += ss.At(j).Spans().Len()
}
}
return total
}

// truncateTraceToLimit returns a new ptrace.Traces containing at most remaining spans, preserving
// resource and scope attributes for included spans only.
func truncateTraceToLimit(tr ptrace.Traces, remaining int) ptrace.Traces {
out := ptrace.NewTraces()
rs := tr.ResourceSpans()
copied := 0
for i := 0; i < rs.Len() && copied < remaining; i++ {
inRes := rs.At(i)
outRes := out.ResourceSpans().AppendEmpty()
inRes.Resource().CopyTo(outRes.Resource())
ss := inRes.ScopeSpans()
for j := 0; j < ss.Len() && copied < remaining; j++ {
inScope := ss.At(j)
outScope := outRes.ScopeSpans().AppendEmpty()
inScope.Scope().CopyTo(outScope.Scope())
spans := inScope.Spans()
for k := 0; k < spans.Len() && copied < remaining; k++ {
span := spans.At(k)
span.CopyTo(outScope.Spans().AppendEmpty())
copied++
}
}
}
return out
}

func V1TraceIDsFromSeq2(traceIDsIter iter.Seq2[[]tracestore.FoundTraceID, error]) ([]model.TraceID, error) {
var (
iterErr error
Expand Down
Loading