Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
aH.handleFunc(router, aH.calls, "/metrics/calls").Methods(http.MethodGet)
aH.handleFunc(router, aH.errors, "/metrics/errors").Methods(http.MethodGet)
aH.handleFunc(router, aH.minStep, "/metrics/minstep").Methods(http.MethodGet)
aH.handleFunc(router, aH.attributeValues, "/metrics/attributes").Methods(http.MethodGet)
aH.handleFunc(router, aH.getQualityMetrics, "/quality-metrics").Methods(http.MethodGet)
}

Expand Down Expand Up @@ -360,6 +361,30 @@ func (aH *APIHandler) minStep(w http.ResponseWriter, r *http.Request) {
aH.writeJSON(w, r, &structuredRes)
}

// attributeValues handles GET requests for the distinct values of a given
// metrics attribute (tag), optionally scoped to a single service via the
// "service" query parameter. The "key" query parameter is mandatory.
func (aH *APIHandler) attributeValues(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()

	attributeKey := q.Get("key")
	if attributeKey == "" {
		aH.handleError(w, errors.New("attribute key is required"), http.StatusBadRequest)
		return
	}

	params := metricstore.AttributeValuesQueryParameters{
		AttributeKey: attributeKey,
		ServiceName:  q.Get("service"),
	}
	values, err := aH.metricsQueryService.GetAttributeValues(r.Context(), &params)
	if aH.handleError(w, err, http.StatusInternalServerError) {
		return
	}

	aH.writeJSON(w, r, &structuredResponse{
		Data:  values,
		Total: len(values),
	})
}

func (aH *APIHandler) metrics(w http.ResponseWriter, r *http.Request, getMetrics func(context.Context, metricstore.BaseQueryParameters) (*metrics.MetricFamily, error)) {
requestParams, err := aH.queryParser.parseMetricsQueryParams(r)
if aH.handleError(w, err, http.StatusBadRequest) {
Expand Down
112 changes: 112 additions & 0 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/jaegertracing/jaeger/internal/jtracer"
"github.com/jaegertracing/jaeger/internal/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/internal/storage/metricstore/disabled"
"github.com/jaegertracing/jaeger/internal/storage/v1/api/metricstore"
metricsmocks "github.com/jaegertracing/jaeger/internal/storage/v1/api/metricstore/mocks"
"github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/mocks"
Expand Down Expand Up @@ -1160,3 +1161,114 @@ func TestSearchTenancyFlowTenantHTTP(t *testing.T) {
assert.Empty(t, responseMegacorp.Errors)
assert.Nil(t, responseMegacorp.Data)
}

// TestGetLabelValues verifies that /api/metrics/attributes returns the values
// supplied by the metrics reader and forwards the query parameters to it
// unchanged.
func TestGetLabelValues(t *testing.T) {
	reader := &metricsmocks.Reader{}
	ts := initializeTestServer(t, HandlerOptions.MetricsQueryService(reader))

	expectedValues := []string{"emailservice", "frontend", "productcatalogservice"}

	testCases := []struct {
		name      string
		urlPath   string
		labelName string
		service   string
	}{
		{
			name:      "Get span_kind values",
			urlPath:   "/api/metrics/attributes?key=span_kind&service=frontend",
			labelName: "span_kind",
			service:   "frontend",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			reader.On(
				"GetAttributeValues",
				mock.AnythingOfType("*context.valueCtx"),
				mock.MatchedBy(func(params *metricstore.AttributeValuesQueryParameters) bool {
					return params.AttributeKey == tc.labelName && params.ServiceName == tc.service
				}),
			).Return(expectedValues, nil).Once()

			var response structuredResponse
			require.NoError(t, getJSON(ts.server.URL+tc.urlPath, &response))

			// JSON decoding produces []any; rebuild a []string for comparison.
			raw, ok := response.Data.([]any)
			require.True(t, ok, "Response data should be a slice")
			actualValues := make([]string, 0, len(raw))
			for _, v := range raw {
				actualValues = append(actualValues, v.(string))
			}

			assert.Equal(t, expectedValues, actualValues)
			assert.Equal(t, len(expectedValues), response.Total)
		})
	}
}

// TestGetLabelValuesError covers the error paths of /api/metrics/attributes:
// a request missing the mandatory "key" parameter, and a storage failure.
func TestGetLabelValuesError(t *testing.T) {
	t.Run("missing key parameter", func(t *testing.T) {
		reader := &metricsmocks.Reader{}
		ts := initializeTestServer(t, HandlerOptions.MetricsQueryService(reader))

		resp, err := http.Get(ts.server.URL + "/api/metrics/attributes?service=frontend")
		require.NoError(t, err)
		defer resp.Body.Close()

		payload, err := io.ReadAll(resp.Body)
		require.NoError(t, err)

		var structured structuredResponse
		require.NoError(t, json.Unmarshal(payload, &structured))

		require.Len(t, structured.Errors, 1)
		assert.Equal(t, http.StatusBadRequest, structured.Errors[0].Code)
		assert.Equal(t, "attribute key is required", structured.Errors[0].Msg)
	})

	t.Run("storage error", func(t *testing.T) {
		reader := &metricsmocks.Reader{}
		ts := initializeTestServer(t, HandlerOptions.MetricsQueryService(reader))

		reader.On(
			"GetAttributeValues",
			mock.AnythingOfType("*context.valueCtx"),
			mock.MatchedBy(func(params *metricstore.AttributeValuesQueryParameters) bool {
				return params.AttributeKey == "span_kind" && params.ServiceName == "frontend"
			}),
		).Return(nil, errors.New("storage error")).Once()

		resp, err := http.Get(ts.server.URL + "/api/metrics/attributes?service=frontend&key=span_kind&target=tags")
		require.NoError(t, err)
		defer resp.Body.Close()

		assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)

		// Decode the body to verify the structured error message as well.
		payload, err := io.ReadAll(resp.Body)
		require.NoError(t, err)

		var structured structuredResponse
		require.NoError(t, json.Unmarshal(payload, &structured))

		require.Len(t, structured.Errors, 1)
		assert.Equal(t, http.StatusInternalServerError, structured.Errors[0].Code)
		assert.Equal(t, "storage error", structured.Errors[0].Msg)
	})
}
5 changes: 5 additions & 0 deletions internal/storage/metricstore/disabled/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,8 @@ func (*MetricsReader) GetErrorRates(context.Context, *metricstore.ErrorRateQuery
// GetMinStepDuration implements metricstore.Reader; the disabled reader
// always fails with ErrDisabled.
func (*MetricsReader) GetMinStepDuration(context.Context, *metricstore.MinStepDurationQueryParameters) (time.Duration, error) {
return 0, ErrDisabled
}

// GetAttributeValues gets the available values for a specific attribute.
// The disabled reader always fails with ErrDisabled.
func (*MetricsReader) GetAttributeValues(context.Context, *metricstore.AttributeValuesQueryParameters) ([]string, error) {
return nil, ErrDisabled
}
15 changes: 15 additions & 0 deletions internal/storage/metricstore/disabled/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,18 @@ func TestGetMinStepDurations(t *testing.T) {
require.ErrorIs(t, err, ErrDisabled)
require.EqualError(t, err, ErrDisabled.Error())
}

// TestGetLabelValues checks that the disabled reader rejects attribute-value
// queries with ErrDisabled.
func TestGetLabelValues(t *testing.T) {
	reader, err := NewMetricsReader()
	require.NoError(t, err)
	require.NotNil(t, reader)

	values, err := reader.GetAttributeValues(context.Background(), &metricstore.AttributeValuesQueryParameters{
		AttributeKey: "span_kind",
		ServiceName:  "emailservice",
	})
	assert.Zero(t, values)
	require.ErrorIs(t, err, ErrDisabled)
	require.EqualError(t, err, ErrDisabled.Error())
}
35 changes: 35 additions & 0 deletions internal/storage/metricstore/elasticsearch/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package elasticsearch

import (
"context"
"fmt"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -114,6 +115,40 @@ func (*QueryBuilder) buildTimeSeriesAggQuery(params metricstore.BaseQueryParamet
return dateHistAgg
}

// This function builds an Elasticsearch query to fetch unique values for a specified attribute (tag), optionally filtered by a service name.
func (*QueryBuilder) BuildAttributeValuesQuery(params *metricstore.AttributeValuesQueryParameters) (elastic.Query, elastic.Aggregation) {
// Create a bool query to filter by service name if provided
boolQuery := elastic.NewBoolQuery()
if params.ServiceName != "" {
boolQuery.Filter(elastic.NewTermQuery("process.serviceName", params.ServiceName))
}

// Create a global aggregation that will contain all nested aggregations
globalAgg := elastic.NewGlobalAggregation()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this an aggregation query? Seems like you're doing something like GROUP BY, but isn't there a simpler SELECT DISTINCT flavor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we search in multiple nested documents (three currently), defined here

nestedTagFieldList = []string{nestedTagsField, nestedProcessTagsField, nestedLogFieldsField}

a simpler approach would work in case we just search in a given location, like process.tags, but I feel predefining the location of the attributes is not needed.


for i, path := range spanstore.NestedTagFieldList {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you loop through all these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I ditched the ResourceTarget query parameter in favour of searching 3-4 places where a tag could be present in ES.

I took inspiration from the already working code in the search page where we filter by tag

func (s *SpanReader) buildTagQuery(k string, v string) elastic.Query {
objectTagListLen := len(objectTagFieldList)
queries := make([]elastic.Query, len(nestedTagFieldList)+objectTagListLen)
kd := s.dotReplacer.ReplaceDot(k)
for i := range objectTagFieldList {
queries[i] = s.buildObjectQuery(objectTagFieldList[i], kd, v)
}
for i := range nestedTagFieldList {
queries[i+objectTagListLen] = s.buildNestedQuery(nestedTagFieldList[i], k, v)
}
// but configuration can change over time
return elastic.NewBoolQuery().Should(queries...)

In span filter by tag, we only seem to search in 3 places, and it's been working fine I guess?

nestedTagFieldList = []string{nestedTagsField, nestedProcessTagsField, nestedLogFieldsField}

We are doing something similar in the new api to get tag values. So instead of providing the exact path to search at, which you know requires the user to dig into ES and figure out the nested path, we always search in some predefined places.

// Create nested aggregation for each path
nestedAgg := elastic.NewNestedAggregation().Path(path)

filterAgg := elastic.NewFilterAggregation().
Filter(elastic.NewTermQuery(path+".key", params.AttributeKey))

// Get unique values
valuesAgg := elastic.NewTermsAggregation().
Field(path + ".value").
Size(100)

// Chain aggregations
filterAgg.SubAggregation("values", valuesAgg)
nestedAgg.SubAggregation("filtered_by_key", filterAgg)

// Add to global aggregation with unique name
globalAgg.SubAggregation(fmt.Sprintf("path_%d", i), nestedAgg)
}

return boolQuery, globalAgg
}

// Execute runs the Elasticsearch search with the provided bool and aggregation queries.
func (q *QueryBuilder) Execute(ctx context.Context, boolQuery elastic.BoolQuery, aggQuery elastic.Aggregation, timeRange TimeRange) (*elastic.SearchResult, error) {
indexName := q.cfg.Indices.IndexPrefix.Apply("jaeger-span-")
Expand Down
84 changes: 84 additions & 0 deletions internal/storage/metricstore/elasticsearch/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math"
"strings"
"time"

"github.com/olivere/elastic/v7"
Expand Down Expand Up @@ -166,6 +167,89 @@ func (r MetricsReader) GetErrorRates(ctx context.Context, params *metricstore.Er
return CalculateErrorRates(rawErrorsMetrics, callRateMetrics, params.BaseQueryParameters, timeRange), nil
}

// GetAttributeValues implements metricstore.Reader.
func (r MetricsReader) GetAttributeValues(ctx context.Context, params *metricstore.AttributeValuesQueryParameters) ([]string, error) {
boolQuery, aggQuery := r.queryBuilder.BuildAttributeValuesQuery(params)

searchResult, err := r.executeSearchWithAggregation(ctx, boolQuery, aggQuery)
if err != nil {
return nil, fmt.Errorf("failed to execute attribute values query: %w", err)
}

// Collect values from all paths (path_0, path_1, path_2, etc.)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean by path_0 ... ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is simply the alias/unique identifier for each nested aggregation. The path_, idx is simply the index of path in nestedTagFieldList .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its defined here

globalAgg.SubAggregation(fmt.Sprintf("path_%d", i), nestedAgg)

allValues := make(map[string]bool) // Use map to deduplicate

// The aggregation is wrapped in aggName ("results_buckets")
if resultsAgg, found := searchResult.Aggregations.Global(aggName); found {
// Look for path aggregations directly in results_buckets
for name := range resultsAgg.Aggregations {
if !strings.HasPrefix(name, "path_") {
continue
}
nestedAgg, _ := resultsAgg.Aggregations.Nested(name)
filterAgg, found := nestedAgg.Aggregations.Filter("filtered_by_key")
if !found {
continue
}
valuesAgg, found := filterAgg.Aggregations.Terms("values")
if !found {
continue
}
for _, bucket := range valuesAgg.Buckets {
if bucket.Key == nil {
continue
}
keyStr, ok := bucket.Key.(string)
if !ok {
keyStr = fmt.Sprintf("%v", bucket.Key)
}
allValues[keyStr] = true
}
}
}

// Convert map keys to slice
values := make([]string, 0, len(allValues))
for value := range allValues {
values = append(values, value)
}

return values, nil
}

// executeSearchWithAggregation is a helper method to execute a search carrying
// a single custom aggregation and return the raw search result.
//
// The time range is fixed to the last hour to keep the scanned data volume
// low; callers only need a representative sample of values.
func (r MetricsReader) executeSearchWithAggregation(
	ctx context.Context,
	query elastic.Query,
	aggQuery elastic.Aggregation,
) (*elastic.SearchResult, error) {
	// Capture "now" once so all three bounds derive from the same instant.
	now := time.Now()
	oneHourAgo := now.Add(-1 * time.Hour).UnixMilli()
	timeRange := TimeRange{
		startTimeMillis:         oneHourAgo,
		endTimeMillis:           now.UnixMilli(),
		extendedStartTimeMillis: oneHourAgo,
	}

	// MetricsQueryParams carries a BoolQuery by value; fail explicitly rather
	// than dereferencing a nil pointer if a different query type is passed.
	boolQuery, ok := query.(*elastic.BoolQuery)
	if !ok {
		return nil, errors.New("executeSearchWithAggregation requires a *elastic.BoolQuery")
	}

	metricsParams := MetricsQueryParams{
		metricName: "attribute_values",
		metricDesc: "Search for attribute values",
		boolQuery:  *boolQuery,
		aggQuery:   aggQuery,
	}

	return r.executeSearch(ctx, metricsParams, timeRange)
}

// GetMinStepDuration returns the minimum step duration.
func (MetricsReader) GetMinStepDuration(_ context.Context, _ *metricstore.MinStepDurationQueryParameters) (time.Duration, error) {
return minStep, nil
Expand Down
Loading
Loading