Skip to content

Commit 59700e5

Browse files
authored
Merge pull request #84 from metrico/feat/chstats
feat: clickhouse stat scraper
2 parents e2edb79 + c92097b commit 59700e5

File tree

6 files changed

+284
-0
lines changed

6 files changed

+284
-0
lines changed

.github/workflows/ghcr.yml

+9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ jobs:
1414
runs-on: ubuntu-latest
1515
steps:
1616
- uses: actions/checkout@v2
17+
- id: tag_bump
18+
name: Bump version and push tag
19+
uses: anothrNick/[email protected]
20+
env:
21+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
22+
DEFAULT_BUMP: patch
23+
RELEASE_BRANCHES: main
24+
PRERELEASE: true
1725
- name: Log in to the Container registry
1826
uses: docker/[email protected]
1927
with:
@@ -29,6 +37,7 @@ jobs:
2937
ghcr.io/metrico/qryn-otel-collector
3038
tags: |
3139
latest
40+
${{ steps.tag_bump.outputs.new_tag }}
3241
- name: Build and push
3342
uses: docker/[email protected]
3443
with:

cmd/otel-collector/components.go

+2
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ import (
165165

166166
"github.com/metrico/otel-collector/exporter/clickhouseprofileexporter"
167167
"github.com/metrico/otel-collector/exporter/qrynexporter"
168+
"github.com/metrico/otel-collector/receiver/chstatsreceiver"
168169
"github.com/metrico/otel-collector/receiver/pyroscopereceiver"
169170
)
170171

@@ -300,6 +301,7 @@ func components() (otelcol.Factories, error) {
300301
zipkinreceiver.NewFactory(),
301302
zookeeperreceiver.NewFactory(),
302303
pyroscopereceiver.NewFactory(),
304+
chstatsreceiver.NewFactory(),
303305
}
304306
for _, rcv := range factories.Receivers {
305307
receivers = append(receivers, rcv)

receiver/chstatsreceiver/README.md

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# ClickHouse Stats Receiver
2+
3+
| Status | |
4+
| ------------------------ |---------|
5+
| Stability                | [alpha] |
6+
| Supported pipeline types | metrics |
7+
8+
The chstatsreceiver module is a component of the OpenTelemetry collector that collects and exports metrics from ClickHouse databases. It uses the ClickHouse Go client library to connect to the database and execute SQL queries to retrieve metrics data. The module is designed to be highly configurable, allowing users to specify the database connection details, the SQL queries to execute, and the metrics to export.
9+
10+
## Configuration
11+
12+
- `dsn`: sets the Data Source Name (DSN) for the ClickHouse database.
13+
The DSN is a string that contains the necessary information to connect to the database,
14+
such as the host, port, and database name. The DSN must use the `clickhouse://` scheme.
15+
- `queries`: list of the SQL queries that the receiver will execute against the database to retrieve metrics data.
16+
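The queries are specified as a list of strings. Each query is parsed as a Go `text/template`, so `{{ .timestamp_ns }}`, `{{ .timestamp_ms }}` and `{{ .timestamp_s }}` expand to the current Unix time in nanoseconds, milliseconds and seconds.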
17+
- `timeout`: interval between two consecutive stats collection iterations.
18+
The timeout is specified as a duration value such as `20s` or `1m`; it must be at least 15 seconds (the default).
19+
20+
## ClickHouse Queries
21+
22+
Each ClickHouse query must return exactly two columns, in this order:
23+
- labels as `Array(Tuple(String, String))`
24+
- value as `Float64`
25+
26+
The labels array must contain a `__name__` entry whose value is the name of the metric; every other entry becomes an attribute of the data point.
27+
28+
For example:
29+
```sql
30+
SELECT
31+
[('__name__', 'some_metric'), ('label2', 'val2')]::Array(Tuple(String,String)),
32+
2::Float64
33+
```
34+
35+
## Example
36+
37+
```yaml
38+
receivers:
39+
chstatsreceiver:
40+
dsn: clickhouse://localhost:9000
41+
queries:
42+
- |
43+
SELECT [
44+
('__name__', 'clickhouse_bytes_on_disk'), ('db', database), ('disk', disk_name), ('host', hostname())
45+
],
46+
sum(bytes_on_disk)::Float64
47+
FROM system.parts
48+
WHERE (active = 1) AND (database NOT IN ('system', '_system'))
49+
GROUP BY database, disk_name
50+
exporters:
51+
prometheusremotewrite:
52+
endpoint: http://localhost:3100/prom/remote/write
53+
timeout: 30s
54+
service:
55+
pipelines:
56+
metrics:
57+
receivers: [chstatsreceiver]
58+
exporters: [prometheusremotewrite]
59+
```

receiver/chstatsreceiver/config.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package chstatsreceiver
2+
3+
import (
4+
"fmt"
5+
"net/url"
6+
"time"
7+
8+
"go.opentelemetry.io/collector/component"
9+
)
10+
11+
// Config holds the chstatsreceiver settings read from the collector's
// config.yaml.
type Config struct {
	// DSN is the ClickHouse connection string. Validate requires it to use
	// the clickhouse:// scheme.
	DSN string `mapstructure:"dsn"`

	// Queries lists the SQL statements executed on every collection
	// iteration. Each one is parsed as a text/template when the receiver
	// starts.
	Queries []string `mapstructure:"queries"`

	// Timeout is the interval between two collection iterations (it is used
	// as the ticker period, not as a per-query deadline). Validate rejects
	// values below 15 seconds.
	Timeout time.Duration `mapstructure:"timeout"`
}
17+
18+
var _ component.Config = (*Config)(nil)
19+
20+
// Checks that the receiver configuration is valid
21+
func (cfg *Config) Validate() error {
22+
if cfg.Timeout < 15*time.Second {
23+
return fmt.Errorf("timeout must be at least 15 seconds")
24+
}
25+
chDSN, err := url.Parse(cfg.DSN)
26+
if err != nil {
27+
return fmt.Errorf("invalid dsn: %w", err)
28+
}
29+
if chDSN.Scheme != "clickhouse" {
30+
return fmt.Errorf("invalid dsn: scheme should be clickhouse://")
31+
}
32+
return nil
33+
}

receiver/chstatsreceiver/factory.go

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package chstatsreceiver
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"go.opentelemetry.io/collector/component"
8+
"go.opentelemetry.io/collector/consumer"
9+
"go.opentelemetry.io/collector/receiver"
10+
)
11+
12+
// typeStr is the component type name under which the receiver is registered
// with the collector.
const typeStr = "chstatsreceiver"

// defaultTimeout is the collection interval used when the configuration does
// not set one.
const defaultTimeout = 15 * time.Second
16+
17+
func createDefaultConfig() component.Config {
18+
return &Config{
19+
DSN: "",
20+
Timeout: defaultTimeout,
21+
Queries: []string{},
22+
}
23+
}
24+
25+
func createMetricsReceiver(_ context.Context, set receiver.CreateSettings, cfg component.Config, consumer consumer.Metrics) (receiver.Metrics, error) {
26+
return &chReceiver{
27+
cfg: cfg.(*Config),
28+
logger: set.Logger,
29+
consumer: consumer,
30+
}, nil
31+
}
32+
33+
func NewFactory() receiver.Factory {
34+
return receiver.NewFactory(
35+
component.MustNewType(typeStr),
36+
createDefaultConfig,
37+
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelAlpha))
38+
}

receiver/chstatsreceiver/receiver.go

+143
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package chstatsreceiver
2+
3+
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"text/template"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.uber.org/zap"
)
17+
18+
type chReceiver struct {
19+
cfg *Config
20+
db clickhouse.Conn
21+
consumer consumer.Metrics
22+
templates []*template.Template
23+
logger *zap.Logger
24+
cancel context.CancelFunc
25+
ticker *time.Ticker
26+
}
27+
28+
func (r *chReceiver) Start(ctx context.Context, _ component.Host) error {
29+
opts, err := clickhouse.ParseDSN(r.cfg.DSN)
30+
if err != nil {
31+
return err
32+
}
33+
db, err := clickhouse.Open(opts)
34+
if err != nil {
35+
return err
36+
}
37+
r.db = db
38+
r.templates = make([]*template.Template, len(r.cfg.Queries))
39+
for i, query := range r.cfg.Queries {
40+
r.templates[i], err = template.New(fmt.Sprintf("tpl-%d", i)).Parse(query)
41+
if err != nil {
42+
return err
43+
}
44+
}
45+
46+
_ctx, cancel := context.WithCancel(ctx)
47+
r.cancel = cancel
48+
49+
r.ticker = time.NewTicker(r.cfg.Timeout)
50+
51+
go r.mainLoop(_ctx)
52+
return nil
53+
}
54+
55+
func (r *chReceiver) mainLoop(ctx context.Context) {
56+
for {
57+
r.logger.Info("tick start")
58+
select {
59+
case <-ctx.Done():
60+
fmt.Println("tick stop")
61+
return
62+
case <-r.ticker.C:
63+
err := r.GetMetrics(ctx)
64+
if err != nil {
65+
r.logger.Error("failed to get metrics", zap.Error(err))
66+
}
67+
}
68+
r.logger.Info("tick end")
69+
}
70+
}
71+
72+
func (r *chReceiver) GetMetrics(ctx context.Context) error {
73+
for _, tpl := range r.templates {
74+
err := r.getMetricsTemplate(ctx, tpl)
75+
if err != nil {
76+
return err
77+
}
78+
}
79+
return nil
80+
}
81+
82+
func (r *chReceiver) getMetricsTemplate(ctx context.Context, tpl *template.Template) error {
83+
queryBuf := bytes.Buffer{}
84+
params := map[string]any{
85+
"timestamp_ns": time.Now().UnixNano(),
86+
"timestamp_ms": time.Now().UnixMilli(),
87+
"timestamp_s": time.Now().Unix(),
88+
}
89+
err := tpl.Execute(&queryBuf, params)
90+
wrapErr := func(err error) error {
91+
return fmt.Errorf("failed to execute. Query: %s; error: %w", queryBuf.String(), err)
92+
}
93+
if err != nil {
94+
return wrapErr(err)
95+
}
96+
rows, err := r.db.Query(ctx, queryBuf.String())
97+
if err != nil {
98+
return wrapErr(err)
99+
}
100+
defer rows.Close()
101+
for rows.Next() {
102+
var (
103+
labels [][]string
104+
value float64
105+
)
106+
err = rows.Scan(&labels, &value)
107+
if err != nil {
108+
return wrapErr(err)
109+
}
110+
metrics := pmetric.NewMetrics()
111+
res := metrics.ResourceMetrics().AppendEmpty()
112+
res.Resource().Attributes()
113+
metric := res.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
114+
data := metric.SetEmptyGauge().DataPoints().AppendEmpty()
115+
for _, label := range labels {
116+
if label[0] == "__name__" {
117+
metric.SetName(label[1])
118+
continue
119+
}
120+
data.Attributes().PutStr(label[0], label[1])
121+
}
122+
data.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
123+
data.SetDoubleValue(value)
124+
select {
125+
case <-ctx.Done():
126+
return nil
127+
default:
128+
err = r.consumer.ConsumeMetrics(ctx, metrics)
129+
if err != nil {
130+
return wrapErr(err)
131+
}
132+
}
133+
}
134+
return nil
135+
}
136+
137+
func (r *chReceiver) Shutdown(_ context.Context) error {
138+
fmt.Println("shutting down")
139+
r.cancel()
140+
r.ticker.Stop()
141+
_ = r.db.Close()
142+
return nil
143+
}

0 commit comments

Comments
 (0)