Skip to content

Commit 04b7a2c

Browse files
committed
Added TSDB import with OpenMetrics and CSV file support.
Based on #5887. Thanks for your work so far @dipack95, it helped a lot! Changes on top of @dipack95: * Addressed all review comments * Use subcommands for different formats * Simplified block creation; it does not need to be so complex for a first iteration. * Simplified and separated concerns: no need to have access to the DB. Block writing is separated as well for ease of benchmarking and testing. This will also be needed by @JessicaGreben * Added import support for different formats. * Removed all tests - those had to be pulled over and adjusted. Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent 3d0ff2d commit 04b7a2c

File tree

12 files changed

+822
-1417
lines changed

12 files changed

+822
-1417
lines changed

pkg/textparse/interface.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package textparse
1616
import (
1717
"mime"
1818

19+
"github.com/pkg/errors"
1920
"github.com/prometheus/prometheus/pkg/exemplar"
2021
"github.com/prometheus/prometheus/pkg/labels"
2122
)
@@ -55,8 +56,8 @@ type Parser interface {
5556
// exemplar. It returns if an exemplar exists or not.
5657
Exemplar(l *exemplar.Exemplar) bool
5758

58-
// Next advances the parser to the next sample. It returns false if no
59-
// more samples were read or an error occurred.
59+
// Next advances the parser to the next sample. It returns io.EOF if no
60+
// more samples were read.
6061
Next() (Entry, error)
6162
}
6263

@@ -94,3 +95,45 @@ const (
9495
MetricTypeStateset = "stateset"
9596
MetricTypeUnknown = "unknown"
9697
)
98+
99+
func (m *MetricType) ParseForOpenMetrics(mtyp string) error {
100+
switch mtyp {
101+
case "counter":
102+
*m = MetricTypeCounter
103+
case "gauge":
104+
*m = MetricTypeGauge
105+
case "histogram":
106+
*m = MetricTypeHistogram
107+
case "gaugehistogram":
108+
*m = MetricTypeGaugeHistogram
109+
case "summary":
110+
*m = MetricTypeSummary
111+
case "info":
112+
*m = MetricTypeInfo
113+
case "stateset":
114+
*m = MetricTypeStateset
115+
case "unknown":
116+
*m = MetricTypeUnknown
117+
default:
118+
return errors.Errorf("invalid metric type %q", mtyp)
119+
}
120+
return nil
121+
}
122+
123+
func (m *MetricType) ParseForProm(mtyp string) error {
124+
switch mtyp {
125+
case "counter":
126+
*m = MetricTypeCounter
127+
case "gauge":
128+
*m = MetricTypeGauge
129+
case "histogram":
130+
*m = MetricTypeHistogram
131+
case "summary":
132+
*m = MetricTypeSummary
133+
case "unknown":
134+
*m = MetricTypeUnknown
135+
default:
136+
return errors.Errorf("invalid metric type %q", mtyp)
137+
}
138+
return nil
139+
}

pkg/textparse/openmetricsparse.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -259,25 +259,8 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
259259
}
260260
switch t {
261261
case tType:
262-
switch s := yoloString(p.text); s {
263-
case "counter":
264-
p.mtype = MetricTypeCounter
265-
case "gauge":
266-
p.mtype = MetricTypeGauge
267-
case "histogram":
268-
p.mtype = MetricTypeHistogram
269-
case "gaugehistogram":
270-
p.mtype = MetricTypeGaugeHistogram
271-
case "summary":
272-
p.mtype = MetricTypeSummary
273-
case "info":
274-
p.mtype = MetricTypeInfo
275-
case "stateset":
276-
p.mtype = MetricTypeStateset
277-
case "unknown":
278-
p.mtype = MetricTypeUnknown
279-
default:
280-
return EntryInvalid, errors.Errorf("invalid metric type %q", s)
262+
if err := p.mtype.ParseForOpenMetrics(yoloString(p.text)); err != nil {
263+
return EntryInvalid, err
281264
}
282265
case tHelp:
283266
if !utf8.Valid(p.text) {

pkg/textparse/promparse.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -289,19 +289,8 @@ func (p *PromParser) Next() (Entry, error) {
289289
}
290290
switch t {
291291
case tType:
292-
switch s := yoloString(p.text); s {
293-
case "counter":
294-
p.mtype = MetricTypeCounter
295-
case "gauge":
296-
p.mtype = MetricTypeGauge
297-
case "histogram":
298-
p.mtype = MetricTypeHistogram
299-
case "summary":
300-
p.mtype = MetricTypeSummary
301-
case "untyped":
302-
p.mtype = MetricTypeUnknown
303-
default:
304-
return EntryInvalid, errors.Errorf("invalid metric type %q", s)
292+
if err := p.mtype.ParseForProm(yoloString(p.text)); err != nil {
293+
return EntryInvalid, err
305294
}
306295
case tHelp:
307296
if !utf8.Valid(p.text) {

tsdb/cmd/tsdb/main.go

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,15 @@ import (
3434
"github.com/go-kit/kit/log"
3535
"github.com/pkg/errors"
3636
"github.com/prometheus/prometheus/pkg/labels"
37+
"github.com/prometheus/prometheus/pkg/textparse"
3738
"github.com/prometheus/prometheus/storage"
3839
"github.com/prometheus/prometheus/tsdb"
3940
"github.com/prometheus/prometheus/tsdb/chunks"
4041
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
4142
"github.com/prometheus/prometheus/tsdb/importer"
43+
"github.com/prometheus/prometheus/tsdb/importer/blocks"
44+
"github.com/prometheus/prometheus/tsdb/importer/csv"
45+
"github.com/prometheus/prometheus/tsdb/importer/openmetrics"
4246
"gopkg.in/alecthomas/kingpin.v2"
4347
)
4448

@@ -53,34 +57,43 @@ func execute() (err error) {
5357
var (
5458
defaultDBPath = filepath.Join("benchout", "storage")
5559

56-
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
57-
benchCmd = cli.Command("bench", "run benchmarks")
58-
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
59-
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
60-
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
61-
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
62-
listCmd = cli.Command("ls", "list db blocks")
63-
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
64-
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
65-
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
66-
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
67-
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
68-
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
69-
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
70-
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
71-
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
72-
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
73-
importCmd = cli.Command("import", "import samples from file containing information formatted in the Open Metrics format. Please refer to the storage docs for more details.")
74-
importFilePath = importCmd.Arg("file path", "file to import samples from (must be in Open Metrics format)").Required().String()
75-
importDbPath = importCmd.Arg("db path", "database path").Required().String()
76-
importMaxSamplesInMemory = importCmd.Flag("max-samples-in-mem", "maximum number of samples to process in a cycle").Default("10000").Int()
77-
importMaxBlockChildren = importCmd.Flag("max-block-children", "maximum number of children a block can have at a given time").Default("20").Int()
60+
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
61+
62+
benchCmd = cli.Command("bench", "run benchmarks")
63+
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
64+
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
65+
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
66+
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
67+
68+
listCmd = cli.Command("ls", "list db blocks")
69+
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
70+
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
71+
72+
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
73+
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
74+
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
75+
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
76+
77+
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
78+
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
79+
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
80+
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
81+
82+
importCmd = cli.Command("import", "[Experimental] import samples from input and produce TSDB block. Please refer to the storage docs for more details.")
83+
importDbPath = importCmd.Flag("output", "output directory for generated block").Default(".").String()
84+
importFilePath = importCmd.Flag("input-file", "disables reading from input and using file to import samples from. If empty input is required").String()
85+
importBlockSize = importCmd.Flag("block-size", "The maximum block size. The actual block timestamps will be aligned with Prometheus time ranges.").Default("2h").Hidden().Duration()
86+
87+
omImportCmd = importCmd.Command("openmetrics", "import samples from OpenMetrics input and produce TSDB block. Please refer to the storage docs for more details.")
88+
89+
csvImportCmd = importCmd.Command("csv", "import samples from CSV input and produce TSDB block. Please refer to the storage docs for more details.")
90+
csvImportDelimiter = csvImportCmd.Flag("delimiter", "CSV single character for fields delimiting").Default(",").String()
7891
)
7992

8093
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
8194
var merr tsdb_errors.MultiError
8295

83-
switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
96+
switch cmd := kingpin.MustParse(cli.Parse(os.Args[1:])); cmd {
8497
case benchWriteCmd.FullCommand():
8598
wb := &writeBenchmark{
8699
outPath: *benchWriteOutPath,
@@ -144,16 +157,39 @@ func execute() (err error) {
144157
err = merr.Err()
145158
}()
146159
return dumpSamples(db, *dumpMinTime, *dumpMaxTime)
147-
case importCmd.FullCommand():
148-
f, err := os.Open(*importFilePath)
149-
if err != nil {
150-
return err
160+
case omImportCmd.FullCommand(), csvImportCmd.FullCommand():
161+
input := os.Stdin
162+
if importFilePath != nil {
163+
input, err = os.Open(*importFilePath)
164+
if err != nil {
165+
return err
166+
}
167+
defer func() {
168+
merr.Add(err)
169+
merr.Add(input.Close())
170+
err = merr.Err()
171+
}()
172+
}
173+
174+
var p textparse.Parser
175+
if cmd == omImportCmd.FullCommand() {
176+
p = openmetrics.NewParser(input)
177+
} else {
178+
if len(*csvImportDelimiter) != 1 {
179+
return errors.Errorf("wrong format of delimiter flag, expected single character, got %q", *csvImportDelimiter)
180+
}
181+
182+
p = csv.NewParser(input, []rune(*csvImportDelimiter)[0])
151183
}
152-
return importer.ImportFromFile(f, *importDbPath, *importMaxSamplesInMemory, *importMaxBlockChildren, logger)
184+
return importer.Import(logger, p, blocks.NewMultiWriter(logger, *importDbPath, durToMillis(*importBlockSize)))
153185
}
154186
return nil
155187
}
156188

189+
func durToMillis(t time.Duration) int64 {
190+
return int64(t.Seconds() * 1000)
191+
}
192+
157193
type writeBenchmark struct {
158194
outPath string
159195
samplesFile string

tsdb/importer/blocks/multi.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package blocks
2+
3+
import (
4+
"github.com/go-kit/kit/log"
5+
"github.com/oklog/ulid"
6+
"github.com/pkg/errors"
7+
"github.com/prometheus/prometheus/pkg/labels"
8+
"github.com/prometheus/prometheus/storage"
9+
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
10+
"github.com/prometheus/prometheus/tsdb/index"
11+
)
12+
13+
type errAppender struct{ err error }
14+
15+
func (a errAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, a.err }
16+
func (a errAppender) AddFast(ref uint64, t int64, v float64) error { return a.err }
17+
func (a errAppender) Commit() error { return a.err }
18+
func (a errAppender) Rollback() error { return a.err }
19+
20+
// rangeForTimestamp returns the exclusive upper bound of the width-aligned
// time window that contains t. Note: integer division truncates toward
// zero, so the alignment is only floor-like for non-negative t.
func rangeForTimestamp(t int64, width int64) (maxt int64) {
	return width * (t/width + 1)
}
23+
24+
// MultiWriter fans appended samples out to one block Writer per time
// range, so that imported data ends up in blocks aligned to windows of
// sizeMillis milliseconds.
type MultiWriter struct {
	// blocks holds one Writer per covered time range.
	blocks map[index.Range]Writer
	// activeAppenders caches the Appender obtained from each Writer.
	activeAppenders map[index.Range]storage.Appender

	logger log.Logger
	// dir is the output directory new TSDB writers are created in.
	dir string
	// sizeMillis is the width of each block's time range in milliseconds.
	// TODO(bwplotka): Allow more complex compaction levels.
	sizeMillis int64
}
33+
34+
func NewMultiWriter(logger log.Logger, dir string, sizeMillis int64) *MultiWriter {
35+
return &MultiWriter{
36+
logger: logger,
37+
dir: dir,
38+
sizeMillis: sizeMillis,
39+
blocks: map[index.Range]Writer{},
40+
activeAppenders: map[index.Range]storage.Appender{},
41+
}
42+
}
43+
44+
// Appender returns the MultiWriter itself as a storage.Appender; Add and
// AddFast route each sample to the per-time-range appender for its
// timestamp. Neither this method nor the returned Appender is thread-safe.
// TODO(bwplotka): Consider making it thread safe.
func (w *MultiWriter) Appender() storage.Appender { return w }
47+
48+
func (w *MultiWriter) getOrCreate(t int64) storage.Appender {
49+
maxt := rangeForTimestamp(t, w.sizeMillis)
50+
hash := index.Range{Start: maxt - w.sizeMillis, End: maxt}
51+
if a, ok := w.activeAppenders[hash]; ok {
52+
return a
53+
}
54+
55+
nw, err := NewTSDBWriter(w.logger, w.dir)
56+
if err != nil {
57+
return errAppender{err: errors.Wrap(err, "new tsdb writer")}
58+
}
59+
60+
w.blocks[hash] = nw
61+
w.activeAppenders[hash] = nw.Appender()
62+
return w.activeAppenders[hash]
63+
}
64+
65+
func (w *MultiWriter) Add(l labels.Labels, t int64, v float64) (uint64, error) {
66+
return w.getOrCreate(t).Add(l, t, v)
67+
}
68+
69+
func (w *MultiWriter) AddFast(ref uint64, t int64, v float64) error {
70+
return w.getOrCreate(t).AddFast(ref, t, v)
71+
}
72+
73+
func (w *MultiWriter) Commit() error {
74+
var merr tsdb_errors.MultiError
75+
for _, a := range w.activeAppenders {
76+
merr.Add(a.Commit())
77+
}
78+
return merr.Err()
79+
}
80+
81+
func (w *MultiWriter) Rollback() error {
82+
var merr tsdb_errors.MultiError
83+
for _, a := range w.activeAppenders {
84+
merr.Add(a.Rollback())
85+
}
86+
return merr.Err()
87+
}
88+
89+
func (w *MultiWriter) Flush() ([]ulid.ULID, error) {
90+
ids := make([]ulid.ULID, 0, len(w.blocks))
91+
for _, b := range w.blocks {
92+
id, err := b.Flush()
93+
if err != nil {
94+
return nil, err
95+
}
96+
ids = append(ids, id...)
97+
}
98+
return ids, nil
99+
}
100+
101+
func (w *MultiWriter) Close() error {
102+
var merr tsdb_errors.MultiError
103+
for _, b := range w.blocks {
104+
merr.Add(b.Close())
105+
}
106+
return merr.Err()
107+
}

0 commit comments

Comments
 (0)