Skip to content

Commit a331469

Browse files
committed
Added TSDB import with OpenMetrics and CSV file support.
Based on #5887 Thanks for your work so far @dipack95, it helped a lot! Changes on top of @dipack95: * Addressed all reviews components * Use subcommands for different formats * Simplifed block creation, no need to be such complex for first iteration. * Simpliefied and separate concerns. No need to have access to DB. Block * writting is separated as well for ease of benchmarking and test. This will be also needed by @JessicaGreben * Added import support for different formats. * Removed all tests - those had to be pulled over and adjusted ): Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent 575cdf2 commit a331469

File tree

12 files changed

+898
-1417
lines changed

12 files changed

+898
-1417
lines changed

pkg/textparse/interface.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package textparse
1616
import (
1717
"mime"
1818

19+
"github.com/pkg/errors"
1920
"github.com/prometheus/prometheus/pkg/exemplar"
2021
"github.com/prometheus/prometheus/pkg/labels"
2122
)
@@ -55,8 +56,8 @@ type Parser interface {
5556
// exemplar. It returns if an exemplar exists or not.
5657
Exemplar(l *exemplar.Exemplar) bool
5758

58-
// Next advances the parser to the next sample. It returns false if no
59-
// more samples were read or an error occurred.
59+
// Next advances the parser to the next sample. It returns io.EOF if no
60+
// more samples were read.
6061
Next() (Entry, error)
6162
}
6263

@@ -94,3 +95,45 @@ const (
9495
MetricTypeStateset = "stateset"
9596
MetricTypeUnknown = "unknown"
9697
)
98+
99+
func (m *MetricType) ParseForOpenMetrics(mtyp string) error {
100+
switch mtyp {
101+
case "counter":
102+
*m = MetricTypeCounter
103+
case "gauge":
104+
*m = MetricTypeGauge
105+
case "histogram":
106+
*m = MetricTypeHistogram
107+
case "gaugehistogram":
108+
*m = MetricTypeGaugeHistogram
109+
case "summary":
110+
*m = MetricTypeSummary
111+
case "info":
112+
*m = MetricTypeInfo
113+
case "stateset":
114+
*m = MetricTypeStateset
115+
case "unknown":
116+
*m = MetricTypeUnknown
117+
default:
118+
return errors.Errorf("invalid metric type %q", mtyp)
119+
}
120+
return nil
121+
}
122+
123+
func (m *MetricType) ParseForProm(mtyp string) error {
124+
switch mtyp {
125+
case "counter":
126+
*m = MetricTypeCounter
127+
case "gauge":
128+
*m = MetricTypeGauge
129+
case "histogram":
130+
*m = MetricTypeHistogram
131+
case "summary":
132+
*m = MetricTypeSummary
133+
case "unknown":
134+
*m = MetricTypeUnknown
135+
default:
136+
return errors.Errorf("invalid metric type %q", mtyp)
137+
}
138+
return nil
139+
}

pkg/textparse/openmetricsparse.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -259,25 +259,8 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
259259
}
260260
switch t {
261261
case tType:
262-
switch s := yoloString(p.text); s {
263-
case "counter":
264-
p.mtype = MetricTypeCounter
265-
case "gauge":
266-
p.mtype = MetricTypeGauge
267-
case "histogram":
268-
p.mtype = MetricTypeHistogram
269-
case "gaugehistogram":
270-
p.mtype = MetricTypeGaugeHistogram
271-
case "summary":
272-
p.mtype = MetricTypeSummary
273-
case "info":
274-
p.mtype = MetricTypeInfo
275-
case "stateset":
276-
p.mtype = MetricTypeStateset
277-
case "unknown":
278-
p.mtype = MetricTypeUnknown
279-
default:
280-
return EntryInvalid, errors.Errorf("invalid metric type %q", s)
262+
if err := p.mtype.ParseForOpenMetrics(yoloString(p.text)); err != nil {
263+
return EntryInvalid, err
281264
}
282265
case tHelp:
283266
if !utf8.Valid(p.text) {

pkg/textparse/promparse.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -289,19 +289,8 @@ func (p *PromParser) Next() (Entry, error) {
289289
}
290290
switch t {
291291
case tType:
292-
switch s := yoloString(p.text); s {
293-
case "counter":
294-
p.mtype = MetricTypeCounter
295-
case "gauge":
296-
p.mtype = MetricTypeGauge
297-
case "histogram":
298-
p.mtype = MetricTypeHistogram
299-
case "summary":
300-
p.mtype = MetricTypeSummary
301-
case "untyped":
302-
p.mtype = MetricTypeUnknown
303-
default:
304-
return EntryInvalid, errors.Errorf("invalid metric type %q", s)
292+
if err := p.mtype.ParseForProm(yoloString(p.text)); err != nil {
293+
return EntryInvalid, err
305294
}
306295
case tHelp:
307296
if !utf8.Valid(p.text) {

tsdb/cmd/tsdb/main.go

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,15 @@ import (
3434
"github.com/go-kit/kit/log"
3535
"github.com/pkg/errors"
3636
"github.com/prometheus/prometheus/pkg/labels"
37+
"github.com/prometheus/prometheus/pkg/textparse"
3738
"github.com/prometheus/prometheus/storage"
3839
"github.com/prometheus/prometheus/tsdb"
3940
"github.com/prometheus/prometheus/tsdb/chunks"
4041
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
4142
"github.com/prometheus/prometheus/tsdb/importer"
43+
"github.com/prometheus/prometheus/tsdb/importer/blocks"
44+
"github.com/prometheus/prometheus/tsdb/importer/csv"
45+
"github.com/prometheus/prometheus/tsdb/importer/openmetrics"
4246
"gopkg.in/alecthomas/kingpin.v2"
4347
)
4448

@@ -53,34 +57,43 @@ func execute() (err error) {
5357
var (
5458
defaultDBPath = filepath.Join("benchout", "storage")
5559

56-
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
57-
benchCmd = cli.Command("bench", "run benchmarks")
58-
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
59-
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
60-
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
61-
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
62-
listCmd = cli.Command("ls", "list db blocks")
63-
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
64-
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
65-
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
66-
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
67-
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
68-
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
69-
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
70-
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
71-
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
72-
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
73-
importCmd = cli.Command("import", "import samples from file containing information formatted in the Open Metrics format. Please refer to the storage docs for more details.")
74-
importFilePath = importCmd.Arg("file path", "file to import samples from (must be in Open Metrics format)").Required().String()
75-
importDbPath = importCmd.Arg("db path", "database path").Required().String()
76-
importMaxSamplesInMemory = importCmd.Flag("max-samples-in-mem", "maximum number of samples to process in a cycle").Default("10000").Int()
77-
importMaxBlockChildren = importCmd.Flag("max-block-children", "maximum number of children a block can have at a given time").Default("20").Int()
60+
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
61+
62+
benchCmd = cli.Command("bench", "run benchmarks")
63+
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
64+
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
65+
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
66+
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
67+
68+
listCmd = cli.Command("ls", "list db blocks")
69+
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
70+
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
71+
72+
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
73+
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
74+
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
75+
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
76+
77+
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
78+
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
79+
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
80+
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
81+
82+
importCmd = cli.Command("import", "[Experimental] import samples from input and produce TSDB block. Please refer to the storage docs for more details.")
83+
importDbPath = importCmd.Flag("output", "output directory for generated block").Default(".").String()
84+
importFilePath = importCmd.Flag("input-file", "disables reading from input and using file to import samples from. If empty input is required").String()
85+
importBlockSize = importCmd.Flag("block-size", "The maximum block size. The actual block timestamps will be aligned with Prometheus time ranges.").Default("2h").Hidden().Duration()
86+
87+
omImportCmd = importCmd.Command("openmetrics", "import samples from OpenMetrics input and produce TSDB block. Please refer to the storage docs for more details.")
88+
89+
csvImportCmd = importCmd.Command("csv", "import samples from CSV input and produce TSDB block. Please refer to the storage docs for more details.")
90+
csvImportDelimiter = csvImportCmd.Flag("delimiter", "CSV single character for fields delimiting").Default(",").String()
7891
)
7992

8093
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
8194
var merr tsdb_errors.MultiError
8295

83-
switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
96+
switch cmd := kingpin.MustParse(cli.Parse(os.Args[1:])); cmd {
8497
case benchWriteCmd.FullCommand():
8598
wb := &writeBenchmark{
8699
outPath: *benchWriteOutPath,
@@ -144,16 +157,39 @@ func execute() (err error) {
144157
err = merr.Err()
145158
}()
146159
return dumpSamples(db, *dumpMinTime, *dumpMaxTime)
147-
case importCmd.FullCommand():
148-
f, err := os.Open(*importFilePath)
149-
if err != nil {
150-
return err
160+
case omImportCmd.FullCommand(), csvImportCmd.FullCommand():
161+
input := os.Stdin
162+
if importFilePath != nil {
163+
input, err = os.Open(*importFilePath)
164+
if err != nil {
165+
return err
166+
}
167+
defer func() {
168+
merr.Add(err)
169+
merr.Add(input.Close())
170+
err = merr.Err()
171+
}()
172+
}
173+
174+
var p textparse.Parser
175+
if cmd == omImportCmd.FullCommand() {
176+
p = openmetrics.NewParser(input)
177+
} else {
178+
if len(*csvImportDelimiter) != 1 {
179+
return errors.Errorf("wrong format of delimiter flag, expected single character, got %q", *csvImportDelimiter)
180+
}
181+
182+
p = csv.NewParser(input, []rune(*csvImportDelimiter)[0])
151183
}
152-
return importer.ImportFromFile(f, *importDbPath, *importMaxSamplesInMemory, *importMaxBlockChildren, logger)
184+
return importer.Import(logger, p, blocks.NewMultiWriter(logger, *importDbPath, durToMillis(*importBlockSize)))
153185
}
154186
return nil
155187
}
156188

189+
func durToMillis(t time.Duration) int64 {
190+
return int64(t.Seconds() * 1000)
191+
}
192+
157193
type writeBenchmark struct {
158194
outPath string
159195
samplesFile string

tsdb/importer/blocks/multi.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2020 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package blocks
15+
16+
import (
17+
"github.com/go-kit/kit/log"
18+
"github.com/oklog/ulid"
19+
"github.com/pkg/errors"
20+
"github.com/prometheus/prometheus/pkg/labels"
21+
"github.com/prometheus/prometheus/storage"
22+
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
23+
"github.com/prometheus/prometheus/tsdb/index"
24+
)
25+
26+
type errAppender struct{ err error }
27+
28+
func (a errAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, a.err }
29+
func (a errAppender) AddFast(ref uint64, t int64, v float64) error { return a.err }
30+
func (a errAppender) Commit() error { return a.err }
31+
func (a errAppender) Rollback() error { return a.err }
32+
33+
func rangeForTimestamp(t int64, width int64) (maxt int64) {
34+
return (t/width)*width + width
35+
}
36+
37+
type MultiWriter struct {
38+
blocks map[index.Range]Writer
39+
activeAppenders map[index.Range]storage.Appender
40+
41+
logger log.Logger
42+
dir string
43+
// TODO(bwplotka): Allow more complex compaction levels.
44+
sizeMillis int64
45+
}
46+
47+
func NewMultiWriter(logger log.Logger, dir string, sizeMillis int64) *MultiWriter {
48+
return &MultiWriter{
49+
logger: logger,
50+
dir: dir,
51+
sizeMillis: sizeMillis,
52+
blocks: map[index.Range]Writer{},
53+
activeAppenders: map[index.Range]storage.Appender{},
54+
}
55+
}
56+
57+
// Appender is not thread-safe. Returned Appender is not thread-save as well.
58+
// TODO(bwplotka): Consider making it thread safe.
59+
func (w *MultiWriter) Appender() storage.Appender { return w }
60+
61+
func (w *MultiWriter) getOrCreate(t int64) storage.Appender {
62+
maxt := rangeForTimestamp(t, w.sizeMillis)
63+
hash := index.Range{Start: maxt - w.sizeMillis, End: maxt}
64+
if a, ok := w.activeAppenders[hash]; ok {
65+
return a
66+
}
67+
68+
nw, err := NewTSDBWriter(w.logger, w.dir)
69+
if err != nil {
70+
return errAppender{err: errors.Wrap(err, "new tsdb writer")}
71+
}
72+
73+
w.blocks[hash] = nw
74+
w.activeAppenders[hash] = nw.Appender()
75+
return w.activeAppenders[hash]
76+
}
77+
78+
func (w *MultiWriter) Add(l labels.Labels, t int64, v float64) (uint64, error) {
79+
return w.getOrCreate(t).Add(l, t, v)
80+
}
81+
82+
func (w *MultiWriter) AddFast(ref uint64, t int64, v float64) error {
83+
return w.getOrCreate(t).AddFast(ref, t, v)
84+
}
85+
86+
func (w *MultiWriter) Commit() error {
87+
var merr tsdb_errors.MultiError
88+
for _, a := range w.activeAppenders {
89+
merr.Add(a.Commit())
90+
}
91+
return merr.Err()
92+
}
93+
94+
func (w *MultiWriter) Rollback() error {
95+
var merr tsdb_errors.MultiError
96+
for _, a := range w.activeAppenders {
97+
merr.Add(a.Rollback())
98+
}
99+
return merr.Err()
100+
}
101+
102+
func (w *MultiWriter) Flush() ([]ulid.ULID, error) {
103+
ids := make([]ulid.ULID, 0, len(w.blocks))
104+
for _, b := range w.blocks {
105+
id, err := b.Flush()
106+
if err != nil {
107+
return nil, err
108+
}
109+
ids = append(ids, id...)
110+
}
111+
return ids, nil
112+
}
113+
114+
func (w *MultiWriter) Close() error {
115+
var merr tsdb_errors.MultiError
116+
for _, b := range w.blocks {
117+
merr.Add(b.Close())
118+
}
119+
return merr.Err()
120+
}

0 commit comments

Comments
 (0)