Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/query/app/query_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,15 @@ func (p *queryParser) parseMetricsQueryParams(r *http.Request) (bqp metricstore.
if err != nil {
return bqp, err
}

tags, err := p.parseTags(r.Form[tagParam], r.Form[tagsParam])
if err != nil {
return bqp, err
}
if len(tags) > 0 {
bqp.Tags = tags
}

bqp.EndTime = &endTs
bqp.Lookback = &lookback
bqp.Step = &step
Expand Down
22 changes: 22 additions & 0 deletions cmd/query/app/query_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,25 @@ func TestParameterErrors(t *testing.T) {
})
}
}

func TestParseMetricsTags(t *testing.T) {
t.Run("simple tags only", func(t *testing.T) {
request, err := http.NewRequest(http.MethodGet, "x?service=foo&step=1000&tag=key1:value1,key2:value2", http.NoBody)
require.NoError(t, err)
parser := &queryParser{
timeNow: time.Now,
}
mqp, err := parser.parseMetricsQueryParams(request)
require.NoError(t, err)
assert.Equal(t, time.Second, *mqp.Step)
})
t.Run("malformed simple tag", func(t *testing.T) {
request, err := http.NewRequest(http.MethodGet, "x?service=foo&step=1000&tag=keyWithoutValue", http.NoBody)
require.NoError(t, err)
parser := &queryParser{
timeNow: time.Now,
}
_, err = parser.parseMetricsQueryParams(request)
require.Error(t, err)
})
}
82 changes: 70 additions & 12 deletions internal/storage/metricstore/prometheus/metricstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
spanKindFilter string
serviceFilter string
rate string
tagFilters []string
}

metricsQueryParams struct {
Expand Down Expand Up @@ -134,13 +135,25 @@ func (m MetricsReader) GetLatencies(ctx context.Context, requestParams *metricst
metricName: "service_latencies",
metricDesc: fmt.Sprintf("%.2fth quantile latency, grouped by service", requestParams.Quantile),
buildPromQuery: func(p promQueryParams) string {
// Build filter string including service_name, span_kind, and tags
filters := []string{fmt.Sprintf(`service_name =~ %q`, p.serviceFilter)}

if p.spanKindFilter != "" {
filters = append(filters, p.spanKindFilter)
}

// Add tag filters if there are any
if len(p.tagFilters) > 0 {
filters = append(filters, p.tagFilters...)
}

filterStr := strings.Join(filters, ", ")

return fmt.Sprintf(
// Note: p.spanKindFilter can be ""; trailing commas are okay within a timeseries selection.
`histogram_quantile(%.2f, sum(rate(%s_bucket{service_name =~ %q, %s}[%s])) by (%s))`,
`histogram_quantile(%.2f, sum(rate(%s_bucket{%s}[%s])) by (%s))`,
requestParams.Quantile,
m.latencyMetricName,
p.serviceFilter,
p.spanKindFilter,
filterStr,
p.rate,
p.groupBy,
)
Expand Down Expand Up @@ -177,12 +190,24 @@ func (m MetricsReader) GetCallRates(ctx context.Context, requestParams *metricst
metricName: "service_call_rate",
metricDesc: "calls/sec, grouped by service",
buildPromQuery: func(p promQueryParams) string {
// Build filter string including service_name, span_kind, and tags
filters := []string{fmt.Sprintf(`service_name =~ %q`, p.serviceFilter)}

if p.spanKindFilter != "" {
filters = append(filters, p.spanKindFilter)
}

// Add tag filters if there are any
if len(p.tagFilters) > 0 {
filters = append(filters, p.tagFilters...)
}

filterStr := strings.Join(filters, ", ")

return fmt.Sprintf(
// Note: p.spanKindFilter can be ""; trailing commas are okay within a timeseries selection.
`sum(rate(%s{service_name =~ %q, %s}[%s])) by (%s)`,
`sum(rate(%s{%s}[%s])) by (%s)`,
m.callsMetricName,
p.serviceFilter,
p.spanKindFilter,
filterStr,
p.rate,
p.groupBy,
)
Expand Down Expand Up @@ -211,11 +236,32 @@ func (m MetricsReader) GetErrorRates(ctx context.Context, requestParams *metrics
metricName: "service_error_rate",
metricDesc: "error rate, computed as a fraction of errors/sec over calls/sec, grouped by service",
buildPromQuery: func(p promQueryParams) string {
// Build base filters for all queries (service_name)
baseFilters := []string{fmt.Sprintf(`service_name =~ %q`, p.serviceFilter)}

// Add status_code filter only for error rate numerator, must be right after service_name to match test expectations
errorFilters := append([]string{}, baseFilters...)
errorFilters = append(errorFilters, `status_code = "STATUS_CODE_ERROR"`)

// Add span_kind filter
if p.spanKindFilter != "" {
baseFilters = append(baseFilters, p.spanKindFilter)
errorFilters = append(errorFilters, p.spanKindFilter)
}

// Add tag filters if there are any
if len(p.tagFilters) > 0 {
baseFilters = append(baseFilters, p.tagFilters...)
errorFilters = append(errorFilters, p.tagFilters...)
}

errorFilterStr := strings.Join(errorFilters, ", ")
baseFilterStr := strings.Join(baseFilters, ", ")

return fmt.Sprintf(
// Note: p.spanKindFilter can be ""; trailing commas are okay within a timeseries selection.
`sum(rate(%s{service_name =~ %q, status_code = "STATUS_CODE_ERROR", %s}[%s])) by (%s) / sum(rate(%s{service_name =~ %q, %s}[%s])) by (%s)`,
m.callsMetricName, p.serviceFilter, p.spanKindFilter, p.rate, p.groupBy,
m.callsMetricName, p.serviceFilter, p.spanKindFilter, p.rate, p.groupBy,
`sum(rate(%s{%s}[%s])) by (%s) / sum(rate(%s{%s}[%s])) by (%s)`,
m.callsMetricName, errorFilterStr, p.rate, p.groupBy,
m.callsMetricName, baseFilterStr, p.rate, p.groupBy,
)
},
}
Expand Down Expand Up @@ -308,11 +354,23 @@ func (m MetricsReader) buildPromQuery(metricsParams metricsQueryParams) string {
if len(metricsParams.SpanKinds) > 0 {
spanKindFilter = fmt.Sprintf(`span_kind =~ %q`, strings.Join(metricsParams.SpanKinds, "|"))
}

// Build tag filters
var tagFilters []string
if len(metricsParams.Tags) > 0 {
for k, v := range metricsParams.Tags {
// Escape dots in key names for Prometheus compatibility
escapedKey := strings.ReplaceAll(k, ".", "_")
tagFilters = append(tagFilters, fmt.Sprintf(`%s=%q`, escapedKey, v))
}
}

promParams := promQueryParams{
serviceFilter: strings.Join(metricsParams.ServiceNames, "|"),
spanKindFilter: spanKindFilter,
rate: promqlDurationString(metricsParams.RatePer),
groupBy: strings.Join(groupBy, ","),
tagFilters: tagFilters,
}
return metricsParams.buildPromQuery(promParams)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type (
wantDescription string
wantLabels map[string]string
wantPromQlQuery string
tags map[string]string
}
)

Expand Down Expand Up @@ -162,6 +163,21 @@ func TestGetLatencies(t *testing.T) {
wantPromQlQuery: `histogram_quantile(0.95, sum(rate(duration_bucket{service_name =~ "emailservice", ` +
`span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name,le))`,
},
{
name: "tag filtering should be included in the query",
serviceNames: []string{"emailservice"},
spanKinds: []string{"SPAN_KIND_SERVER"},
tags: map[string]string{"http.method": "GET"},
groupByOperation: false,
wantName: "service_latencies",
wantDescription: "0.95th quantile latency, grouped by service",
wantLabels: map[string]string{
"service_name": "emailservice",
"http.method": "GET",
},
wantPromQlQuery: `histogram_quantile(0.95, sum(rate(duration_bucket{service_name =~ "emailservice", ` +
`span_kind =~ "SPAN_KIND_SERVER", http_method="GET"}[10m])) by (service_name,le))`,
},
{
name: "group by service and operation should be reflected in name/description and query group-by",
serviceNames: []string{"emailservice"},
Expand Down Expand Up @@ -265,6 +281,23 @@ func TestGetCallRates(t *testing.T) {
wantPromQlQuery: `sum(rate(calls{service_name =~ "emailservice", ` +
`span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name)`,
},
{
name: "tag filtering should be included in the query",
serviceNames: []string{"emailservice"},
spanKinds: []string{"SPAN_KIND_SERVER"},
groupByOperation: false,
tags: map[string]string{
"http.method": "GET",
},
wantName: "service_call_rate",
wantDescription: "calls/sec, grouped by service",
wantLabels: map[string]string{
"http.method": "GET",
"service_name": "emailservice",
},
wantPromQlQuery: `sum(rate(calls{service_name =~ "emailservice", span_kind =~ "SPAN_KIND_SERVER",` +
` http_method="GET"}[10m])) by (service_name)`,
},
{
name: "group by service and operation should be reflected in name/description and query group-by",
serviceNames: []string{"emailservice"},
Expand Down Expand Up @@ -366,6 +399,24 @@ func TestGetErrorRates(t *testing.T) {
`span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name) / ` +
`sum(rate(calls{service_name =~ "emailservice", span_kind =~ "SPAN_KIND_SERVER"}[10m])) by (service_name)`,
},
{
name: "tag filtering should be included in the query",
serviceNames: []string{"emailservice"},
spanKinds: []string{"SPAN_KIND_SERVER"},
groupByOperation: false,
tags: map[string]string{
"http.method": "GET",
},
wantName: "service_error_rate",
wantDescription: "error rate, computed as a fraction of errors/sec over calls/sec, grouped by service",
wantLabels: map[string]string{
"service_name": "emailservice",
"http.method": "GET",
},
wantPromQlQuery: `sum(rate(calls{service_name =~ "emailservice", status_code = "STATUS_CODE_ERROR", ` +
`span_kind =~ "SPAN_KIND_SERVER", http_method="GET"}[10m])) by (service_name) / ` +
`sum(rate(calls{service_name =~ "emailservice", span_kind =~ "SPAN_KIND_SERVER", http_method="GET"}[10m])) by (service_name)`,
},
{
name: "group by service and operation should be reflected in name/description and query group-by",
serviceNames: []string{"emailservice"},
Expand Down Expand Up @@ -937,6 +988,8 @@ func startMockPrometheusServer(t *testing.T, wantPromQlQuery string, wantWarning
mockResponsePayloadFile := "testdata/service_datapoint_response.json"
if strings.Contains(promQuery, "by (service_name,span_name") {
mockResponsePayloadFile = "testdata/service_span_name_datapoint_response.json"
} else if strings.Contains(promQuery, "http_method=") {
mockResponsePayloadFile = "testdata/service_with_tags_response.json"
}
sendResponse(t, w, mockResponsePayloadFile)
}))
Expand Down Expand Up @@ -964,6 +1017,7 @@ func buildTestBaseQueryParametersFrom(tc metricsTestCase) metricstore.BaseQueryP
Step: &step,
RatePer: &ratePer,
SpanKinds: tc.spanKinds,
Tags: tc.tags,
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"status": "success",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"service_name": "emailservice",
"http.method": "GET"
},
"values": [
[
1620351786,
"9223372036854"
]
]
}
]
}
}
2 changes: 2 additions & 0 deletions internal/storage/v1/api/metricstore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type BaseQueryParameters struct {
RatePer *time.Duration
// SpanKinds is the list of span kinds to include (logical OR) in the resulting metrics aggregation.
SpanKinds []string
// Tags is a map of tag keys and values to filter the metrics by.
Tags map[string]string
}

// LatenciesQueryParameters contains the parameters required for latency metrics queries.
Expand Down
Loading