Commit 0debdf3

marctc and rfratto committed

Refactor collectors to not depend on kingpin to be configured.

This commit removes references to kingpin.CommandLine, allowing the collector package to be used and configured with a custom kingpin application (or no kingpin at all). The configuration for collectors has been moved to struct fields, which the kingpin flags populate at flag-parse time.

Co-authored-by: Robert Fratto <[email protected]>
Signed-off-by: Marc Tuduri <[email protected]>
1 parent c8b9064 · commit 0debdf3

16 files changed: +240 −173 lines
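
For orientation, here is a minimal sketch of how a consumer of the collector package might wire things up after this change. The Config, RegisterFlags, New, and ScrapeHeartbeat names come from the diffs below; the import path, application name, DSN, and surrounding main function are illustrative assumptions, not part of this commit.

package main

import (
	"context"
	"os"

	"github.com/alecthomas/kingpin/v2"
	"github.com/go-kit/log"
	"github.com/prometheus/mysqld_exporter/collector"
)

func main() {
	// A custom kingpin application; nothing here relies on the
	// package-global kingpin.CommandLine.
	app := kingpin.New("my-exporter", "Example embedding the collectors.")

	var cfg collector.Config
	cfg.RegisterFlags(app)

	heartbeat := &collector.ScrapeHeartbeat{}
	heartbeat.RegisterFlags(app)

	// Parsing the flags populates the struct fields registered above.
	kingpin.MustParse(app.Parse(os.Args[1:]))

	exporter := collector.New(
		context.Background(),
		"user:pass@(localhost:3306)/", // illustrative DSN
		[]collector.Scraper{*heartbeat},
		log.NewNopLogger(),
		cfg,
	)
	_ = exporter
}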

collector/exporter.go

+16 −10

@@ -50,17 +50,23 @@ var (
 	versionRE = regexp.MustCompile(`^\d+\.\d+`)
 )
 
-// Tunable flags.
-var (
-	exporterLockTimeout = kingpin.Flag(
+// Config holds configuration options for the exporter.
+type Config struct {
+	LockTimeout   int
+	SlowLogFilter bool
+}
+
+// RegisterFlags adds flags to configure the exporter.
+func (c *Config) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
 		"exporter.lock_wait_timeout",
 		"Set a lock_wait_timeout (in seconds) on the connection to avoid long metadata locking.",
-	).Default("2").Int()
-	slowLogFilter = kingpin.Flag(
+	).Default("2").IntVar(&c.LockTimeout)
+	application.Flag(
 		"exporter.log_slow_filter",
 		"Add a log_slow_filter to avoid slow query logging of scrapes. NOTE: Not supported by Oracle MySQL.",
-	).Default("false").Bool()
-)
+	).Default("false").BoolVar(&c.SlowLogFilter)
+}
 
 // metric definition
 var (

@@ -95,11 +101,11 @@ type Exporter struct {
 }
 
 // New returns a new MySQL exporter for the provided DSN.
-func New(ctx context.Context, dsn string, scrapers []Scraper, logger log.Logger) *Exporter {
+func New(ctx context.Context, dsn string, scrapers []Scraper, logger log.Logger, cfg Config) *Exporter {
 	// Setup extra params for the DSN, default to having a lock timeout.
-	dsnParams := []string{fmt.Sprintf(timeoutParam, *exporterLockTimeout)}
+	dsnParams := []string{fmt.Sprintf(timeoutParam, cfg.LockTimeout)}
 
-	if *slowLogFilter {
+	if cfg.SlowLogFilter {
 		dsnParams = append(dsnParams, sessionSettingsParam)
 	}
 

collector/exporter_test.go

+10 −0

@@ -19,6 +19,7 @@ import (
 	"os"
 	"testing"
 
+	"github.com/alecthomas/kingpin/v2"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"

@@ -33,13 +34,22 @@ func TestExporter(t *testing.T) {
 		t.Skip("-short is passed, skipping test")
 	}
 
+	var exporterConfig Config
+	kingpinApp := kingpin.New("TestExporter", "")
+	exporterConfig.RegisterFlags(kingpinApp)
+	_, err := kingpinApp.Parse([]string{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	exporter := New(
 		context.Background(),
 		dsn,
 		[]Scraper{
 			ScrapeGlobalStatus{},
 		},
 		log.NewNopLogger(),
+		exporterConfig,
 	)
 
 	convey.Convey("Metrics describing", t, func() {

collector/heartbeat.go

+25 −20

@@ -36,21 +36,6 @@ const (
 	heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(%s), server_id from `%s`.`%s`"
 )
 
-var (
-	collectHeartbeatDatabase = kingpin.Flag(
-		"collect.heartbeat.database",
-		"Database from where to collect heartbeat data",
-	).Default("heartbeat").String()
-	collectHeartbeatTable = kingpin.Flag(
-		"collect.heartbeat.table",
-		"Table from where to collect heartbeat data",
-	).Default("heartbeat").String()
-	collectHeartbeatUtc = kingpin.Flag(
-		"collect.heartbeat.utc",
-		"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
-	).Bool()
-)
-
 // Metric descriptors.
 var (
 	HeartbeatStoredDesc = prometheus.NewDesc(

@@ -74,7 +59,11 @@ var (
 //	  server_id int unsigned NOT NULL PRIMARY KEY,
 //
 //	);
-type ScrapeHeartbeat struct{}
+type ScrapeHeartbeat struct {
+	Database string
+	Table    string
+	UTC      bool
+}
 
 // Name of the Scraper. Should be unique.
 func (ScrapeHeartbeat) Name() string {

@@ -91,17 +80,33 @@ func (ScrapeHeartbeat) Version() float64 {
 	return 5.1
 }
 
+// RegisterFlags adds flags to configure the Scraper.
+func (s *ScrapeHeartbeat) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
+		"collect.heartbeat.database",
+		"Database from where to collect heartbeat data",
+	).Default("heartbeat").StringVar(&s.Database)
+	application.Flag(
+		"collect.heartbeat.table",
+		"Table from where to collect heartbeat data",
+	).Default("heartbeat").StringVar(&s.Table)
+	application.Flag(
+		"collect.heartbeat.utc",
+		"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
+	).BoolVar(&s.UTC)
+}
+
 // nowExpr returns a current timestamp expression.
-func nowExpr() string {
-	if *collectHeartbeatUtc {
+func (s ScrapeHeartbeat) nowExpr() string {
+	if s.UTC {
 		return "UTC_TIMESTAMP(6)"
 	}
 	return "NOW(6)"
 }
 
 // Scrape collects data from database connection and sends it over channel as prometheus metric.
-func (ScrapeHeartbeat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
-	query := fmt.Sprintf(heartbeatQuery, nowExpr(), *collectHeartbeatDatabase, *collectHeartbeatTable)
+func (s ScrapeHeartbeat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
+	query := fmt.Sprintf(heartbeatQuery, s.nowExpr(), s.Database, s.Table)
 	heartbeatRows, err := db.QueryContext(ctx, query)
 	if err != nil {
 		return err
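
Because the configuration now lives on exported struct fields, a scraper can also be configured directly, with no kingpin involved at all, which is the other half of what the commit message promises. A minimal sketch; the helper is hypothetical, the field names and defaults mirror the flags in the diff above, and the import path is an assumption:

package main

import "github.com/prometheus/mysqld_exporter/collector"

// newHeartbeatScraper is a hypothetical helper showing flag-free,
// programmatic configuration of the scraper.
func newHeartbeatScraper() collector.ScrapeHeartbeat {
	return collector.ScrapeHeartbeat{
		Database: "heartbeat", // same value the flag defaults to
		Table:    "heartbeat", // same value the flag defaults to
		UTC:      true,        // equivalent to passing --collect.heartbeat.utc
	}
}

func main() {
	// The value can be passed to collector.New as one of its Scrapers.
	_ = newHeartbeatScraper()
}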

collector/heartbeat_test.go

+7 −2

@@ -55,7 +55,12 @@ var ScrapeHeartbeatTestCases = []ScrapeHeartbeatTestCase{
 func TestScrapeHeartbeat(t *testing.T) {
 	for _, tt := range ScrapeHeartbeatTestCases {
 		t.Run(fmt.Sprint(tt.Args), func(t *testing.T) {
-			_, err := kingpin.CommandLine.Parse(tt.Args)
+			scraper := ScrapeHeartbeat{}
+
+			app := kingpin.New("TestScrapeHeartbeat", "")
+			scraper.RegisterFlags(app)
+
+			_, err := app.Parse(tt.Args)
 			if err != nil {
 				t.Fatal(err)
 			}

@@ -72,7 +77,7 @@ func TestScrapeHeartbeat(t *testing.T) {
 
 			ch := make(chan prometheus.Metric)
 			go func() {
-				if err = (ScrapeHeartbeat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
+				if err = scraper.Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
 					t.Errorf("error calling function on test: %s", err)
 				}
 				close(ch)

collector/info_schema_processlist.go

+26 −21

@@ -42,22 +42,6 @@ const infoSchemaProcesslistQuery = `
 	  GROUP BY user, SUBSTRING_INDEX(host, ':', 1), command, state
 `
 
-// Tunable flags.
-var (
-	processlistMinTime = kingpin.Flag(
-		"collect.info_schema.processlist.min_time",
-		"Minimum time a thread must be in each state to be counted",
-	).Default("0").Int()
-	processesByUserFlag = kingpin.Flag(
-		"collect.info_schema.processlist.processes_by_user",
-		"Enable collecting the number of processes by user",
-	).Default("true").Bool()
-	processesByHostFlag = kingpin.Flag(
-		"collect.info_schema.processlist.processes_by_host",
-		"Enable collecting the number of processes by host",
-	).Default("true").Bool()
-)
-
 // Metric descriptors.
 var (
 	processlistCountDesc = prometheus.NewDesc(

@@ -79,7 +63,11 @@ var (
 )
 
 // ScrapeProcesslist collects from `information_schema.processlist`.
-type ScrapeProcesslist struct{}
+type ScrapeProcesslist struct {
+	ProcessListMinTime  int
+	ProcessesByUserFlag bool
+	ProcessesByHostFlag bool
+}
 
 // Name of the Scraper. Should be unique.
 func (ScrapeProcesslist) Name() string {

@@ -96,11 +84,27 @@ func (ScrapeProcesslist) Version() float64 {
 	return 5.1
 }
 
+// RegisterFlags adds flags to configure the Scraper.
+func (s *ScrapeProcesslist) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
+		"collect.info_schema.processlist.min_time",
+		"Minimum time a thread must be in each state to be counted",
+	).Default("0").IntVar(&s.ProcessListMinTime)
+	application.Flag(
+		"collect.info_schema.processlist.processes_by_user",
+		"Enable collecting the number of processes by user",
+	).Default("true").BoolVar(&s.ProcessesByUserFlag)
+	application.Flag(
+		"collect.info_schema.processlist.processes_by_host",
+		"Enable collecting the number of processes by host",
+	).Default("true").BoolVar(&s.ProcessesByHostFlag)
+}
+
 // Scrape collects data from database connection and sends it over channel as prometheus metric.
-func (ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
+func (s ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
 	processQuery := fmt.Sprintf(
 		infoSchemaProcesslistQuery,
-		*processlistMinTime,
+		s.ProcessListMinTime,
 	)
 	processlistRows, err := db.QueryContext(ctx, processQuery)
 	if err != nil {

@@ -162,12 +166,13 @@ func (ScrapeProcesslist) Scrape(ctx context.Context, db *sql.DB, ch chan<- prome
 		}
 	}
 
-	if *processesByHostFlag {
+	if s.ProcessesByHostFlag {
 		for _, host := range sortedMapKeys(stateHostCounts) {
 			ch <- prometheus.MustNewConstMetric(processesByHostDesc, prometheus.GaugeValue, float64(stateHostCounts[host]), host)
 		}
 	}
-	if *processesByUserFlag {
+
+	if s.ProcessesByUserFlag {
 		for _, user := range sortedMapKeys(stateUserCounts) {
 			ch <- prometheus.MustNewConstMetric(processesByUserDesc, prometheus.GaugeValue, float64(stateUserCounts[user]), user)
 		}
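
One practical note on the two booleans above: they default to true, so a caller disables them through kingpin's negated --no- form of the flag. A minimal sketch; the application name and import path are illustrative:

package main

import (
	"github.com/alecthomas/kingpin/v2"
	"github.com/prometheus/mysqld_exporter/collector"
)

func main() {
	scraper := collector.ScrapeProcesslist{}
	app := kingpin.New("example", "")
	scraper.RegisterFlags(app)

	// Boolean kingpin flags that default to "true" are switched off
	// with their negated "--no-" form.
	if _, err := app.Parse([]string{
		"--no-collect.info_schema.processlist.processes_by_user",
	}); err != nil {
		panic(err)
	}

	// scraper.ProcessesByUserFlag is now false;
	// scraper.ProcessesByHostFlag keeps its default of true.
	_ = scraper
}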

collector/info_schema_processlist_test.go

+6 −2

@@ -27,7 +27,11 @@ import (
 )
 
 func TestScrapeProcesslist(t *testing.T) {
-	_, err := kingpin.CommandLine.Parse([]string{
+	scraper := ScrapeProcesslist{}
+	app := kingpin.New("TestScrapeProcesslist", "")
+	scraper.RegisterFlags(app)
+
+	_, err := app.Parse([]string{
 		"--collect.info_schema.processlist.processes_by_user",
 		"--collect.info_schema.processlist.processes_by_host",
 	})

@@ -56,7 +60,7 @@ func TestScrapeProcesslist(t *testing.T) {
 
 	ch := make(chan prometheus.Metric)
 	go func() {
-		if err = (ScrapeProcesslist{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
+		if err = scraper.Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
 			t.Errorf("error calling function on test: %s", err)
 		}
 		close(ch)

collector/info_schema_tables.go

+14 −12

@@ -51,14 +51,6 @@ const (
 	`
 )
 
-// Tunable flags.
-var (
-	tableSchemaDatabases = kingpin.Flag(
-		"collect.info_schema.tables.databases",
-		"The list of databases to collect table stats for, or '*' for all",
-	).Default("*").String()
-)
-
 // Metric descriptors.
 var (
 	infoSchemaTablesVersionDesc = prometheus.NewDesc(

@@ -79,7 +71,9 @@ var (
 )
 
 // ScrapeTableSchema collects from `information_schema.tables`.
-type ScrapeTableSchema struct{}
+type ScrapeTableSchema struct {
+	Databases string
+}
 
 // Name of the Scraper. Should be unique.
 func (ScrapeTableSchema) Name() string {

@@ -96,10 +90,18 @@ func (ScrapeTableSchema) Version() float64 {
 	return 5.1
 }
 
+// RegisterFlags adds flags to configure the Scraper.
+func (s *ScrapeTableSchema) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
+		"collect.info_schema.tables.databases",
+		"The list of databases to collect table stats for, or '*' for all",
+	).Default("*").StringVar(&s.Databases)
+}
+
 // Scrape collects data from database connection and sends it over channel as prometheus metric.
-func (ScrapeTableSchema) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
+func (s ScrapeTableSchema) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
 	var dbList []string
-	if *tableSchemaDatabases == "*" {
+	if s.Databases == "*" {
 		dbListRows, err := db.QueryContext(ctx, dbListQuery)
 		if err != nil {
 			return err

@@ -117,7 +119,7 @@ func (ScrapeTableSchema) Scrape(ctx context.Context, db *sql.DB, ch chan<- prome
 			dbList = append(dbList, database)
 		}
 	} else {
-		dbList = strings.Split(*tableSchemaDatabases, ",")
+		dbList = strings.Split(s.Databases, ",")
 	}
 
 	for _, database := range dbList {

collector/mysql_user.go

+13 −11

@@ -69,14 +69,6 @@ const mysqlUserQuery = `
 	  FROM mysql.user
 `
 
-// Tunable flags.
-var (
-	userPrivilegesFlag = kingpin.Flag(
-		"collect.mysql.user.privileges",
-		"Enable collecting user privileges from mysql.user",
-	).Default("false").Bool()
-)
-
 var (
 	labelNames = []string{"mysql_user", "hostmask"}
 )

@@ -102,7 +94,9 @@ var (
 )
 
 // ScrapeUser collects from `information_schema.processlist`.
-type ScrapeUser struct{}
+type ScrapeUser struct {
+	Privileges bool
+}
 
 // Name of the Scraper. Should be unique.
 func (ScrapeUser) Name() string {

@@ -119,8 +113,16 @@ func (ScrapeUser) Version() float64 {
 	return 5.1
 }
 
+// RegisterFlags adds flags to configure the Scraper.
+func (s *ScrapeUser) RegisterFlags(application *kingpin.Application) {
+	application.Flag(
+		"collect.mysql.user.privileges",
+		"Enable collecting user privileges from mysql.user",
+	).Default("false").BoolVar(&s.Privileges)
+}
+
 // Scrape collects data from database connection and sends it over channel as prometheus metric.
-func (ScrapeUser) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
+func (s ScrapeUser) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
 	var (
 		userRows *sql.Rows
 		err      error

@@ -213,7 +215,7 @@ func (ScrapeUser) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.M
 		return err
 	}
 
-	if *userPrivilegesFlag {
+	if s.Privileges {
 		userCols, err := userRows.Columns()
 		if err != nil {
 			return err
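
All of the collectors in this commit follow the same RegisterFlags pattern, so they compose naturally onto a single custom application. A sketch of that composition; the local interface is defined here only for the example (the commit does not show such an interface being exported), and the import path and application name are assumptions:

package main

import (
	"os"

	"github.com/alecthomas/kingpin/v2"
	"github.com/prometheus/mysqld_exporter/collector"
)

// flagRegisterer is a local helper interface for this sketch only.
type flagRegisterer interface {
	RegisterFlags(*kingpin.Application)
}

func main() {
	app := kingpin.New("example", "")

	heartbeat := &collector.ScrapeHeartbeat{}
	processlist := &collector.ScrapeProcesslist{}
	tables := &collector.ScrapeTableSchema{}
	users := &collector.ScrapeUser{}

	// Each scraper contributes its own flags; nothing touches the
	// global kingpin.CommandLine anymore.
	for _, r := range []flagRegisterer{heartbeat, processlist, tables, users} {
		r.RegisterFlags(app)
	}

	kingpin.MustParse(app.Parse(os.Args[1:]))
}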
