Skip to content

feat(tagged): add an option to query clickhouse for term weights in tagged queries #304

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ type ClickHouse struct {
IndexReverses IndexReverses `toml:"index-reverses" json:"index-reverses" comment:"see doc/config.md" commented:"true"`
IndexTimeout time.Duration `toml:"index-timeout" json:"index-timeout" comment:"total timeout to fetch series list from index"`
TaggedTable string `toml:"tagged-table" json:"tagged-table" comment:"'tagged' table from carbon-clickhouse, required for seriesByTag"`
Tag1CountTable string `toml:"tag1-count-table" json:"tag1-count-table" comment:"Table that contains the amounts of rows with each tag1 value, used to determine which Tag1 value to use in a query based on its cardinality. If left empty, basic sorting will be used."`
TaggedAutocompleDays int `toml:"tagged-autocomplete-days" json:"tagged-autocomplete-days" comment:"or how long the daemon will query tags during autocomplete"`
TaggedUseDaily bool `toml:"tagged-use-daily" json:"tagged-use-daily" comment:"whether to use date filter when searching for the metrics in the tagged-table"`
TaggedCosts map[string]*Costs `toml:"tagged-costs" json:"tagged-costs" comment:"costs for tags (for tune which tag will be used as primary), by default is 0, increase for costly (with poor selectivity) tags" commented:"true"`
Expand Down
28 changes: 27 additions & 1 deletion deploy/doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,30 @@ When `reverse = true` is set for data-table, there are two possibles cases for [
Depends on it for having a proper retention and aggregation you must additionally set `rollup-use-reverted = true` for the first case and `rollup-use-reverted = false` for the second.

#### Additional tuning tagged find for seriesByTag and autocomplete
Only one tag used as filter for index field Tag1, see graphite_tagged table [structure](https://github.com/lomik/
Only one tag used as filter for index field Tag1, see graphite_tagged table [structure](https://github.com/lomik/

To always choose the best Tag1, set the parameter `tag1-count-table = <table_name>`. The value must be the name of a ClickHouse table with the columns (Date, Tag1, Count), where Date and Tag1 have the same meaning as in the graphite_tagged table. The table can be defined like this:

```
CREATE TABLE IF NOT EXISTS default.tag1_count_per_day
(
Date Date,
Tag1 String,
Count UInt64
)
ENGINE = SummingMergeTree
ORDER BY (Date, Tag1);

CREATE MATERIALIZED VIEW IF NOT EXISTS default.tag1_count_per_day_mv TO default.tag1_count_per_day AS
SELECT Date AS Date,
Tag1 AS Tag1,
count(*) AS Count
FROM default.graphite_tagged
GROUP BY (Date, Tag1);
```

The materialized view automatically records the number of rows for each unique Tag1 value as metrics are written.
graphite-clickhouse queries this table to decide which tag to use as the Tag1 filter when querying the graphite_tagged table.
Using this parameter somewhat increases the write load, but in some cases it can greatly speed up reads of tagged metrics.

Note that this option only affects terms that use the '=' operator with a non-empty value and no wildcards.
29 changes: 29 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,33 @@ Depends on it for having a proper retention and aggregation you must additionall

#### Additional tuning tagged find for seriesByTag and autocomplete
Only one tag used as filter for index field Tag1, see graphite_tagged table [structure](https://github.com/lomik/

To always choose the best Tag1, set the parameter `tag1-count-table = <table_name>`. The value must be the name of a ClickHouse table with the columns (Date, Tag1, Count), where Date and Tag1 have the same meaning as in the graphite_tagged table. The table can be defined like this:

```
CREATE TABLE IF NOT EXISTS default.tag1_count_per_day
(
Date Date,
Tag1 String,
Count UInt64
)
ENGINE = SummingMergeTree
ORDER BY (Date, Tag1);

CREATE MATERIALIZED VIEW IF NOT EXISTS default.tag1_count_per_day_mv TO default.tag1_count_per_day AS
SELECT Date AS Date,
Tag1 AS Tag1,
count(*) AS Count
FROM default.graphite_tagged
GROUP BY (Date, Tag1);
```

The materialized view automatically records the number of rows for each unique Tag1 value as metrics are written.
graphite-clickhouse queries this table to decide which tag to use as the Tag1 filter when querying the graphite_tagged table.
Using this parameter somewhat increases the write load, but in some cases it can greatly speed up reads of tagged metrics.

Note that this option only affects terms that use the '=' operator with a non-empty value and no wildcards.

```toml
[common]
# general listener
Expand Down Expand Up @@ -353,6 +380,8 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
index-timeout = "1m0s"
# 'tagged' table from carbon-clickhouse, required for seriesByTag
tagged-table = "graphite_tagged"
# Table that contains the amounts of rows with each tag1 value, used to determine which Tag1 value to use in a query based on its cardinality. If left empty, basic sorting will be used.
tag1-count-table = ""
# or how long the daemon will query tags during autocomplete
tagged-autocomplete-days = 7
# whether to use date filter when searching for the metrics in the tagged-table
Expand Down
2 changes: 2 additions & 0 deletions finder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func newPlainFinder(ctx context.Context, config *config.Config, query string, fr
f = NewTagged(
config.ClickHouse.URL,
config.ClickHouse.TaggedTable,
config.ClickHouse.Tag1CountTable,
config.ClickHouse.TaggedUseDaily,
config.FeatureFlags.UseCarbonBehavior,
config.FeatureFlags.DontMatchMissingTags,
Expand Down Expand Up @@ -147,6 +148,7 @@ func FindTagged(ctx context.Context, config *config.Config, terms []TaggedTerm,
fnd := NewTagged(
config.ClickHouse.URL,
config.ClickHouse.TaggedTable,
config.ClickHouse.Tag1CountTable,
config.ClickHouse.TaggedUseDaily,
config.FeatureFlags.UseCarbonBehavior,
config.FeatureFlags.DontMatchMissingTags,
Expand Down
225 changes: 177 additions & 48 deletions finder/tagged.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"sort"
"strconv"
"strings"

"github.com/lomik/graphite-clickhouse/config"
Expand All @@ -19,7 +20,8 @@ import (
)

var (
ErrCostlySeriesByTag = errs.NewErrorWithCode("seriesByTag argument has too much wildcard and regex terms", http.StatusForbidden)
ErrCostlySeriesByTag = errs.NewErrorWithCode("seriesByTag argument has too much wildcard and regex terms", http.StatusForbidden)
ErrInvalidSeriesByTag = errs.NewErrorWithCode("wrong seriesByTag call", http.StatusBadRequest)
)

type TaggedTermOp int
Expand Down Expand Up @@ -73,26 +75,30 @@ func (s TaggedTermList) Less(i, j int) bool {
// TaggedFinder holds the connection settings, query options and response
// buffer used for seriesByTag lookups in ClickHouse.
type TaggedFinder struct {
	url                  string                   // ClickHouse DSN
	table                string                   // graphite_tag table
	tag1CountTable       string                   // table that helps to choose the most optimal Tag1; empty disables the count lookup
	absKeepEncoded       bool                     // Abs returns the URL-encoded value; used for queries from Prometheus
	opts                 clickhouse.Options       // ClickHouse query options (timeouts)
	taggedCosts          map[string]*config.Costs // costs for tags (used to tune the index search)
	dailyEnabled         bool                     // when true the Date filter is bounded by both from and until, otherwise only by from
	useCarbonBehavior    bool                     // forwarded to TaggedTermWhere1 when building term conditions
	dontMatchMissingTags bool                     // forwarded to TaggedTermWhere1 when building term conditions
	metricMightExists    bool                     // if false, skip all subsequent queries because we determined that result will be empty anyway

	body []byte // ClickHouse response
}

func NewTagged(url string, table string, dailyEnabled, useCarbonBehavior, dontMatchMissingTags, absKeepEncoded bool, opts clickhouse.Options, taggedCosts map[string]*config.Costs) *TaggedFinder {
func NewTagged(url string, table, tag1CountTable string, dailyEnabled, useCarbonBehavior, dontMatchMissingTags, absKeepEncoded bool, opts clickhouse.Options, taggedCosts map[string]*config.Costs) *TaggedFinder {
return &TaggedFinder{
url: url,
table: table,
tag1CountTable: tag1CountTable,
absKeepEncoded: absKeepEncoded,
opts: opts,
taggedCosts: taggedCosts,
dailyEnabled: dailyEnabled,
useCarbonBehavior: useCarbonBehavior,
dontMatchMissingTags: dontMatchMissingTags,
metricMightExists: true,
}
}

Expand Down Expand Up @@ -305,12 +311,8 @@ func ParseTaggedConditions(conditions []string, config *config.Config, autocompl
default:
return nil, fmt.Errorf("wrong seriesByTag expr: %#v", s)
}
if len(config.ClickHouse.TaggedCosts) > 0 {
if costs, ok := config.ClickHouse.TaggedCosts[terms[i].Key]; ok {
setCost(&terms[i], costs)
}
}
}

if autocomplete {
if config.ClickHouse.TagsMinInAutocomplete > 0 && nonWildcards < config.ClickHouse.TagsMinInAutocomplete {
return nil, ErrCostlySeriesByTag
Expand All @@ -319,48 +321,9 @@ func ParseTaggedConditions(conditions []string, config *config.Config, autocompl
return nil, ErrCostlySeriesByTag
}

if len(config.ClickHouse.TaggedCosts) == 0 {
sort.Sort(TaggedTermList(terms))
} else {
// compare with taggs costs
sort.Slice(terms, func(i, j int) bool {
// compare taggs costs, if all of TaggegTerms has custom cost.
// this is allow overwrite operators order (Eq with or without wildcards/Match), use with carefully
if terms[i].Cost != terms[j].Cost {
if terms[i].NonDefaultCost && terms[j].NonDefaultCost ||
(terms[i].NonDefaultCost && terms[j].Op == TaggedTermEq && !terms[j].HasWildcard) ||
(terms[j].NonDefaultCost && terms[i].Op == TaggedTermEq && !terms[i].HasWildcard) {
return terms[i].Cost < terms[j].Cost
}
}

if terms[i].Op == terms[j].Op {
if terms[i].Op == TaggedTermEq && !terms[i].HasWildcard && terms[j].HasWildcard {
// globs as fist eq might be have a bad perfomance
return true
}

if terms[i].Key == "__name__" && terms[j].Key != "__name__" {
return true
}

if terms[i].Cost != terms[j].Cost && terms[i].HasWildcard == terms[j].HasWildcard {
// compare taggs costs
return terms[i].Cost < terms[j].Cost
}

return false
} else {
return terms[i].Op < terms[j].Op
}
})
}

return terms, nil
}

var ErrInvalidSeriesByTag = errs.NewErrorWithCode("wrong seriesByTag call", http.StatusBadRequest)

func parseString(s string) (string, string, error) {
if s[0] != '\'' && s[0] != '"' {
panic("string should start with open quote")
Expand Down Expand Up @@ -458,11 +421,13 @@ func NewCachedTags(body []byte) *TaggedFinder {
}

func (t *TaggedFinder) Execute(ctx context.Context, config *config.Config, query string, from int64, until int64, stat *FinderStat) error {
terms, err := ParseSeriesByTag(query, config)
terms, err := t.PrepareTaggedTerms(ctx, config, query, from, until, stat)
if err != nil {
return err
}

if !t.metricMightExists {
return nil
}
return t.ExecutePrepared(ctx, terms, from, until, stat)
}

Expand Down Expand Up @@ -580,3 +545,167 @@ func (t *TaggedFinder) Abs(v []byte) []byte {
func (t *TaggedFinder) Bytes() ([]byte, error) {
return nil, ErrNotImplemented
}

// PrepareTaggedTerms parses a seriesByTag query into terms and orders them
// for querying.
//
// When a Tag1 count table is configured, SetCostsFromCountTable is run first.
// If that call clears metricMightExists, (nil, nil) is returned and the caller
// is expected to check the flag. Terms are ordered with SortTaggedTermsByCost
// when any costs are available, and with the default TaggedTermList ordering
// otherwise.
func (t *TaggedFinder) PrepareTaggedTerms(ctx context.Context, cfg *config.Config, query string, from int64, until int64, stat *FinderStat) (terms []TaggedTerm, err error) {
	if terms, err = ParseSeriesByTag(query, cfg); err != nil {
		return nil, err
	}

	if t.tag1CountTable != "" {
		if err = t.SetCostsFromCountTable(ctx, terms, from, until, stat); err != nil {
			return nil, err
		}
		// The count lookup proved that nothing can match; no terms are needed.
		if !t.metricMightExists {
			return nil, nil
		}
	}

	// Without any costs fall back to the basic ordering.
	if len(t.taggedCosts) == 0 {
		sort.Sort(TaggedTermList(terms))
		return terms, nil
	}

	SetCosts(terms, t.taggedCosts)
	SortTaggedTermsByCost(terms)

	return terms, nil
}

// SortTaggedTermsByCost orders terms in place, taking tag costs into account.
//
// A cost difference decides the order outright when both terms carry a custom
// cost, or when one carries a custom cost and the other is a plain (wildcard
// free) equality term. This allows overriding the operator order, so use it
// with care. Otherwise terms are ordered by operator; within the same operator
// a wildcard-free equality goes before a wildcard one, "__name__" goes before
// other keys, and finally the cheaper term wins if both have the same
// wildcard status.
func SortTaggedTermsByCost(terms []TaggedTerm) {
	// isPlainEq reports whether the term is an equality without wildcards.
	isPlainEq := func(term *TaggedTerm) bool {
		return term.Op == TaggedTermEq && !term.HasWildcard
	}

	sort.Slice(terms, func(i, j int) bool {
		a, b := &terms[i], &terms[j]
		costsDiffer := a.Cost != b.Cost

		if costsDiffer {
			bothCustom := a.NonDefaultCost && b.NonDefaultCost
			if bothCustom ||
				(a.NonDefaultCost && isPlainEq(b)) ||
				(b.NonDefaultCost && isPlainEq(a)) {
				return a.Cost < b.Cost
			}
		}

		if a.Op != b.Op {
			return a.Op < b.Op
		}

		// A glob as the first equality term might perform badly.
		if isPlainEq(a) && b.HasWildcard {
			return true
		}

		if a.Key == "__name__" && b.Key != "__name__" {
			return true
		}

		// Same operator and same wildcard status: the cheaper term goes first.
		if costsDiffer && a.HasWildcard == b.HasWildcard {
			return a.Cost < b.Cost
		}

		return false
	})
}

// SetCostsFromCountTable queries the Tag1 count table for every plain equality
// term (Op is TaggedTermEq, no wildcard, non-empty value) and stores the
// returned row counts in t.taggedCosts.
//
// If no term qualifies, no query is issued and nil is returned. If the table
// returns fewer rows than there are distinct qualifying terms, the finder is
// marked as having no possible match: t.metricMightExists is set to false and
// t.body is reset to an empty slice.
//
// The raw response of the count query is kept in t.body while it is parsed.
// An error is returned when building a term condition, running the query or
// parsing the response fails; t.taggedCosts is left untouched on a parse error.
func (t *TaggedFinder) SetCostsFromCountTable(ctx context.Context, terms []TaggedTerm, from int64, until int64, stat *FinderStat) error {
	w := where.New()

	// The result is grouped by Tag1, so it holds at most one row per distinct
	// condition. Track distinct conditions so that a term repeated in the
	// query is not mistaken for a missing tag below.
	seen := make(map[string]struct{}, len(terms))

	for i := range terms {
		term := &terms[i]
		if term.Op != TaggedTermEq || term.HasWildcard || term.Value == "" {
			continue
		}

		sqlTerm, err := TaggedTermWhere1(term, t.useCarbonBehavior, t.dontMatchMissingTags)
		if err != nil {
			return err
		}

		if _, dup := seen[sqlTerm]; dup {
			continue
		}
		seen[sqlTerm] = struct{}{}
		w.Or(sqlTerm)
	}

	if w.SQL() == "" {
		return nil
	}

	if t.dailyEnabled {
		w.Andf(
			"Date >= '%s' AND Date <= '%s'",
			date.FromTimestampToDaysFormat(from),
			date.UntilTimestampToDaysFormat(until),
		)
	} else {
		w.Andf(
			"Date >= '%s'",
			date.FromTimestampToDaysFormat(from),
		)
	}

	sql := fmt.Sprintf("SELECT Tag1, sum(Count) as cnt FROM %s %s GROUP BY Tag1 FORMAT TabSeparatedRaw", t.tag1CountTable, w.SQL())

	var err error
	t.body, _, _, err = clickhouse.Query(scope.WithTable(ctx, t.tag1CountTable), t.url, sql, t.opts, nil)
	if err != nil {
		return err
	}

	rows := t.List()

	costs, err := chResultToCosts(rows)
	if err != nil {
		return err
	}
	t.taggedCosts = costs

	// The metric does not exist if the response has fewer rows than there
	// were distinct tags with the '=' op in the initial request. This relies
	// on each tag-value pair of a metric being written exactly once as Tag1.
	if len(rows) < len(seen) {
		t.body = []byte{}
		t.metricMightExists = false
	}

	return nil
}

// SetCosts applies the configured cost to every term whose key has an entry in
// costs. Terms without a matching key are left unchanged.
func SetCosts(terms []TaggedTerm, costs map[string]*config.Costs) {
	for i := range terms {
		term := &terms[i]

		cost, found := costs[term.Key]
		if !found {
			continue
		}

		setCost(term, cost)
	}
}

// chResultToCosts converts the rows returned by the Tag1 count query into a
// costs map keyed by tag name. Each row is parsed with parseTag1CountRow and
// its count is stored as the cost of that tag's value in ValuesCost; the
// per-tag Cost is left nil.
//
// The first row that cannot be parsed aborts the conversion, and the returned
// error wraps the parse error.
func chResultToCosts(body [][]byte) (map[string]*config.Costs, error) {
	costs := make(map[string]*config.Costs)

	for _, row := range body {
		tag, val, count, err := parseTag1CountRow(stringutils.UnsafeString(row))
		if err != nil {
			return nil, fmt.Errorf("failed to parse result from clickhouse while querying for tag costs: %w", err)
		}

		tagCosts, ok := costs[tag]
		if !ok {
			tagCosts = &config.Costs{Cost: nil, ValuesCost: make(map[string]int)}
			costs[tag] = tagCosts
		}
		tagCosts.ValuesCost[val] = count
	}

	return costs, nil
}

// parseTag1CountRow splits one row of the Tag1 count query result, expected in
// the form "<tag>=<value>\t<count>", into the tag name, the tag value and the
// count.
//
// An error is returned when the tab separator or the '=' separator is missing,
// or when the count is not a valid int; in the latter case the strconv error
// is wrapped so the cause stays visible.
func parseTag1CountRow(s string) (string, string, int, error) {
	tag1, count, n := stringutils.Split2(s, "\t")
	if n != 2 {
		return "", "", 0, fmt.Errorf("no tag count")
	}

	tag, val, n := stringutils.Split2(tag1, "=")
	if n != 2 {
		return "", "", 0, fmt.Errorf("no '=' in Tag1")
	}

	cnt, err := strconv.Atoi(count)
	if err != nil {
		return "", "", 0, fmt.Errorf("can't convert count to int: %w", err)
	}

	return tag, val, cnt, nil
}
Loading
Loading