diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index e3dc530a..ec6e96d6 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -30,6 +30,8 @@ import ( "text/tabwriter" "time" + "github.com/prometheus/tsdb/importer" + "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/tsdb" @@ -50,23 +52,27 @@ func execute() (err error) { var ( defaultDBPath = filepath.Join("benchout", "storage") - cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb") - benchCmd = cli.Command("bench", "run benchmarks") - benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark") - benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String() - benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int() - benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String() - listCmd = cli.Command("ls", "list db blocks") - listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool() - listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() - analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.") - analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() - analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String() - analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int() - dumpCmd = cli.Command("dump", "dump samples from a TSDB") - dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() - dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to 
dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64() - dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() + cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb") + benchCmd = cli.Command("bench", "run benchmarks") + benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark") + benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String() + benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int() + benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String() + listCmd = cli.Command("ls", "list db blocks") + listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool() + listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() + analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.") + analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() + analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String() + analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int() + dumpCmd = cli.Command("dump", "dump samples from a TSDB") + dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() + dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64() + dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() + importCmd = cli.Command("import", "import samples from file containing 
information formatted in the Prometheus exposition format") + importFilePath = importCmd.Arg("file path", "file to import samples from (must be in Prometheus exposition format)").String() + importDbPath = importCmd.Arg("db path", "database path").String() + importSkipTimestampCheck = importCmd.Flag("skip-timestamp-check", "skip timestamp check when importing into an existing TSDB instance.").Default("false").Bool() ) logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) @@ -136,6 +142,11 @@ func execute() (err error) { err = merr.Err() }() return dumpSamples(db, *dumpMinTime, *dumpMaxTime) + case importCmd.FullCommand(): + // The mimetype is required to allow the text parser to function. + // The Prometheus exposition format has the following mimetype. + contentType := "application/openmetrics-text; version=0.0.1; charset=utf-8" + return importer.ImportFromFile(*importFilePath, contentType, *importDbPath, *importSkipTimestampCheck, logger) } return nil } diff --git a/go.mod b/go.mod index ccdd4372..a7ddd438 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,11 @@ require ( github.com/go-kit/kit v0.8.0 github.com/golang/snappy v0.0.1 github.com/oklog/ulid v1.3.1 + github.com/otiai10/copy v1.0.1 + github.com/otiai10/curr v0.0.0-20190513014714-f5a3d24e5776 // indirect github.com/pkg/errors v0.8.0 github.com/prometheus/client_golang v1.0.0 + github.com/prometheus/prometheus v2.5.0+incompatible golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index 365fa5ec..7a922202 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +bou.ke/monkey v1.0.1 h1:zEMLInw9xvNakzUUPjfS4Ds6jYPqCFx3m7bRmG5NH2U= +bou.ke/monkey v1.0.1/go.mod h1:FgHuK96Rv2Nlf+0u1OOVDpCMdsWyOFmeeketDHE7LIg= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= 
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= @@ -27,11 +29,11 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -41,6 +43,15 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/otiai10/copy v1.0.1 h1:gtBjD8aq4nychvRZ2CyJvFWAw0aja+VHazDdruZKGZA= +github.com/otiai10/copy v1.0.1/go.mod h1:8bMCJrAqOtN/d9oyh5HR7HhLQMvcGMpGdwRDYsfOCHc= 
+github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE= +github.com/otiai10/curr v0.0.0-20190513014714-f5a3d24e5776 h1:o59bHXu8Ejas8Kq6pjoVJQ9/neN66SM8AKh6wI42BBs= +github.com/otiai10/curr v0.0.0-20190513014714-f5a3d24e5776/go.mod h1:3HNVkVOU7vZeFXocWuvtcS0XSFLcf2XUSDHkq9t1jU4= +github.com/otiai10/mint v1.2.3 h1:PsrRBmrxR68kyNu6YlqYHbNlItc5vOkuS6LBEsNttVA= +github.com/otiai10/mint v1.2.3/go.mod h1:YnfyPNhBvnY8bW4SGQHCs/aAFhkgySlMZbrF5U0bOVw= +github.com/otiai10/mint v1.2.4 h1:DxYL0itZyPaR5Z9HILdxSoHx+gNs6Yx+neOGS3IVUk0= +github.com/otiai10/mint v1.2.4/go.mod h1:d+b7n/0R3tdyUYYylALXpWQ/kTN+QobSq/4SRGBkR3M= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -59,6 +70,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFd github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9+10He72rVRJvMXrE9Hg= +github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= diff --git a/importer/import.go b/importer/import.go new file mode 100644 index 00000000..293e748a --- /dev/null +++ b/importer/import.go @@ -0,0 +1,307 @@ 
+// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importer + +import ( + "context" + "fmt" + "io/ioutil" + "math" + "os" + "path/filepath" + "strings" + "time" + + "github.com/go-kit/kit/log/level" + + "github.com/go-kit/kit/log" + "github.com/otiai10/copy" + prom_labels "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/tsdb" + tsdb_labels "github.com/prometheus/tsdb/labels" +) + +// Implementing the error interface to create a +// constant, which cannot be overridden. +// https://dave.cheney.net/2016/04/07/constant-errors +type Error string + +func (e Error) Error() string { + return string(e) +} + +// This error is thrown when we try to merge/add blocks to an existing TSDB instance, +// and the new blocks have a time overlap with the current blocks. +const OverlappingBlocksError = Error("blocks overlap with blocks currently in DB") + +// Duration of a block in milliseconds +const BlockDuration = 2 * 60 * 60 * 1000 + +type timestamp = int64 + +type metricSample struct { + TimestampMs timestamp + Value float64 + Labels tsdb_labels.Labels +} + +// ImportFromFile imports data from a file formatted according to the Prometheus exposition format, +// converts it into block(s), and places the newly created block(s) in the +// TSDB DB directory, where it is treated like any other block. 
+func ImportFromFile(filePath string, contentType string, dbPath string, skipTimestampCheck bool, logger log.Logger) error { + if logger == nil { + logger = log.NewNopLogger() + } + + tmpDbDir, err := ioutil.TempDir("", "importer") + if err != nil { + return err + } + defer os.RemoveAll(tmpDbDir) + + dbMint, dbMaxt, err := getDbTimeLimits(dbPath) + if err != nil { + return err + } + + f, err := os.Open(filePath) + if err != nil { + return err + } + defer f.Close() + + bytes, err := ioutil.ReadAll(f) + if err != nil { + return err + } + + blockPaths, err := pushMetrics(bytes, contentType, tmpDbDir, dbMint, dbMaxt, skipTimestampCheck, logger) + if err != nil { + return err + } + + level.Info(logger).Log("msg", "blocks created", "blockPaths", blockPaths) + + err = copyToDatabase(tmpDbDir, dbPath) + if err != nil { + return err + } + + return nil +} + +// pushMetrics parses metrics formatted in the Prometheus exposition format, +// and creates corresponding blocks. +// Returns paths to the newly created blocks, and error. +func pushMetrics(b []byte, contentType string, dbPath string, dbMint, dbMaxt timestamp, skipTimestampCheck bool, logger log.Logger) ([]string, error) { + var minValidTimestamp timestamp + minValidTimestamp = math.MaxInt64 + var maxValidTimestamp timestamp + maxValidTimestamp = math.MinInt64 + + var blockPaths []string + var currentBucket []*metricSample + var startTime time.Time + var currentTime time.Time + var err error + parser := textparse.New(b, contentType) + for { + var ent textparse.Entry + if ent, err = parser.Next(); err != nil { + // Error strings are just different enough across packages, + // hence this catch-all that just looks for "EOF" in the error + // string, and if it finds one, it means that the parsing is complete. + if strings.Contains(strings.ToLower(err.Error()), "eof") { + err = nil + break + } + // In case the error that we see is not related to the EOF. 
+ return nil, err + } + switch ent { + case textparse.EntryType: + continue + case textparse.EntryHelp: + continue + case textparse.EntryUnit: + continue + case textparse.EntryComment: + continue + default: + } + _, currentTimestampMicroS, val := parser.Series() + + // The text parser converts all timestamps to microseconds. + // TSDB looks for timestamps in milliseconds. + var currentTimestampMs timestamp + if currentTimestampMicroS == nil { + currentTimestampMs = 0 + } else { + currentTimestampMs = *currentTimestampMicroS / 1e3 + } + + minValidTimestamp = minInt(minValidTimestamp, currentTimestampMs) + maxValidTimestamp = maxInt(maxValidTimestamp, currentTimestampMs) + + var lset prom_labels.Labels + _ = parser.Metric(&lset) + + tsdbLabels := tsdb_labels.FromMap(lset.Map()) + + currentTime = time.Unix(currentTimestampMs/1000, 0) + currentSample := &metricSample{TimestampMs: currentTimestampMs, Value: val, Labels: tsdbLabels} + + if startTime.IsZero() { + startTime = currentTime + currentBucket = append(currentBucket, currentSample) + continue + } + + timeDelta := currentTime.Sub(startTime) + if timeDelta.Seconds()*1000 >= BlockDuration { + startTime = currentTime + + start := int64(startTime.Second() * 1000) + end := int64(currentTime.Second() * 1000) + blockPath, err := pushToDisk(currentBucket, dbPath, dbMint, dbMaxt, start, end, skipTimestampCheck, logger) + if err != nil { + if err == OverlappingBlocksError { + level.Warn(logger).Log("msg", fmt.Sprintf("could not merge with range %d to %d as it overlaps with target DB", start, end)) + } else { + return nil, err + } + } + blockPaths = append(blockPaths, blockPath) + currentBucket = []*metricSample{currentSample} + } else { + currentBucket = append(currentBucket, currentSample) + } + } + // Last bucket to be added + blockPath, err := pushToDisk(currentBucket, dbPath, dbMint, dbMaxt, int64(startTime.Second()*1000), int64(currentTime.Second()*1000), skipTimestampCheck, logger) + if err != nil { + if err == 
OverlappingBlocksError { + level.Warn(logger).Log("msg", fmt.Sprintf("could not merge with range %d to %d as it overlaps with target DB", int64(startTime.Second()*1000), int64(currentTime.Second()*1000))) + } else { + return nil, err + } + } + blockPaths = append(blockPaths, blockPath) + return blockPaths, nil +} + +// pushToDisk verifies the sample is compatible with the target TSDB instance, and then creates a new block +// from the sample data. +func pushToDisk(samples []*metricSample, dbDir string, dbMint, dbMaxt, startTime, endTime timestamp, skipTimestampCheck bool, logger log.Logger) (string, error) { + if !skipTimestampCheck { + err := verifyIntegration(dbMint, dbMaxt, startTime, endTime) + if err != nil { + return "", OverlappingBlocksError + } + } + return createBlock(samples, dbDir, logger) +} + +// createHead creates a TSDB writer head to write the sample data to. +func createHead(samples []*metricSample, chunkRange int64, logger log.Logger) (*tsdb.Head, error) { + head, err := tsdb.NewHead(nil, logger, nil, chunkRange) + if err != nil { + return nil, err + } + app := head.Appender() + for _, sample := range samples { + _, err = app.Add(sample.Labels, sample.TimestampMs, sample.Value) + if err != nil { + return nil, err + } + } + err = app.Commit() + if err != nil { + return nil, err + } + return head, nil +} + +// createBlock creates a 2h block from the samples passed to it, and writes it to disk. 
+func createBlock(samples []*metricSample, dir string, logger log.Logger) (string, error) {
+	// 2h head block
+	head, err := createHead(samples, BlockDuration, logger)
+	if err != nil {
+		return "", err
+	}
+	compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, logger, tsdb.DefaultOptions.BlockRanges, nil)
+	if err != nil {
+		return "", err
+	}
+
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return "", err
+	}
+
+	// head.MaxTime()+1 because the compactor treats maxt as exclusive.
+	ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
+	if err != nil {
+		return "", err
+	}
+	return filepath.Join(dir, ulid.String()), nil
+}
+
+// copyToDatabase copies the snapshot created to the TSDB DB directory.
+// TSDB operates such that it automatically picks up the newly created
+// snapshot(s) and treats them as it would any other block.
+func copyToDatabase(snapshotPath string, dbPath string) error {
+	return copy.Copy(snapshotPath, dbPath)
+}
+
+// verifyIntegration returns OverlappingBlocksError if the DB's time range
+// [dbmint, dbmaxt] intersects the provided time range [mint, maxt].
+// The sentinel pair (math.MinInt64, math.MaxInt64), which getDbTimeLimits
+// returns for a DB with no blocks, means "no existing data" and never
+// overlaps.
+func verifyIntegration(dbmint, dbmaxt, mint, maxt timestamp) error {
+	if dbmint == math.MinInt64 && dbmaxt == math.MaxInt64 {
+		// Empty target DB: nothing to overlap with.
+		return nil
+	}
+	// Two closed intervals intersect iff each starts no later than the other
+	// ends. The original endpoint-containment check missed the case where
+	// [mint, maxt] lies strictly inside [dbmint, dbmaxt].
+	if dbmint <= maxt && mint <= dbmaxt {
+		return OverlappingBlocksError
+	}
+	return nil
+}
+
+// getDbTimeLimits returns the first and last timestamps of the target TSDB
+// instance, or (math.MinInt64, math.MaxInt64) when the DB holds no blocks.
+func getDbTimeLimits(dbPath string) (timestamp, timestamp, error) {
+	mint := int64(math.MinInt64)
+	maxt := int64(math.MaxInt64)
+	// If we try to open a regular RW handle on an active TSDB instance,
+	// it will fail. Hence, we open a RO handle.
+	db, err := tsdb.OpenDBReadOnly(dbPath, nil)
+	if err != nil {
+		return mint, maxt, err
+	}
+	defer db.Close()
+	blocks, err := db.Blocks()
+	if err != nil {
+		// NOTE(review): matching the error message string is fragile — TODO
+		// replace with a sentinel/typed error if tsdb exports one.
+		if err.Error() != "no blocks found" {
+			return mint, maxt, err
+		}
+	}
+	for idx, block := range blocks {
+		bmint, bmaxt := block.Meta().MinTime, block.Meta().MaxTime
+		if idx == 0 {
+			mint, maxt = bmint, bmaxt
+		} else {
+			mint = minInt(mint, bmint)
+			maxt = maxInt(maxt, bmaxt)
+		}
+	}
+	return mint, maxt, nil
+}
diff --git a/importer/import_test.go b/importer/import_test.go
new file mode 100644
index 00000000..08a2445e
--- /dev/null
+++ b/importer/import_test.go
@@ -0,0 +1,184 @@
+// Copyright 2019 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importer
+
+import (
+	"io/ioutil"
+	"math"
+	"os"
+	"testing"
+
+	"github.com/prometheus/tsdb/testutil"
+)
+
+var contentType = "application/openmetrics-text; version=0.0.1; charset=utf-8"
+
+func TestParseMetrics(t *testing.T) {
+	tests := []struct {
+		ToParse string
+		IsOk    bool
+	}{
+		{
+			ToParse: ``,
+			IsOk:    true,
+		},
+		{
+			ToParse: `# HELP http_requests_total The total number of HTTP requests.
+# TYPE http_requests_total counter
+http_requests_total{method="post",code="200"} 1027 1565133713989
+http_requests_total{method="post",code="400"} 3 1575133713979
+`,
+			IsOk: true,
+		},
+		{
+			ToParse: `# HELP http_requests_total The total number of HTTP requests.
+# TYPE http_requests_total counter
+http_requests_total{method="post",code="200"} 1027 1395066363000
+http_requests_total{method="post",code="400"} 3 1395066363000
+`,
+			IsOk: true,
+		},
+		{
+			ToParse: `# HELP something_weird Something weird
+# TYPE something_weird gauge
+something_weird{problem="infinite timestamp"} +Inf -3982045
+`,
+			IsOk: false,
+		},
+		{
+			ToParse: `# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
+# TYPE rpc_duration_seconds summary
+rpc_duration_seconds{quantile="0.01"} 3102
+rpc_duration_seconds{quantile="0.05"} 3272
+`,
+			IsOk: true,
+		},
+		{
+			ToParse: `# HELP no_type_metric This is a metric with no TYPE string
+no_type_metric{type="bad_news_bears"} 0.0 111
+`,
+			IsOk: true,
+		},
+		{
+			ToParse: `# HELP bad_ts This is a metric with an extreme timestamp
+# TYPE bad_ts gauge
+bad_ts{type="bad_timestamp"} 420 -1e99
+`,
+			IsOk: false,
+		},
+		{
+			ToParse: `# HELP bad_ts This is a metric with an extreme timestamp
+# TYPE bad_ts gauge
+bad_ts{type="bad_timestamp"} 420 1e99
+`,
+			IsOk: false,
+		},
+		{
+			ToParse: `no_help_no_type{foo="bar"} 42 6900
+`,
+			IsOk: true,
+		},
+		{
+			ToParse: `bare_metric 42.24
+`,
+			IsOk: true,
+		},
+		{
+			ToParse: `# HELP bad_metric This a bad metric
+# TYPE bad_metric bad_type
+bad_metric{type="has no type information"} 0.0 111
+`,
+			IsOk: false,
+		},
+		{
+			ToParse: `# HELP no_nl This test has no newline so will fail
+# TYPE no_nl gauge
+no_nl{type="no newline"}`,
+			IsOk: false,
+		},
+	}
+	for _, test := range tests {
+		tmpDbDir, err := ioutil.TempDir("", "importer")
+		testutil.Ok(t, err)
+		_, err = pushMetrics([]byte(test.ToParse), contentType, tmpDbDir, int64(math.MinInt64), int64(math.MaxInt64), false, nil)
+		if test.IsOk {
+			testutil.Ok(t, err)
+		} else {
+			testutil.NotOk(t, err)
+		}
+		_ = os.RemoveAll(tmpDbDir)
+	}
+}
+
+func TestPushMetrics(t *testing.T) {
+	metrics := `# HELP http_requests_total The total number of HTTP requests.
+# TYPE http_requests_total counter
+http_requests_total{method="post",code="200"} 1027 1565133713989
+http_requests_total{method="post",code="400"} 3 1575133713979
+`
+	tmpDbDir, err := ioutil.TempDir("", "importer")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(tmpDbDir)
+	_, err = pushMetrics([]byte(metrics), contentType, tmpDbDir, int64(math.MinInt64), int64(math.MaxInt64), false, nil)
+	testutil.Ok(t, err)
+}
+
+func TestImportFromFile(t *testing.T) {
+	text := `# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
+# TYPE rpc_duration_seconds summary
+rpc_duration_seconds{quantile="0.01"} 3102
+rpc_duration_seconds{quantile="0.05"} 3272
+`
+	tmpFile, err := ioutil.TempFile("", "iff")
+	testutil.Ok(t, err)
+	defer tmpFile.Close()
+	// Check the write error so a short write cannot silently invalidate the
+	// fixture (the original discarded it).
+	_, err = tmpFile.WriteString(text)
+	testutil.Ok(t, err)
+	tmpDbDir, err := ioutil.TempDir("", "iff-db")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(tmpDbDir)
+	err = ImportFromFile(tmpFile.Name(), contentType, tmpDbDir, false, nil)
+	testutil.Ok(t, err)
+
+	// No file found case
+	err = ImportFromFile("/foo/bar/baz/buzz", contentType, "/buzz/baz/bar/foo", false, nil)
+	testutil.NotOk(t, err)
+
+	// Bad text file
+	text = `# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
+# TYPE rpc_duration_seconds bad_type
+rpc_duration_seconds{quantile="0.01"} 3102
+rpc_duration_seconds{quantile="0.05"} 3272
+`
+	tmpFile2, err := ioutil.TempFile("", "iff")
+	testutil.Ok(t, err)
+	defer tmpFile2.Close()
+	_, err = tmpFile2.WriteString(text)
+	testutil.Ok(t, err)
+	tmpDbDir2, err := ioutil.TempDir("", "iff-db")
+	testutil.Ok(t, err)
+	defer os.RemoveAll(tmpDbDir2)
+	err = ImportFromFile(tmpFile2.Name(), contentType, tmpDbDir2, false, nil)
+	testutil.NotOk(t, err)
+}
diff --git a/importer/util.go b/importer/util.go
new file mode 100644
index 00000000..9747fe3a
--- /dev/null
+++ b/importer/util.go
@@ -0,0 +1,28 @@
+// Copyright 2019 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importer
+
+// minInt returns the smaller of a and b.
+func minInt(a, b int64) int64 {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+// maxInt returns the larger of a and b.
+func maxInt(a, b int64) int64 {
+	if b < a {
+		return a
+	}
+	return b
+}