block_test.go (81 changes: 5 additions & 76 deletions)
@@ -16,16 +16,15 @@ package tsdb
import (
"context"
"encoding/binary"

"errors"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/go-kit/kit/log"

"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
@@ -57,7 +56,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()

-blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
+blockDir := CreateBlock(t, tmpdir, GenSeries(1, 1, 0, 1))
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
@@ -77,7 +76,7 @@ func TestCreateBlock(t *testing.T) {
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
-b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil)
+b, err := OpenBlock(nil, CreateBlock(t, tmpdir, GenSeries(1, 1, 0, 10)), nil)
if err == nil {
testutil.Ok(t, b.Close())
}
@@ -134,7 +133,7 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()

-blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
+blockDir := CreateBlock(t, tmpdir, GenSeries(1, 1, 0, 1))
files, err := sequenceFiles(chunkDir(blockDir))
testutil.Ok(t, err)
testutil.Assert(t, len(files) > 0, "No chunk created.")
@@ -168,7 +167,7 @@ func TestBlockSize(t *testing.T) {

// Create a block and compare the reported size vs actual disk size.
{
-blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
+blockDirInit = CreateBlock(t, tmpdir, GenSeries(10, 1, 1, 100))
blockInit, err = OpenBlock(nil, blockDirInit, nil)
testutil.Ok(t, err)
defer func() {
@@ -204,76 +203,6 @@
}
}

-// createBlock creates a block with given set of series and returns its dir.
-func createBlock(tb testing.TB, dir string, series []Series) string {
-head := createHead(tb, series)
-compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
-testutil.Ok(tb, err)
-
-testutil.Ok(tb, os.MkdirAll(dir, 0777))
-
-// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
-// Because of this block intervals are always +1 than the total samples it includes.
-ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
-testutil.Ok(tb, err)
-return filepath.Join(dir, ulid.String())
-}
-
-func createHead(tb testing.TB, series []Series) *Head {
-head, err := NewHead(nil, nil, nil, 2*60*60*1000)
-testutil.Ok(tb, err)
-defer head.Close()
-
-app := head.Appender()
-for _, s := range series {
-ref := uint64(0)
-it := s.Iterator()
-for it.Next() {
-t, v := it.At()
-if ref != 0 {
-err := app.AddFast(ref, t, v)
-if err == nil {
-continue
-}
-}
-ref, err = app.Add(s.Labels(), t, v)
-testutil.Ok(tb, err)
-}
-testutil.Ok(tb, it.Err())
-}
-err = app.Commit()
-testutil.Ok(tb, err)
-return head
-}
-
-const (
-defaultLabelName = "labelName"
-defaultLabelValue = "labelValue"
-)
-
-// genSeries generates series with a given number of labels and values.
-func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
-if totalSeries == 0 || labelCount == 0 {
-return nil
-}
-
-series := make([]Series, totalSeries)
-
-for i := 0; i < totalSeries; i++ {
-lbls := make(map[string]string, labelCount)
-lbls[defaultLabelName] = strconv.Itoa(i)
-for j := 1; len(lbls) < labelCount; j++ {
-lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
-}
-samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
-for t := mint; t < maxt; t++ {
-samples = append(samples, sample{t: t, v: rand.Float64()})
-}
-series[i] = newSeries(lbls, samples)
-}
-return series
-}
-
// populateSeries generates series from given labels, mint and maxt.
func populateSeries(lbls []map[string]string, mint, maxt int64) []Series {
if len(lbls) == 0 {
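Note: with createBlock and genSeries exported as CreateBlock and GenSeries, tests outside block_test.go can generate on-disk blocks too. A minimal sketch of such a consumer, assuming the exported helpers keep the removed versions' signatures and are reachable from the tsdb package; the test name is hypothetical:

```go
package tsdb_test

import (
	"io/ioutil"
	"os"
	"testing"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/testutil"
)

// TestOpenGeneratedBlock is a hypothetical consumer of the exported helpers.
func TestOpenGeneratedBlock(t *testing.T) {
	tmpdir, err := ioutil.TempDir("", "create_block")
	testutil.Ok(t, err)
	defer func() {
		testutil.Ok(t, os.RemoveAll(tmpdir))
	}()

	// GenSeries(10, 3, 0, 100): 10 series, 3 labels each, one sample per
	// timestamp in [0, 100). CreateBlock compacts them into a block under
	// tmpdir and returns the block's directory.
	dir := tsdb.CreateBlock(t, tmpdir, tsdb.GenSeries(10, 3, 0, 100))

	b, err := tsdb.OpenBlock(nil, dir, nil)
	testutil.Ok(t, err)
	testutil.Ok(t, b.Close())
}
```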
cmd/tsdb/main.go (87 changes: 51 additions & 36 deletions)
@@ -32,11 +32,18 @@ import (

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunks"
+tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/labels"
"gopkg.in/alecthomas/kingpin.v2"
)

+const (
+printBlocksTableHeader = "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES"
+defaultAnalyzeLimit = "20"
+timeDelta = 30000
+)

func main() {
@@ -62,7 +69,7 @@ func execute() (err error) {
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
-analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
+analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default(defaultAnalyzeLimit).Int()
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
@@ -95,7 +102,7 @@ func execute() (err error) {
if err != nil {
return err
}
-printBlocks(blocks, listCmdHumanReadable)
+printBlocks(os.Stdout, blocks, listCmdHumanReadable)
case analyzeCmd.FullCommand():
db, err := tsdb.OpenDBReadOnly(*analyzePath, nil)
if err != nil {
@@ -110,21 +117,12 @@ func execute() (err error) {
if err != nil {
return err
}
-var block tsdb.BlockReader
-if *analyzeBlockID != "" {
-for _, b := range blocks {
-if b.Meta().ULID.String() == *analyzeBlockID {
-block = b
-break
-}
-}
-} else if len(blocks) > 0 {
-block = blocks[len(blocks)-1]
-}
-if block == nil {
-return fmt.Errorf("block not found")
+block, err := extractBlock(blocks, analyzeBlockID)
+if err != nil {
+return err
}
-return analyzeBlock(block, *analyzeLimit)
+
+return analyzeBlock(os.Stdout, block, *analyzeLimit)
case dumpCmd.FullCommand():
db, err := tsdb.OpenDBReadOnly(*dumpPath, nil)
if err != nil {
@@ -140,6 +138,25 @@
return nil
}

+// extractBlock takes a slice of BlockReader and returns a specific block by ID.
+func extractBlock(blocks []tsdb.BlockReader, analyzeBlockID *string) (tsdb.BlockReader, error) {
+var block tsdb.BlockReader
+if *analyzeBlockID != "" {
+for _, b := range blocks {
+if b.Meta().ULID.String() == *analyzeBlockID {
+block = b
+break
+}
+}
+} else if len(blocks) > 0 {
+block = blocks[len(blocks)-1]
+}
+if block == nil {
+return nil, fmt.Errorf("block not found")
+}
+return block, nil
+}

type writeBenchmark struct {
outPath string
samplesFile string
@@ -235,8 +252,6 @@ func (b *writeBenchmark) run() error {
return nil
}

-const timeDelta = 30000

func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) {
var mu sync.Mutex
var total uint64
@@ -434,11 +449,11 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
return mets, nil
}

-func printBlocks(blocks []tsdb.BlockReader, humanReadable *bool) {
-tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
+func printBlocks(w io.Writer, blocks []tsdb.BlockReader, humanReadable *bool) {
+tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
defer tw.Flush()

-fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES")
+fmt.Fprintln(tw, printBlocksTableHeader)
for _, b := range blocks {
meta := b.Meta()

@@ -461,12 +476,12 @@ func getFormatedTime(timestamp int64, humanReadable *bool) string {
return strconv.FormatInt(timestamp, 10)
}

-func analyzeBlock(b tsdb.BlockReader, limit int) error {
+func analyzeBlock(w io.Writer, b tsdb.BlockReader, limit int) error {
meta := b.Meta()
-fmt.Printf("Block ID: %s\n", meta.ULID)
+fmt.Fprintf(w, "Block ID: %s\n", meta.ULID)
// Presume 1ms resolution that Prometheus uses.
-fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
-fmt.Printf("Series: %d\n", meta.Stats.NumSeries)
+fmt.Fprintf(w, "Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
+fmt.Fprintf(w, "Series: %d\n", meta.Stats.NumSeries)
ir, err := b.Index()
if err != nil {
return err
@@ -477,7 +492,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
if err != nil {
return err
}
-fmt.Printf("Label names: %d\n", len(allLabelNames))
+fmt.Fprintf(w, "Label names: %d\n", len(allLabelNames))

type postingInfo struct {
key string
@@ -489,7 +504,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
sort.Slice(postingInfos, func(i, j int) bool { return postingInfos[i].metric > postingInfos[j].metric })

for i, pc := range postingInfos {
-fmt.Printf("%d %s\n", pc.metric, pc.key)
+fmt.Fprintf(w, "%d %s\n", pc.metric, pc.key)
if i >= limit {
break
}
@@ -523,31 +538,31 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
if p.Err() != nil {
return p.Err()
}
-fmt.Printf("Postings (unique label pairs): %d\n", len(labelpairsUncovered))
-fmt.Printf("Postings entries (total label pairs): %d\n", entries)
+fmt.Fprintf(w, "Postings (unique label pairs): %d\n", len(labelpairsUncovered))
+fmt.Fprintf(w, "Postings entries (total label pairs): %d\n", entries)

postingInfos = postingInfos[:0]
for k, m := range labelpairsUncovered {
postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))})
}

-fmt.Printf("\nLabel pairs most involved in churning:\n")
+fmt.Fprintf(w, "\nLabel pairs most involved in churning:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
for k, m := range labelsUncovered {
postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))})
}

-fmt.Printf("\nLabel names most involved in churning:\n")
+fmt.Fprintf(w, "\nLabel names most involved in churning:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
for k, m := range labelpairsCount {
postingInfos = append(postingInfos, postingInfo{k, m})
}

-fmt.Printf("\nMost common label pairs:\n")
+fmt.Fprintf(w, "\nMost common label pairs:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
Expand All @@ -571,7 +586,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
postingInfos = append(postingInfos, postingInfo{n, cumulativeLength})
}

-fmt.Printf("\nLabel names with highest cumulative label value length:\n")
+fmt.Fprintf(w, "\nLabel names with highest cumulative label value length:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
Expand All @@ -582,7 +597,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
}
postingInfos = append(postingInfos, postingInfo{n, uint64(lv.Len())})
}
-fmt.Printf("\nHighest cardinality labels:\n")
+fmt.Fprintf(w, "\nHighest cardinality labels:\n")
printInfo(postingInfos)

postingInfos = postingInfos[:0]
@@ -610,7 +625,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
postingInfos = append(postingInfos, postingInfo{n, uint64(count)})
}
}
-fmt.Printf("\nHighest cardinality metric names:\n")
+fmt.Fprintf(w, "\nHighest cardinality metric names:\n")
printInfo(postingInfos)
return nil
}
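Note: threading an io.Writer through printBlocks and analyzeBlock, instead of writing straight to os.Stdout, makes the CLI output unit-testable. A minimal sketch of such a test, assuming it sits next to main.go in the cmd/tsdb package; the test name is hypothetical:

```go
package main

import (
	"bytes"
	"strings"
	"testing"
)

// TestPrintBlocksWritesHeader is a hypothetical test exercising printBlocks
// through a buffer rather than the process's stdout.
func TestPrintBlocksWritesHeader(t *testing.T) {
	var buf bytes.Buffer
	humanReadable := false

	// With no blocks, only the table header should be written.
	printBlocks(&buf, nil, &humanReadable)

	if !strings.Contains(buf.String(), "BLOCK ULID") {
		t.Fatalf("expected table header in output, got %q", buf.String())
	}
}
```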