From d43a8da859a3d86c40dadb4c0fc5f0b3bc32845a Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 13 May 2025 23:01:08 -0700 Subject: [PATCH 1/2] Implementing Parquet Queryable with fallback Signed-off-by: alanprot --- go.mod | 2 +- go.sum | 4 +- pkg/querier/parquet_queryable.go | 349 ++++++++++ .../parquet-common/convert/convert.go | 17 +- .../parquet-common/convert/merge.go | 76 +++ .../parquet-common/convert/writer.go | 7 +- .../parquet-common/schema/encoder.go | 207 ++++-- .../parquet-common/schema/schema.go | 3 +- .../parquet-common/schema/schema_builder.go | 40 ++ .../parquet-common/search/constraint.go | 610 ++++++++++++++++++ .../parquet-common/search/materialize.go | 511 +++++++++++++++ .../search/parquet_queriable.go | 281 ++++++++ .../parquet-common/search/rowrange.go | 166 +++++ .../parquet-common/storage/bucket_read_at.go | 61 ++ .../parquet-common/storage/parquet_shard.go | 96 +++ .../parquet-common/util/bucket_read_at.go | 35 - .../parquet-common/util/strutil.go | 84 +++ .../parquet-common/util/util.go | 82 ++- vendor/modules.txt | 4 +- 19 files changed, 2528 insertions(+), 107 deletions(-) create mode 100644 pkg/querier/parquet_queryable.go create mode 100644 vendor/github.com/prometheus-community/parquet-common/search/constraint.go create mode 100644 vendor/github.com/prometheus-community/parquet-common/search/materialize.go create mode 100644 vendor/github.com/prometheus-community/parquet-common/search/parquet_queriable.go create mode 100644 vendor/github.com/prometheus-community/parquet-common/search/rowrange.go create mode 100644 vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go create mode 100644 vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go delete mode 100644 vendor/github.com/prometheus-community/parquet-common/util/bucket_read_at.go create mode 100644 vendor/github.com/prometheus-community/parquet-common/util/strutil.go diff --git a/go.mod b/go.mod index 6c65d04a68..853b5af63e 100644 --- a/go.mod +++ b/go.mod @@ -82,7 +82,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/parquet-go/parquet-go v0.25.0 - github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d + github.com/prometheus-community/parquet-common v0.0.0-20250514003255-382b6ec8ae40 github.com/prometheus/procfs v0.15.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.3 diff --git a/go.sum b/go.sum index 20e7469560..08d1794d98 100644 --- a/go.sum +++ b/go.sum @@ -1573,8 +1573,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d h1:j7d62fP5x6yUFNgNDth5JCLOoj6ZclXkBneSATbPZig= -github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d/go.mod h1:Eo3B53ZLcfCEV06clM4UIFTgwxRXm0BHdiaRslKe3Y8= +github.com/prometheus-community/parquet-common v0.0.0-20250514003255-382b6ec8ae40 h1:45NOJV7a7QGKg7rITB8wCs/f5O4bpe3mPAiTFsqcX8s= +github.com/prometheus-community/parquet-common v0.0.0-20250514003255-382b6ec8ae40/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A= github.com/prometheus-community/prom-label-proxy 
v0.8.1-0.20240127162815-c1195f9aabc0 h1:owfYHh79h8Y5HvNMGyww+DaVwo10CKiRW1RQrrZzIwg= github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0/go.mod h1:rT989D4UtOcfd9tVqIZRVIM8rkg+9XbreBjFNEKXvVI= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go new file mode 100644 index 0000000000..0ae35f364d --- /dev/null +++ b/pkg/querier/parquet_queryable.go @@ -0,0 +1,349 @@ +package querier + +import ( + "context" + "fmt" + "time" + + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/prometheus-community/parquet-common/search" + parquet_storage "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/strutil" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/multierror" + "github.com/cortexproject/cortex/pkg/util/services" +) + +type contextKey int + +var ( + blockIdsCtxKey contextKey = 0 +) + +type parquetQueryableWithFallback struct { + services.Service + + queryStoreAfter time.Duration + parquetQueryable storage.Queryable + blockStorageQueryable BlocksStoreQueryable + + finder BlocksFinder + + // Subservices manager. + subservices *services.Manager + subservicesWatcher *services.FailureWatcher +} + +func newParquetQueryable( + storageCfg cortex_tsdb.BlocksStorageConfig, + limits BlocksStoreLimits, + config Config, + blockStorageQueryable BlocksStoreQueryable, + logger log.Logger, + reg prometheus.Registerer, +) (storage.Queryable, error) { + bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "parquet-querier", logger, reg) + + if err != nil { + return nil, err + } + + // Create the blocks finder. 
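+	// When the bucket index is enabled, blocks are discovered through the
+	// per-tenant bucket index; otherwise they are discovered by periodically
+	// scanning the bucket.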
+	var finder BlocksFinder
+	if storageCfg.BucketStore.BucketIndex.Enabled {
+		finder = NewBucketIndexBlocksFinder(BucketIndexBlocksFinderConfig{
+			IndexLoader: bucketindex.LoaderConfig{
+				CheckInterval:         time.Minute,
+				UpdateOnStaleInterval: storageCfg.BucketStore.SyncInterval,
+				UpdateOnErrorInterval: storageCfg.BucketStore.BucketIndex.UpdateOnErrorInterval,
+				IdleTimeout:           storageCfg.BucketStore.BucketIndex.IdleTimeout,
+			},
+			MaxStalePeriod:           storageCfg.BucketStore.BucketIndex.MaxStalePeriod,
+			IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
+			IgnoreBlocksWithin:       storageCfg.BucketStore.IgnoreBlocksWithin,
+		}, bucketClient, limits, logger, reg)
+	} else {
+		finder = NewBucketScanBlocksFinder(BucketScanBlocksFinderConfig{
+			ScanInterval:             storageCfg.BucketStore.SyncInterval,
+			TenantsConcurrency:       storageCfg.BucketStore.TenantSyncConcurrency,
+			MetasConcurrency:         storageCfg.BucketStore.MetaSyncConcurrency,
+			CacheDir:                 storageCfg.BucketStore.SyncDir,
+			IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
+			IgnoreBlocksWithin:       storageCfg.BucketStore.IgnoreBlocksWithin,
+			BlockDiscoveryStrategy:   storageCfg.BucketStore.BlockDiscoveryStrategy,
+		}, bucketClient, limits, logger, reg)
+	}
+
+	manager, err := services.NewManager(finder)
+	if err != nil {
+		return nil, err
+	}
+
+	pq, err := search.NewParquetQueryable(nil, func(ctx context.Context, mint, maxt int64) ([]*parquet_storage.ParquetShard, error) {
+		userID, err := tenant.TenantID(ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		blocks := ctx.Value(blockIdsCtxKey).([]*bucketindex.Block)
+		userBkt := bucket.NewUserBucketClient(userID, bucketClient, limits)
+
+		shards := make([]*parquet_storage.ParquetShard, 0, len(blocks))
+
+		for _, block := range blocks {
+			blockName := fmt.Sprintf("%v/block", block.ID.String())
+			shard, err := parquet_storage.OpenParquetShard(ctx, userBkt, blockName, 0)
+			if err != nil {
+				return nil, err
+			}
+			shards = append(shards, shard)
+		}
+
+		return shards, nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	q := &parquetQueryableWithFallback{
+		subservices:           manager,
+		blockStorageQueryable: blockStorageQueryable,
+		parquetQueryable:      pq,
+		queryStoreAfter:       config.QueryStoreAfter,
+		subservicesWatcher:    services.NewFailureWatcher(),
+		finder:                finder,
+	}
+
+	q.Service = services.NewBasicService(q.starting, q.running, q.stopping)
+
+	return q, nil
+}
+
+func (p *parquetQueryableWithFallback) starting(ctx context.Context) error {
+	p.subservicesWatcher.WatchManager(p.subservices)
+	if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
+		return errors.Wrap(err, "unable to start blocks storage queryable subservices")
+	}
+	return nil
+}
+
+func (p *parquetQueryableWithFallback) running(ctx context.Context) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case err := <-p.subservicesWatcher.Chan():
+			return errors.Wrap(err, "block storage queryable subservice failed")
+		}
+	}
+}
+
+func (p *parquetQueryableWithFallback) stopping(_ error) error {
+	return services.StopManagerAndAwaitStopped(context.Background(), p.subservices)
+}
+
+func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querier, error) {
+	pq, err := p.parquetQueryable.Querier(mint, maxt)
+	if err != nil {
+		return nil, err
+	}
+
+	bsq, err := p.blockStorageQueryable.Querier(mint, maxt)
+	if err != nil {
+		return nil, err
+	}
+
+	return &parquetQuerier{
+		minT:               mint,
+		maxT:               maxt,
+		parquetQuerier:     pq,
+		queryStoreAfter:    p.queryStoreAfter,
+		blocksStoreQuerier: bsq,
+		finder:             p.finder,
+	}, nil
+}
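+
+// parquetQuerier fans each request out to the parquet querier (for blocks that
+// have already been converted to parquet) and to the regular block-store
+// querier (for all remaining blocks), then merges the results.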
+type parquetQuerier struct {
+	minT, maxT int64
+
+	finder BlocksFinder
+
+	parquetQuerier     storage.Querier
+	blocksStoreQuerier storage.Querier
+
+	// If set, the querier manipulates the max time to not be greater than
+	// "now - queryStoreAfter" so that most recent blocks are not queried.
+	queryStoreAfter time.Duration
+}
+
+func (q *parquetQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	limit := 0
+
+	if hints != nil {
+		limit = hints.Limit
+	}
+
+	var (
+		result       []string
+		rAnnotations annotations.Annotations
+	)
+
+	if len(parquet) > 0 {
+		res, ann, qErr := q.parquetQuerier.LabelValues(context.WithValue(ctx, blockIdsCtxKey, parquet), name, hints, matchers...)
+		if qErr != nil {
+			return nil, nil, qErr
+		}
+		result = res
+		rAnnotations = ann
+	}
+
+	if len(remaining) > 0 {
+		res, ann, qErr := q.blocksStoreQuerier.LabelValues(context.WithValue(ctx, blockIdsCtxKey, remaining), name, hints, matchers...)
+		if qErr != nil {
+			return nil, nil, qErr
+		}
+
+		if len(result) == 0 {
+			result = res
+		} else {
+			result = strutil.MergeSlices(limit, result, res)
+		}
+
+		if rAnnotations != nil {
+			rAnnotations = rAnnotations.Merge(ann)
+		}
+	}
+
+	return result, rAnnotations, nil
+}
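+
+// LabelNames, like LabelValues above, queries both backends for their share of
+// the blocks and merges the two sorted result sets, honoring the limit hint.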
+func (q *parquetQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	limit := 0
+
+	if hints != nil {
+		limit = hints.Limit
+	}
+
+	var (
+		result       []string
+		rAnnotations annotations.Annotations
+	)
+
+	if len(parquet) > 0 {
+		res, ann, qErr := q.parquetQuerier.LabelNames(context.WithValue(ctx, blockIdsCtxKey, parquet), hints, matchers...)
+		if qErr != nil {
+			return nil, nil, qErr
+		}
+		result = res
+		rAnnotations = ann
+	}
+
+	if len(remaining) > 0 {
+		res, ann, qErr := q.blocksStoreQuerier.LabelNames(context.WithValue(ctx, blockIdsCtxKey, remaining), hints, matchers...)
+		if qErr != nil {
+			return nil, nil, qErr
+		}
+
+		if len(result) == 0 {
+			result = res
+		} else {
+			result = strutil.MergeSlices(limit, result, res)
+		}
+
+		if rAnnotations != nil {
+			rAnnotations = rAnnotations.Merge(ann)
+		}
+	}
+
+	return result, rAnnotations, nil
+}
+
+func (q *parquetQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
+	mint, maxt, limit := q.minT, q.maxT, 0
+
+	if hints != nil {
+		mint, maxt, limit = hints.Start, hints.End, hints.Limit
+	}
+
+	remaining, parquet, err := q.getBlocks(ctx, mint, maxt)
+	if err != nil {
+		return storage.ErrSeriesSet(err)
+	}
+
+	seriesSets := []storage.SeriesSet{}
+
+	if len(parquet) > 0 {
+		seriesSets = append(seriesSets, q.parquetQuerier.Select(context.WithValue(ctx, blockIdsCtxKey, parquet), sortSeries, hints, matchers...))
+	}
+
+	if len(remaining) > 0 {
+		seriesSets = append(seriesSets, q.blocksStoreQuerier.Select(context.WithValue(ctx, blockIdsCtxKey, remaining), sortSeries, hints, matchers...))
+	}
+
+	if len(seriesSets) == 1 {
+		return seriesSets[0]
+	}
+
+	return storage.NewMergeSeriesSet(seriesSets, limit, storage.ChainedSeriesMerge)
+}
+
+func (q *parquetQuerier) Close() error {
+	mErr := multierror.MultiError{}
+	mErr.Add(q.parquetQuerier.Close())
+	mErr.Add(q.blocksStoreQuerier.Close())
+	return mErr.Err()
+}
+
+func (q *parquetQuerier) getBlocks(ctx context.Context, minT, maxT int64) ([]*bucketindex.Block, []*bucketindex.Block, error) {
+	// If queryStoreAfter is enabled, we manipulate the query maxt to query samples up until
+	// now - queryStoreAfter, because the most recent time range is covered by ingesters. This
+	// optimization is particularly important for the blocks storage because it can be used to
+	// skip querying the most recent not-yet-compacted blocks from the storage.
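+	// For example (hypothetical values): with queryStoreAfter=12h, a query over
+	// [now-24h, now] is clamped to [now-24h, now-12h], while a query over
+	// [now-1h, now] ends up with maxT < minT and no blocks are returned.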
+ if q.queryStoreAfter > 0 { + now := time.Now() + maxT = min(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter))) + + if maxT < minT { + return nil, nil, nil + } + } + + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, nil, err + } + + blocks, _, err := q.finder.GetBlocks(ctx, userID, minT, maxT) + if err != nil { + return nil, nil, err + } + + parquetBlocks := make([]*bucketindex.Block, 0, len(blocks)) + remaining := make([]*bucketindex.Block, 0, len(blocks)) + for _, b := range blocks { + if b.Parquet != nil { + parquetBlocks = append(parquetBlocks, b) + continue + } + remaining = append(remaining, b) + } + + return remaining, parquetBlocks, nil +} diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index 10e426a241..d4862646a6 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -23,16 +23,17 @@ import ( "strings" "time" + "github.com/efficientgo/core/errors" "github.com/hashicorp/go-multierror" "github.com/parquet-go/parquet-go" - "github.com/pkg/errors" - "github.com/prometheus-community/parquet-common/schema" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/thanos-io/objstore" + + "github.com/prometheus-community/parquet-common/schema" ) var DefaultConvertOpts = convertOpts{ @@ -119,6 +120,18 @@ func WithName(name string) ConvertOption { } } +func WithNumRowGroups(n int) ConvertOption { + return func(opts *convertOpts) { + opts.numRowGroups = n + } +} + +func WithRowGroupSize(size int) ConvertOption { + return func(opts *convertOpts) { + opts.rowGroupSize = size + } +} + func WithConcurrency(concurrency int) ConvertOption { return func(opts *convertOpts) { opts.concurrency = concurrency diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/merge.go b/vendor/github.com/prometheus-community/parquet-common/convert/merge.go index d6f40ed0f7..90db2ec54e 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/merge.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/merge.go @@ -16,6 +16,8 @@ package convert import ( "container/heap" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" @@ -124,3 +126,77 @@ func (h *heapChunkSeries) Pop() any { h.heap = old[0 : n-1] return x } + +func NewChunksSeriesSet(series []storage.ChunkSeries) storage.ChunkSeriesSet { + return &concreteChunkSeries{ + series: series, + curr: -1, + } +} + +type concreteChunkSeries struct { + series []storage.ChunkSeries + curr int +} + +func (c *concreteChunkSeries) Next() bool { + c.curr++ + return c.curr < len(c.series) +} + +func (c *concreteChunkSeries) At() storage.ChunkSeries { + return c.series[c.curr] +} + +func (c *concreteChunkSeries) Err() error { + return nil +} + +func (c *concreteChunkSeries) Warnings() annotations.Annotations { + return nil +} + +// NewSeriesSetFromChunkSeriesSet is an adaptor to transform ChunkSeriesSet to SeriesSet when the +func NewSeriesSetFromChunkSeriesSet(ss storage.ChunkSeriesSet, skipChunks bool) storage.SeriesSet { + if skipChunks { + return 
seriesSetFromChunksSeriesSet{chunkSeriesSet: ss} + } + return storage.NewSeriesSetFromChunkSeriesSet(ss) +} + +var ( + _ storage.SeriesSet = (*seriesSetFromChunksSeriesSet)(nil) + _ storage.Series = (*seriesSetFromChunksSeries)(nil) +) + +type seriesSetFromChunksSeries struct { + chunkSeries storage.ChunkSeries +} + +func (s seriesSetFromChunksSeries) Labels() labels.Labels { + return s.chunkSeries.Labels() +} + +func (s seriesSetFromChunksSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator { + return storage.NewListSeriesIterator(nil) +} + +type seriesSetFromChunksSeriesSet struct { + chunkSeriesSet storage.ChunkSeriesSet +} + +func (s seriesSetFromChunksSeriesSet) Next() bool { + return s.chunkSeriesSet.Next() +} + +func (s seriesSetFromChunksSeriesSet) At() storage.Series { + return &seriesSetFromChunksSeries{chunkSeries: s.chunkSeriesSet.At()} +} + +func (s seriesSetFromChunksSeriesSet) Err() error { + return nil +} + +func (s seriesSetFromChunksSeriesSet) Warnings() annotations.Annotations { + return nil +} diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go index 2f20a32d07..e182bf3c98 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go @@ -18,14 +18,15 @@ import ( "fmt" "io" + "github.com/efficientgo/core/errors" "github.com/hashicorp/go-multierror" "github.com/parquet-go/parquet-go" - "github.com/pkg/errors" - "github.com/prometheus-community/parquet-common/schema" - "github.com/prometheus-community/parquet-common/util" "github.com/prometheus/prometheus/util/zeropool" "github.com/thanos-io/objstore" "golang.org/x/sync/errgroup" + + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/util" ) type ShardedWriter struct { diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go b/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go index a858a22309..6b6556a4c3 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go @@ -17,6 +17,7 @@ import ( "bytes" "encoding/binary" "fmt" + "io" "math" "slices" "sort" @@ -49,19 +50,36 @@ func (e *PrometheusParquetChunksEncoder) Encode(it chunks.Iterator) ([][]byte, e dataColSize := len(e.schema.DataColsIndexes) - reEncodedChunks := make([]chunks.Meta, dataColSize) - reEncodedChunksAppenders := make([]chunkenc.Appender, dataColSize) + reEncodedChunks := make([]map[chunkenc.Encoding][]*chunks.Meta, dataColSize) + reEncodedChunksAppenders := make([]map[chunkenc.Encoding]chunkenc.Appender, dataColSize) for i := 0; i < dataColSize; i++ { - reEncodedChunks[i] = chunks.Meta{ - Chunk: chunkenc.NewXORChunk(), - MinTime: math.MaxInt64, - } - app, err := reEncodedChunks[i].Chunk.Appender() - if err != nil { - return nil, err + reEncodedChunks[i] = make(map[chunkenc.Encoding][]*chunks.Meta) + reEncodedChunksAppenders[i] = make(map[chunkenc.Encoding]chunkenc.Appender) + + for _, enc := range []chunkenc.Encoding{chunkenc.EncXOR, chunkenc.EncHistogram, chunkenc.EncFloatHistogram} { + var chunk chunkenc.Chunk + switch enc { + case chunkenc.EncXOR: + chunk = chunkenc.NewXORChunk() + case chunkenc.EncHistogram: + chunk = chunkenc.NewHistogramChunk() + case chunkenc.EncFloatHistogram: + chunk = chunkenc.NewFloatHistogramChunk() + default: + return 
nil, fmt.Errorf("unknown encoding %v", enc) + } + + reEncodedChunks[i][enc] = append(reEncodedChunks[i][enc], &chunks.Meta{ + Chunk: chunk, + MinTime: math.MaxInt64, + }) + app, err := reEncodedChunks[i][enc][0].Chunk.Appender() + if err != nil { + return nil, err + } + reEncodedChunksAppenders[i][enc] = app } - reEncodedChunksAppenders[i] = app } var sampleIt chunkenc.Iterator @@ -70,42 +88,103 @@ func (e *PrometheusParquetChunksEncoder) Encode(it chunks.Iterator) ([][]byte, e switch chk.Chunk.Encoding() { case chunkenc.EncXOR: for vt := sampleIt.Next(); vt != chunkenc.ValNone; vt = sampleIt.Next() { - // TODO: Native histograms support if vt != chunkenc.ValFloat { return nil, fmt.Errorf("found value type %v in float chunk", vt) } t, v := sampleIt.At() chkIdx := e.schema.DataColumIdx(t) - reEncodedChunksAppenders[chkIdx].Append(t, v) - if t < reEncodedChunks[chkIdx].MinTime { - reEncodedChunks[chkIdx].MinTime = t + reEncodedChunksAppenders[chkIdx][chunkenc.EncXOR].Append(t, v) + if t < reEncodedChunks[chkIdx][chunkenc.EncXOR][len(reEncodedChunks[chkIdx][chunkenc.EncXOR])-1].MinTime { + reEncodedChunks[chkIdx][chunkenc.EncXOR][len(reEncodedChunks[chkIdx][chunkenc.EncXOR])-1].MinTime = t + } + if t > reEncodedChunks[chkIdx][chunkenc.EncXOR][len(reEncodedChunks[chkIdx][chunkenc.EncXOR])-1].MaxTime { + reEncodedChunks[chkIdx][chunkenc.EncXOR][len(reEncodedChunks[chkIdx][chunkenc.EncXOR])-1].MaxTime = t + } + } + case chunkenc.EncFloatHistogram: + for vt := sampleIt.Next(); vt != chunkenc.ValNone; vt = sampleIt.Next() { + if vt != chunkenc.ValFloatHistogram { + return nil, fmt.Errorf("found value type %v in float histogram chunk", vt) + } + t, v := sampleIt.AtFloatHistogram(nil) + + chkIdx := e.schema.DataColumIdx(t) + newC, recoded, app, err := reEncodedChunksAppenders[chkIdx][chunkenc.EncFloatHistogram].AppendFloatHistogram(nil, t, v, false) + if err != nil { + return nil, err + } + reEncodedChunksAppenders[chkIdx][chunkenc.EncFloatHistogram] = app + if newC != nil { + if !recoded { + reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram] = append(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram], &chunks.Meta{ + MinTime: math.MaxInt64, + }) + } + reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].Chunk = newC + } + + if t < reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].MinTime { + reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].MinTime = t + } + if t > reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].MaxTime { + reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].MaxTime = t + } + } + case chunkenc.EncHistogram: + for vt := sampleIt.Next(); vt != chunkenc.ValNone; vt = sampleIt.Next() { + if vt != chunkenc.ValHistogram { + return nil, fmt.Errorf("found value type %v in histogram chunk", vt) + } + t, v := sampleIt.AtHistogram(nil) + + chkIdx := e.schema.DataColumIdx(t) + newC, recoded, app, err := reEncodedChunksAppenders[chkIdx][chunkenc.EncHistogram].AppendHistogram(nil, t, v, false) + if err != nil { + return nil, err + } + reEncodedChunksAppenders[chkIdx][chunkenc.EncHistogram] = app + if newC != nil { + if !recoded { + reEncodedChunks[chkIdx][chunkenc.EncHistogram] = append(reEncodedChunks[chkIdx][chunkenc.EncHistogram], &chunks.Meta{ + MinTime: math.MaxInt64, + }) + } + 
reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].Chunk = newC + } + + if t < reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].MinTime { + reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].MinTime = t } - if t > reEncodedChunks[chkIdx].MaxTime { - reEncodedChunks[chkIdx].MaxTime = t + if t > reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].MaxTime { + reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].MaxTime = t } } default: - continue + return nil, fmt.Errorf("unknown encoding %v", chk.Chunk.Encoding()) } } result := make([][]byte, dataColSize) - for i, chk := range reEncodedChunks { - if chk.Chunk.NumSamples() == 0 { - continue + for i, chunks := range reEncodedChunks { + for _, enc := range []chunkenc.Encoding{chunkenc.EncXOR, chunkenc.EncHistogram, chunkenc.EncFloatHistogram} { + for _, chk := range chunks[enc] { + if chk.Chunk.NumSamples() == 0 { + continue + } + var b [varint.MaxLen64]byte + n := binary.PutUvarint(b[:], uint64(chk.Chunk.Encoding())) + result[i] = append(result[i], b[:n]...) + n = binary.PutUvarint(b[:], uint64(chk.MinTime)) + result[i] = append(result[i], b[:n]...) + n = binary.PutUvarint(b[:], uint64(chk.MaxTime)) + result[i] = append(result[i], b[:n]...) + n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes()))) + result[i] = append(result[i], b[:n]...) + result[i] = append(result[i], chk.Chunk.Bytes()...) + } } - var b [varint.MaxLen64]byte - n := binary.PutUvarint(b[:], uint64(chk.Chunk.Encoding())) - result[i] = append(result[i], b[:n]...) - n = binary.PutUvarint(b[:], uint64(chk.MinTime)) - result[i] = append(result[i], b[:n]...) - n = binary.PutUvarint(b[:], uint64(chk.MaxTime)) - result[i] = append(result[i], b[:n]...) - n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes()))) - result[i] = append(result[i], b[:n]...) - result[i] = append(result[i], chk.Chunk.Bytes()...) 
} return result, nil } @@ -125,39 +204,47 @@ func (e *PrometheusParquetChunksDecoder) Decode(data []byte, mint, maxt int64) ( b := bytes.NewBuffer(data) - chkEnc, err := binary.ReadUvarint(b) - if err != nil { - return nil, err - } + for { + chkEnc, err := binary.ReadUvarint(b) + if err == io.EOF { + break + } - minTime, err := binary.ReadUvarint(b) - if err != nil { - return nil, err - } - if int64(minTime) > maxt { - return nil, nil - } + if err != nil { + return nil, err + } - maxTime, err := binary.ReadUvarint(b) - if err != nil { - return nil, err - } - size, err := binary.ReadUvarint(b) - if err != nil { - return nil, err - } - cData := b.Bytes()[:size] - chk, err := e.Pool.Get(chunkenc.Encoding(chkEnc), cData) - if err != nil { - return nil, err - } + minTime, err := binary.ReadUvarint(b) + if err != nil { + return nil, err + } + + maxTime, err := binary.ReadUvarint(b) + if err != nil { + return nil, err + } + size, err := binary.ReadUvarint(b) + if err != nil { + return nil, err + } + cData := b.Bytes()[:size] + chk, err := e.Pool.Get(chunkenc.Encoding(chkEnc), cData) + if err != nil { + return nil, err + } + b.Next(int(size)) - if int64(maxTime) >= mint { - result = append(result, chunks.Meta{ - MinTime: int64(minTime), - MaxTime: int64(maxTime), - Chunk: chk, - }) + if int64(minTime) > maxt { + continue + } + + if int64(maxTime) >= mint { + result = append(result, chunks.Meta{ + MinTime: int64(minTime), + MaxTime: int64(maxTime), + Chunk: chk, + }) + } } return result, nil diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema.go index 5fac886fc7..6c86484190 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/schema.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema.go @@ -40,8 +40,7 @@ func ExtractLabelFromColumn(col string) (string, bool) { if !strings.HasPrefix(col, LabelColumnPrefix) { return "", false } - - return strings.TrimPrefix(col, LabelColumnPrefix), true + return col[len(LabelColumnPrefix):], true } func IsDataColumn(col string) bool { diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go index 8585ade6c8..2ed8b423e8 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go @@ -17,6 +17,8 @@ import ( "fmt" "strconv" + "github.com/efficientgo/core/errors" + "github.com/parquet-go/parquet-go" ) @@ -44,6 +46,44 @@ func NewBuilder(mint, maxt, colDuration int64) *Builder { return b } +func FromLabelsFile(lf *parquet.File) (*TSDBSchema, error) { + md := MetadataToMap(lf.Metadata().KeyValueMetadata) + mint, err := strconv.ParseInt(md[MinTMd], 0, 64) + if err != nil { + return nil, errors.Wrap(err, "failed to convert mint to int") + } + + maxt, err := strconv.ParseInt(md[MaxTMd], 10, 64) + if err != nil { + return nil, errors.Wrap(err, "failed to convert max to int") + } + + dataColDurationMs, err := strconv.ParseInt(md[DataColSizeMd], 10, 64) + if err != nil { + return nil, errors.Wrap(err, "failed to convert dataColDurationMs to int") + } + g := make(parquet.Group) + + b := &Builder{ + g: g, + metadata: md, + mint: mint, + maxt: maxt, + dataColDurationMs: dataColDurationMs, + } + + for _, c := range lf.Schema().Columns() { + lbl, ok := ExtractLabelFromColumn(c[0]) + if !ok { + continue + } + + 
b.AddLabelNameColumn(lbl) + } + + return b.Build() +} + func (b *Builder) AddLabelNameColumn(lbls ...string) { for _, lbl := range lbls { b.g[LabelToColumn(lbl)] = parquet.Optional(parquet.Encoded(parquet.String(), &parquet.RLEDictionary)) diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go new file mode 100644 index 0000000000..4c60c56849 --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -0,0 +1,610 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package search + +import ( + "bytes" + "context" + "fmt" + "slices" + "sort" + + "github.com/parquet-go/parquet-go" + "github.com/prometheus/prometheus/model/labels" + + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus-community/parquet-common/util" +) + +type Constraint interface { + fmt.Stringer + + // filter returns a set of non-overlapping increasing row indexes that may satisfy the constraint. + filter(ctx context.Context, rg parquet.RowGroup, primary bool, rr []RowRange) ([]RowRange, error) + // init initializes the constraint with respect to the file schema and projections. 
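+	// Implementations are expected to capture any per-file state they need here
+	// (e.g. the column's comparison function); filter must only be called after init.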
+ init(f *storage.ParquetFile) error + // path is the path for the column that is constrained + path() string +} + +func MatchersToConstraint(matchers ...*labels.Matcher) ([]Constraint, error) { + r := make([]Constraint, 0, len(matchers)) + for _, matcher := range matchers { + switch matcher.Type { + case labels.MatchEqual: + if matcher.Value == "" { + r = append(r, Null(schema.LabelToColumn(matcher.Name))) + continue + } + r = append(r, Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value))) + case labels.MatchNotEqual: + if matcher.Value == "" { + r = append(r, Not(Null(schema.LabelToColumn(matcher.Name)))) + continue + } + r = append(r, Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value)))) + case labels.MatchRegexp: + if matcher.Value == "" { + r = append(r, Null(schema.LabelToColumn(matcher.Name))) + continue + } + if matcher.Value == ".+" { + r = append(r, Not(Null(schema.LabelToColumn(matcher.Name)))) + continue + } + res, err := labels.NewFastRegexMatcher(matcher.Value) + if err != nil { + return nil, err + } + r = append(r, Regex(schema.LabelToColumn(matcher.Name), res)) + case labels.MatchNotRegexp: + if matcher.Value == "" { + r = append(r, Not(Null(schema.LabelToColumn(matcher.Name)))) + continue + } + if matcher.Value == ".+" { + r = append(r, Null(schema.LabelToColumn(matcher.Name))) + continue + } + res, err := labels.NewFastRegexMatcher(matcher.Value) + if err != nil { + return nil, err + } + r = append(r, Not(Regex(schema.LabelToColumn(matcher.Name), res))) + default: + return nil, fmt.Errorf("unsupported matcher type %s", matcher.Type) + } + } + return r, nil +} + +func Initialize(f *storage.ParquetFile, cs ...Constraint) error { + for i := range cs { + if err := cs[i].init(f); err != nil { + return fmt.Errorf("unable to initialize constraint %d: %w", i, err) + } + } + return nil +} + +func Filter(ctx context.Context, rg parquet.RowGroup, cs ...Constraint) ([]RowRange, error) { + // Constraints for sorting columns are cheaper to evaluate, so we sort them first. + sc := rg.SortingColumns() + + var n int + for i := range sc { + if n == len(cs) { + break + } + for j := range cs { + if cs[j].path() == sc[i].Path()[0] { + cs[n], cs[j] = cs[j], cs[n] + n++ + } + } + } + var err error + rr := []RowRange{{from: int64(0), count: rg.NumRows()}} + for i := range cs { + isPrimary := len(sc) > 0 && cs[i].path() == sc[0].Path()[0] + rr, err = cs[i].filter(ctx, rg, isPrimary, rr) + if err != nil { + return nil, fmt.Errorf("unable to filter with constraint %d: %w", i, err) + } + } + return rr, nil +} + +// symbolTable is a helper that can decode the i-th value of a page. +// Using it we only need to allocate an int32 slice and not a slice of +// string values. +// It only works for optional dictionary encoded columns. All of our label +// columns are that though. 
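+// For example (hypothetical page): definition levels [1, 0, 1] with dictionary
+// ids [3, 7] decode to syms [3, -1, 7], where -1 marks a null row.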
+type symbolTable struct { + dict parquet.Dictionary + syms []int32 +} + +func (s *symbolTable) Get(i int) parquet.Value { + switch s.syms[i] { + case -1: + return parquet.NullValue() + default: + return s.dict.Index(s.syms[i]) + } +} + +func (s *symbolTable) GetIndex(i int) int32 { + return s.syms[i] +} + +func (s *symbolTable) Reset(pg parquet.Page) { + dict := pg.Dictionary() + data := pg.Data() + syms := data.Int32() + defs := pg.DefinitionLevels() + + if s.syms == nil { + s.syms = make([]int32, len(defs)) + } else { + s.syms = slices.Grow(s.syms, len(defs))[:len(defs)] + } + + sidx := 0 + for i := range defs { + if defs[i] == 1 { + s.syms[i] = syms[sidx] + sidx++ + } else { + s.syms[i] = -1 + } + } + s.dict = dict +} + +type equalConstraint struct { + pth string + + val parquet.Value + f *storage.ParquetFile + + comp func(l, r parquet.Value) int +} + +func (ec *equalConstraint) String() string { + return fmt.Sprintf("equal(%q,%q)", ec.pth, ec.val) +} + +func Equal(path string, value parquet.Value) Constraint { + return &equalConstraint{pth: path, val: value} +} + +func (ec *equalConstraint) filter(ctx context.Context, rg parquet.RowGroup, primary bool, rr []RowRange) ([]RowRange, error) { + if len(rr) == 0 { + return nil, nil + } + from, to := rr[0].from, rr[len(rr)-1].from+rr[len(rr)-1].count + + col, ok := rg.Schema().Lookup(ec.path()) + if !ok { + // If match empty, return rr (filter nothing) + // otherwise return empty + if ec.matches(parquet.ValueOf("")) { + return rr, nil + } + return []RowRange{}, nil + } + cc := rg.ColumnChunks()[col.ColumnIndex] + + if skip, err := ec.skipByBloomfilter(cc); err != nil { + return nil, fmt.Errorf("unable to skip by bloomfilter: %w", err) + } else if skip { + return nil, nil + } + + pgs := ec.f.GetPages(ctx, cc) + defer func() { _ = pgs.Close() }() + + oidx, err := cc.OffsetIndex() + if err != nil { + return nil, fmt.Errorf("unable to read offset index: %w", err) + } + cidx, err := cc.ColumnIndex() + if err != nil { + return nil, fmt.Errorf("unable to read column index: %w", err) + } + var ( + symbols = new(symbolTable) + res = make([]RowRange, 0) + ) + for i := 0; i < cidx.NumPages(); i++ { + // If page does not intersect from, to; we can immediately discard it + pfrom := oidx.FirstRowIndex(i) + pcount := rg.NumRows() - pfrom + if i < oidx.NumPages()-1 { + pcount = oidx.FirstRowIndex(i+1) - pfrom + } + pto := pfrom + pcount + if pfrom > to { + break + } + if pto < from { + continue + } + // Page intersects [from, to] but we might be able to discard it with statistics + if cidx.NullPage(i) { + continue + } + minv, maxv := cidx.MinValue(i), cidx.MaxValue(i) + + if !ec.val.IsNull() && !maxv.IsNull() && ec.comp(ec.val, maxv) > 0 { + if cidx.IsDescending() { + break + } + continue + } + if !ec.val.IsNull() && !minv.IsNull() && ec.comp(ec.val, minv) < 0 { + if cidx.IsAscending() { + break + } + continue + } + // We cannot discard the page through statistics but we might need to read it to see if it has the value + if err := pgs.SeekToRow(pfrom); err != nil { + return nil, fmt.Errorf("unable to seek to row: %w", err) + } + pg, err := pgs.ReadPage() + if err != nil { + return nil, fmt.Errorf("unable to read page: %w", err) + } + + symbols.Reset(pg) + + // The page has the value, we need to find the matching row ranges + n := int(pg.NumRows()) + bl := int(max(pfrom, from) - pfrom) + br := n - int(pto-min(pto, to)) + var l, r int + switch { + case cidx.IsAscending() && primary: + l = sort.Search(n, func(i int) bool { return ec.comp(ec.val, 
symbols.Get(i)) <= 0 }) + r = sort.Search(n, func(i int) bool { return ec.comp(ec.val, symbols.Get(i)) < 0 }) + + if lv, rv := max(bl, l), min(br, r); rv > lv { + res = append(res, RowRange{pfrom + int64(lv), int64(rv - lv)}) + } + default: + off, count := bl, 0 + for j := bl; j < br; j++ { + if ec.comp(ec.val, symbols.Get(j)) != 0 { + if count != 0 { + res = append(res, RowRange{pfrom + int64(off), int64(count)}) + } + off, count = j, 0 + } else { + if count == 0 { + off = j + } + count++ + } + } + if count != 0 { + res = append(res, RowRange{pfrom + int64(off), int64(count)}) + } + } + } + if len(res) == 0 { + return nil, nil + } + return intersectRowRanges(simplify(res), rr), nil +} + +func (ec *equalConstraint) init(f *storage.ParquetFile) error { + c, ok := f.Schema().Lookup(ec.path()) + ec.f = f + if !ok { + return nil + } + if c.Node.Type().Kind() != ec.val.Kind() { + return fmt.Errorf("schema: cannot search value of kind %s in column of kind %s", ec.val.Kind(), c.Node.Type().Kind()) + } + ec.comp = c.Node.Type().Compare + return nil +} + +func (ec *equalConstraint) path() string { + return ec.pth +} + +func (ec *equalConstraint) matches(v parquet.Value) bool { + return bytes.Equal(v.ByteArray(), ec.val.ByteArray()) +} + +func (ec *equalConstraint) skipByBloomfilter(cc parquet.ColumnChunk) (bool, error) { + bf := cc.BloomFilter() + if bf == nil { + return false, nil + } + ok, err := bf.Check(ec.val) + if err != nil { + return false, fmt.Errorf("unable to check bloomfilter: %w", err) + } + return !ok, nil +} + +func Regex(path string, r *labels.FastRegexMatcher) Constraint { + return ®exConstraint{pth: path, cache: make(map[parquet.Value]bool), r: r} +} + +type regexConstraint struct { + pth string + cache map[parquet.Value]bool + f *storage.ParquetFile + r *labels.FastRegexMatcher +} + +func (rc *regexConstraint) String() string { + return fmt.Sprintf("regex(%v,%v)", rc.pth, rc.r.GetRegexString()) +} + +func (rc *regexConstraint) filter(ctx context.Context, rg parquet.RowGroup, primary bool, rr []RowRange) ([]RowRange, error) { + if len(rr) == 0 { + return nil, nil + } + from, to := rr[0].from, rr[len(rr)-1].from+rr[len(rr)-1].count + + col, ok := rg.Schema().Lookup(rc.path()) + if !ok { + // If match empty, return rr (filter nothing) + // otherwise return empty + if rc.matches(parquet.ValueOf("")) { + return rr, nil + } + return []RowRange{}, nil + } + cc := rg.ColumnChunks()[col.ColumnIndex] + + pgs := rc.f.GetPages(ctx, cc) + defer func() { _ = pgs.Close() }() + + oidx, err := cc.OffsetIndex() + if err != nil { + return nil, fmt.Errorf("unable to read offset index: %w", err) + } + cidx, err := cc.ColumnIndex() + if err != nil { + return nil, fmt.Errorf("unable to read column index: %w", err) + } + var ( + symbols = new(symbolTable) + res = make([]RowRange, 0) + ) + for i := 0; i < cidx.NumPages(); i++ { + // If page does not intersect from, to; we can immediately discard it + pfrom := oidx.FirstRowIndex(i) + pcount := rg.NumRows() - pfrom + if i < oidx.NumPages()-1 { + pcount = oidx.FirstRowIndex(i+1) - pfrom + } + pto := pfrom + pcount + if pfrom > to { + break + } + if pto < from { + continue + } + // Page intersects [from, to] but we might be able to discard it with statistics + if cidx.NullPage(i) { + continue + } + // TODO: use setmatches / prefix for statistics + + // We cannot discard the page through statistics but we might need to read it to see if it has the value + if err := pgs.SeekToRow(pfrom); err != nil { + return nil, fmt.Errorf("unable to seek to row: %w", err) + 
} + pg, err := pgs.ReadPage() + if err != nil { + return nil, fmt.Errorf("unable to read page: %w", err) + } + + symbols.Reset(pg) + + // The page has the value, we need to find the matching row ranges + n := int(pg.NumRows()) + bl := int(max(pfrom, from) - pfrom) + br := n - int(pto-min(pto, to)) + off, count := bl, 0 + for j := bl; j < br; j++ { + if !rc.matches(symbols.Get(j)) { + if count != 0 { + res = append(res, RowRange{pfrom + int64(off), int64(count)}) + } + off, count = j, 0 + } else { + if count == 0 { + off = j + } + count++ + } + } + if count != 0 { + res = append(res, RowRange{pfrom + int64(off), int64(count)}) + } + } + if len(res) == 0 { + return nil, nil + } + return intersectRowRanges(simplify(res), rr), nil +} + +func (rc *regexConstraint) init(f *storage.ParquetFile) error { + c, ok := f.Schema().Lookup(rc.path()) + rc.f = f + if !ok { + return nil + } + if stringKind := parquet.String().Type().Kind(); c.Node.Type().Kind() != stringKind { + return fmt.Errorf("schema: cannot search value of kind %s in column of kind %s", stringKind, c.Node.Type().Kind()) + } + rc.cache = make(map[parquet.Value]bool) + return nil +} + +func (rc *regexConstraint) path() string { + return rc.pth +} + +func (rc *regexConstraint) matches(v parquet.Value) bool { + accept, seen := rc.cache[v] + if !seen { + accept = rc.r.MatchString(util.YoloString(v.ByteArray())) + rc.cache[v] = accept + } + return accept +} + +func Not(c Constraint) Constraint { + return ¬Constraint{c: c} +} + +type notConstraint struct { + c Constraint +} + +func (nc *notConstraint) String() string { + return fmt.Sprintf("not(%v)", nc.c.String()) +} + +func (nc *notConstraint) filter(ctx context.Context, rg parquet.RowGroup, primary bool, rr []RowRange) ([]RowRange, error) { + base, err := nc.c.filter(ctx, rg, primary, rr) + if err != nil { + return nil, fmt.Errorf("unable to compute child constraint: %w", err) + } + // no need to intersect since its already subset of rr + return complementRowRanges(base, rr), nil +} + +func (nc *notConstraint) init(f *storage.ParquetFile) error { + return nc.c.init(f) +} + +func (nc *notConstraint) path() string { + return nc.c.path() +} + +type nullConstraint struct { + pth string +} + +func (null *nullConstraint) String() string { + return fmt.Sprintf("null(%q)", null.pth) +} + +func Null(path string) Constraint { + return &nullConstraint{pth: path} +} + +func (null *nullConstraint) filter(ctx context.Context, rg parquet.RowGroup, _ bool, rr []RowRange) ([]RowRange, error) { + if len(rr) == 0 { + return nil, nil + } + from, to := rr[0].from, rr[len(rr)-1].from+rr[len(rr)-1].count + + col, ok := rg.Schema().Lookup(null.path()) + if !ok { + // filter nothing + return rr, nil + } + cc := rg.ColumnChunks()[col.ColumnIndex] + + pgs := cc.Pages() + defer func() { _ = pgs.Close() }() + + oidx, err := cc.OffsetIndex() + if err != nil { + return nil, fmt.Errorf("unable to read offset index: %w", err) + } + cidx, err := cc.ColumnIndex() + if err != nil { + return nil, fmt.Errorf("unable to read column index: %w", err) + } + res := make([]RowRange, 0) + for i := 0; i < cidx.NumPages(); i++ { + // If page does not intersect from, to; we can immediately discard it + pfrom := oidx.FirstRowIndex(i) + pcount := rg.NumRows() - pfrom + if i < oidx.NumPages()-1 { + pcount = oidx.FirstRowIndex(i+1) - pfrom + } + pto := pfrom + pcount + if pfrom > to { + break + } + if pto < from { + continue + } + + if cidx.NullPage(i) { + res = append(res, RowRange{from: pfrom, count: pcount}) + continue + } + + if 
cidx.NullCount(i) == 0 { + continue + } + + // We cannot discard the page through statistics but we might need to read it to see if it has the value + if err := pgs.SeekToRow(pfrom); err != nil { + return nil, fmt.Errorf("unable to seek to row: %w", err) + } + pg, err := pgs.ReadPage() + if err != nil { + return nil, fmt.Errorf("unable to read page: %w", err) + } + // The page has null value, we need to find the matching row ranges + bl := int(max(pfrom, from) - pfrom) + off, count := bl, 0 + for j, def := range pg.DefinitionLevels() { + if def != 1 { + if count == 0 { + off = j + } + count++ + } else { + if count != 0 { + res = append(res, RowRange{pfrom + int64(off), int64(count)}) + } + off, count = j, 0 + } + } + + if count != 0 { + res = append(res, RowRange{pfrom + int64(off), int64(count)}) + } + } + if len(res) == 0 { + return nil, nil + } + return intersectRowRanges(simplify(res), rr), nil +} + +func (null *nullConstraint) init(*storage.ParquetFile) error { + return nil +} + +func (null *nullConstraint) path() string { + return null.pth +} diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go new file mode 100644 index 0000000000..ffa3f86298 --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -0,0 +1,511 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package search + +import ( + "context" + "fmt" + "io" + "runtime" + "slices" + "sort" + + "github.com/efficientgo/core/errors" + "github.com/parquet-go/parquet-go" + "github.com/prometheus/prometheus/model/labels" + prom_storage "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunks" + "golang.org/x/sync/errgroup" + + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus-community/parquet-common/util" +) + +type Materializer struct { + b *storage.ParquetShard + s *schema.TSDBSchema + d *schema.PrometheusParquetChunksDecoder + + colIdx int + concurrency int + + dataColToIndex []int +} + +func NewMaterializer(s *schema.TSDBSchema, d *schema.PrometheusParquetChunksDecoder, block *storage.ParquetShard) (*Materializer, error) { + colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes) + if !ok { + return nil, errors.New(fmt.Sprintf("schema index %s not found", schema.ColIndexes)) + } + + dataColToIndex := make([]int, len(block.ChunksFile().Schema().Columns())) + for i := 0; i < len(s.DataColsIndexes); i++ { + c, ok := block.ChunksFile().Schema().Lookup(schema.DataColumn(i)) + if !ok { + return nil, errors.New(fmt.Sprintf("schema column %s not found", schema.DataColumn(i))) + } + + dataColToIndex[i] = c.ColumnIndex + } + + return &Materializer{ + s: s, + d: d, + b: block, + colIdx: colIdx.ColumnIndex, + concurrency: runtime.GOMAXPROCS(0), + dataColToIndex: dataColToIndex, + }, nil +} + +// Materialize reconstructs the ChunkSeries that belong to the specified row ranges (rr). +// It uses the row group index (rgi) and time bounds (mint, maxt) to filter and decode the series. +func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) { + sLbls, err := m.materializeAllLabels(ctx, rgi, rr) + if err != nil { + return nil, errors.Wrapf(err, "error materializing labels") + } + + results := make([]prom_storage.ChunkSeries, len(sLbls)) + for i, s := range sLbls { + sort.Sort(s) + results[i] = &concreteChunksSeries{ + lbls: s, + } + } + + if !skipChunks { + chks, err := m.materializeChunks(ctx, rgi, mint, maxt, rr) + if err != nil { + return nil, errors.Wrap(err, "materializer failed to materialize chunks") + } + + for i, result := range results { + result.(*concreteChunksSeries).chks = chks[i] + } + + // If we are not skipping chunks and there is no chunks for the time range queried, lets remove the series + results = slices.DeleteFunc(results, func(cs prom_storage.ChunkSeries) bool { + return len(cs.(*concreteChunksSeries).chks) == 0 + }) + } + return results, err +} + +func (m *Materializer) MaterializeAllLabelNames() []string { + r := make([]string, 0, len(m.b.LabelsFile().Schema().Columns())) + for _, c := range m.b.LabelsFile().Schema().Columns() { + lbl, ok := schema.ExtractLabelFromColumn(c[0]) + if !ok { + continue + } + + r = append(r, lbl) + } + return r +} + +func (m *Materializer) MaterializeLabelNames(ctx context.Context, rgi int, rr []RowRange) ([]string, error) { + labelsRg := m.b.LabelsFile().RowGroups()[rgi] + cc := labelsRg.ColumnChunks()[m.colIdx] + colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), labelsRg, cc, rr) + if err != nil { + return nil, errors.Wrap(err, "materializer failed to materialize columns") + } + + seen := make(map[string]struct{}) + colsMap := make(map[string]struct{}, 10) + for _, colsIdx := range colsIdxs { + key := 
util.YoloString(colsIdx.ByteArray()) + if _, ok := seen[key]; !ok { + idxs, err := schema.DecodeUintSlice(colsIdx.ByteArray()) + if err != nil { + return nil, errors.Wrap(err, "failed to decode column index") + } + for _, idx := range idxs { + if _, ok := colsMap[m.b.LabelsFile().Schema().Columns()[idx][0]]; !ok { + colsMap[m.b.LabelsFile().Schema().Columns()[idx][0]] = struct{}{} + } + } + } + } + lbls := make([]string, 0, len(colsMap)) + for col := range colsMap { + l, ok := schema.ExtractLabelFromColumn(col) + if !ok { + return nil, errors.New(fmt.Sprintf("error extracting label name from col %v", col)) + } + lbls = append(lbls, l) + } + return lbls, nil +} + +func (m *Materializer) MaterializeLabelValues(ctx context.Context, name string, rgi int, rr []RowRange) ([]string, error) { + labelsRg := m.b.LabelsFile().RowGroups()[rgi] + cIdx, ok := m.b.LabelsFile().Schema().Lookup(schema.LabelToColumn(name)) + if !ok { + return []string{}, nil + } + cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex] + values, err := m.materializeColumn(ctx, m.b.LabelsFile(), labelsRg, cc, rr) + if err != nil { + return nil, errors.Wrap(err, "materializer failed to materialize columns") + } + + r := make([]string, 0, len(values)) + vMap := make(map[string]struct{}, 10) + for _, v := range values { + strValue := util.YoloString(v.ByteArray()) + if _, ok := vMap[strValue]; !ok { + r = append(r, strValue) + vMap[strValue] = struct{}{} + } + } + return r, nil +} + +func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name string, rgi int) ([]string, error) { + labelsRg := m.b.LabelsFile().RowGroups()[rgi] + cIdx, ok := m.b.LabelsFile().Schema().Lookup(schema.LabelToColumn(name)) + if !ok { + return []string{}, nil + } + cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex] + pages := m.b.LabelsFile().GetPages(ctx, cc) + p, err := pages.ReadPage() + if err != nil { + return []string{}, errors.Wrap(err, "failed to read page") + } + defer parquet.Release(p) + + r := make([]string, 0, p.Dictionary().Len()) + for i := 0; i < p.Dictionary().Len(); i++ { + r = append(r, p.Dictionary().Index(int32(i)).String()) + } + return r, nil +} + +func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([]labels.Labels, error) { + labelsRg := m.b.LabelsFile().RowGroups()[rgi] + cc := labelsRg.ColumnChunks()[m.colIdx] + colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), labelsRg, cc, rr) + if err != nil { + return nil, errors.Wrap(err, "materializer failed to materialize columns") + } + + colsMap := make(map[int]*[]parquet.Value, 10) + results := make([]labels.Labels, len(colsIdxs)) + + for _, colsIdx := range colsIdxs { + idxs, err := schema.DecodeUintSlice(colsIdx.ByteArray()) + if err != nil { + return nil, errors.Wrap(err, "materializer failed to decode column index") + } + for _, idx := range idxs { + v := make([]parquet.Value, 0, len(colsIdxs)) + colsMap[idx] = &v + } + } + + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(m.concurrency) + + for cIdx, v := range colsMap { + errGroup.Go(func() error { + cc := labelsRg.ColumnChunks()[cIdx] + values, err := m.materializeColumn(ctx, m.b.LabelsFile(), labelsRg, cc, rr) + if err != nil { + return errors.Wrap(err, "failed to materialize labels values") + } + *v = append(*v, values...) 
+ return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + for cIdx, values := range colsMap { + labelName, ok := schema.ExtractLabelFromColumn(m.b.LabelsFile().Schema().Columns()[cIdx][0]) + if !ok { + return nil, fmt.Errorf("column %d not found in schema", cIdx) + } + for i, value := range *values { + if value.IsNull() { + continue + } + results[i] = append(results[i], labels.Label{ + Name: labelName, + Value: util.YoloString(value.ByteArray()), + }) + } + } + + return results, nil +} + +func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, maxt int64, rr []RowRange) ([][]chunks.Meta, error) { + minDataCol := m.s.DataColumIdx(mint) + maxDataCol := m.s.DataColumIdx(maxt) + rg := m.b.ChunksFile().RowGroups()[rgi] + totalRows := int64(0) + for _, r := range rr { + totalRows += r.count + } + r := make([][]chunks.Meta, totalRows) + + for i := minDataCol; i <= min(maxDataCol, len(m.dataColToIndex)-1); i++ { + values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rg, rg.ColumnChunks()[m.dataColToIndex[i]], rr) + if err != nil { + return r, err + } + + for vi, value := range values { + chks, err := m.d.Decode(value.ByteArray(), mint, maxt) + if err != nil { + return r, errors.Wrap(err, "failed to decode chunks") + } + r[vi] = append(r[vi], chks...) + } + } + + return r, nil +} + +func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, group parquet.RowGroup, cc parquet.ColumnChunk, rr []RowRange) ([]parquet.Value, error) { + if len(rr) == 0 { + return nil, nil + } + + oidx, err := cc.OffsetIndex() + if err != nil { + return nil, errors.Wrap(err, "could not get offset index") + } + + cidx, err := cc.ColumnIndex() + if err != nil { + return nil, errors.Wrap(err, "could not get column index") + } + + pagesToRowsMap := make(map[int][]RowRange, len(rr)) + + for i := 0; i < cidx.NumPages(); i++ { + pageRowRange := RowRange{ + from: oidx.FirstRowIndex(i), + } + pageRowRange.count = group.NumRows() + + if i < oidx.NumPages()-1 { + pageRowRange.count = oidx.FirstRowIndex(i+1) - pageRowRange.from + } + + for _, r := range rr { + if pageRowRange.Overlaps(r) { + pagesToRowsMap[i] = append(pagesToRowsMap[i], r) + } + } + } + + r := make(map[RowRange][]parquet.Value, len(rr)) + for _, v := range rr { + r[v] = []parquet.Value{} + } + + errGroup := &errgroup.Group{} + errGroup.SetLimit(m.concurrency) + + for _, p := range coalescePageRanges(pagesToRowsMap, oidx) { + errGroup.Go(func() error { + pgs := file.GetPages(ctx, cc) + defer func() { _ = pgs.Close() }() + err := pgs.SeekToRow(p.rows[0].from) + if err != nil { + return errors.Wrap(err, "could not seek to row") + } + + vi := new(valuesIterator) + remainingRr := p.rows + currentRr := remainingRr[0] + next := currentRr.from + remaining := currentRr.count + currentRow := currentRr.from + + remainingRr = remainingRr[1:] + for len(remainingRr) > 0 || remaining > 0 { + page, err := pgs.ReadPage() + if err != nil { + return errors.Wrap(err, "could not read page") + } + vi.Reset(page) + for vi.Next() { + if currentRow == next { + r[currentRr] = append(r[currentRr], vi.At()) + remaining-- + if remaining > 0 { + next = next + 1 + } else if len(remainingRr) > 0 { + currentRr = remainingRr[0] + next = currentRr.from + remaining = currentRr.count + remainingRr = remainingRr[1:] + } + } + currentRow++ + } + parquet.Release(page) + + if vi.Error() != nil { + return vi.Error() + } + } + return nil + }) + } + err = errGroup.Wait() + if err != nil { + return nil, errors.Wrap(err, 
"failed to materialize columns") + } + + values := make([]parquet.Value, 0, len(rr)) + for _, v := range rr { + values = append(values, r[v]...) + } + return values, err +} + +type pageEntryRead struct { + pages []int + rows []RowRange +} + +// Merge nearby pages to enable efficient sequential reads. +// Pages that are not close to each other will be scheduled for concurrent reads. +func coalescePageRanges(pagedIdx map[int][]RowRange, offset parquet.OffsetIndex) []pageEntryRead { + // TODO: Add the max gap size as parameter + partitioner := util.NewGapBasedPartitioner(10 * 1024) + if len(pagedIdx) == 0 { + return []pageEntryRead{} + } + idxs := make([]int, 0, len(pagedIdx)) + for idx := range pagedIdx { + idxs = append(idxs, idx) + } + + slices.Sort(idxs) + + parts := partitioner.Partition(len(idxs), func(i int) (uint64, uint64) { + return uint64(offset.Offset(idxs[i])), uint64(offset.Offset(idxs[i]) + offset.CompressedPageSize(idxs[i])) + }) + + r := make([]pageEntryRead, 0, len(parts)) + for _, part := range parts { + pagesToRead := pageEntryRead{} + for i := part.ElemRng[0]; i < part.ElemRng[1]; i++ { + pagesToRead.pages = append(pagesToRead.pages, idxs[i]) + pagesToRead.rows = append(pagesToRead.rows, pagedIdx[idxs[i]]...) + } + pagesToRead.rows = simplify(pagesToRead.rows) + r = append(r, pagesToRead) + } + + return r +} + +type valuesIterator struct { + p parquet.Page + + // TODO: consider using unique.Handle + cachedSymbols map[int32]parquet.Value + st symbolTable + + vr parquet.ValueReader + + current int + buffer []parquet.Value + currentBufferIndex int + err error +} + +func (vi *valuesIterator) Reset(p parquet.Page) { + vi.p = p + vi.vr = nil + if p.Dictionary() != nil { + vi.st.Reset(p) + vi.cachedSymbols = make(map[int32]parquet.Value, p.Dictionary().Len()) + } else { + vi.vr = p.Values() + vi.buffer = make([]parquet.Value, 0, 128) + vi.currentBufferIndex = -1 + } + vi.current = -1 +} + +func (vi *valuesIterator) Next() bool { + if vi.err != nil { + return false + } + + vi.current++ + if vi.current >= int(vi.p.NumRows()) { + return false + } + + vi.currentBufferIndex++ + + if vi.currentBufferIndex == len(vi.buffer) { + n, err := vi.vr.ReadValues(vi.buffer[:cap(vi.buffer)]) + if err != nil && err != io.EOF { + vi.err = err + } + vi.buffer = vi.buffer[:n] + vi.currentBufferIndex = 0 + } + + return true +} + +func (vi *valuesIterator) Error() error { + return vi.err +} + +func (vi *valuesIterator) At() parquet.Value { + if vi.vr == nil { + dicIndex := vi.st.GetIndex(vi.current) + // Cache a clone of the current symbol table entry. + // This allows us to release the original page while avoiding unnecessary future clones. + if _, ok := vi.cachedSymbols[dicIndex]; !ok { + vi.cachedSymbols[dicIndex] = vi.st.Get(vi.current).Clone() + } + return vi.cachedSymbols[dicIndex] + } + + return vi.buffer[vi.currentBufferIndex].Clone() +} + +var _ prom_storage.ChunkSeries = &concreteChunksSeries{} + +type concreteChunksSeries struct { + lbls labels.Labels + chks []chunks.Meta +} + +func (c concreteChunksSeries) Labels() labels.Labels { + return c.lbls +} + +func (c concreteChunksSeries) Iterator(_ chunks.Iterator) chunks.Iterator { + return prom_storage.NewListChunkSeriesIterator(c.chks...) 
+} diff --git a/vendor/github.com/prometheus-community/parquet-common/search/parquet_queriable.go b/vendor/github.com/prometheus-community/parquet-common/search/parquet_queriable.go new file mode 100644 index 0000000000..cf609b3116 --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/search/parquet_queriable.go @@ -0,0 +1,281 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package search + +import ( + "context" + "sort" + + "github.com/prometheus/prometheus/model/labels" + prom_storage "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + + "github.com/prometheus-community/parquet-common/convert" + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus-community/parquet-common/util" +) + +type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]*storage.ParquetShard, error) + +type parquetQueryable struct { + shardsFinder ShardsFinderFunction + d *schema.PrometheusParquetChunksDecoder +} + +func NewParquetQueryable(d *schema.PrometheusParquetChunksDecoder, shardFinder ShardsFinderFunction) (prom_storage.Queryable, error) { + return &parquetQueryable{ + shardsFinder: shardFinder, + d: d, + }, nil +} + +func (p parquetQueryable) Querier(mint, maxt int64) (prom_storage.Querier, error) { + return &parquetQuerier{ + mint: mint, + maxt: maxt, + shardsFinder: p.shardsFinder, + d: p.d, + }, nil +} + +type parquetQuerier struct { + mint, maxt int64 + shardsFinder ShardsFinderFunction + d *schema.PrometheusParquetChunksDecoder +} + +func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + shards, err := p.queryableShards(ctx, p.mint, p.maxt) + if err != nil { + return nil, nil, err + } + + limit := int64(0) + + if hints != nil { + limit = int64(hints.Limit) + } + + resNameValues := [][]string{} + + for _, s := range shards { + r, err := s.LabelValues(ctx, name, matchers) + if err != nil { + return nil, nil, err + } + + resNameValues = append(resNameValues, r...) + } + + return util.MergeUnsortedSlices(int(limit), resNameValues...), nil, nil +} + +func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + shards, err := p.queryableShards(ctx, p.mint, p.maxt) + if err != nil { + return nil, nil, err + } + + limit := int64(0) + + if hints != nil { + limit = int64(hints.Limit) + } + + resNameSets := [][]string{} + + for _, s := range shards { + r, err := s.LabelNames(ctx, matchers) + if err != nil { + return nil, nil, err + } + + resNameSets = append(resNameSets, r...) 
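+		// Each shard returns one unsorted name set per row group; the sets are
+		// merged, de-duplicated and capped to the hint limit below.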
+ } + + return util.MergeUnsortedSlices(int(limit), resNameSets...), nil, nil +} + +func (p parquetQuerier) Close() error { + return nil +} + +func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storage.SelectHints, matchers ...*labels.Matcher) prom_storage.SeriesSet { + shards, err := p.queryableShards(ctx, p.mint, p.maxt) + if err != nil { + return prom_storage.ErrSeriesSet(err) + } + seriesSet := make([]prom_storage.ChunkSeriesSet, len(shards)) + + minT, maxT := p.mint, p.maxt + if sp != nil { + minT, maxT = sp.Start, sp.End + } + skipChunks := sp != nil && sp.Func == "series" + + for i, shard := range shards { + ss, err := shard.Query(ctx, sorted, minT, maxT, skipChunks, matchers) + if err != nil { + return prom_storage.ErrSeriesSet(err) + } + seriesSet[i] = ss + } + ss := convert.NewMergeChunkSeriesSet(seriesSet, labels.Compare, prom_storage.NewConcatenatingChunkSeriesMerger()) + + return convert.NewSeriesSetFromChunkSeriesSet(ss, skipChunks) +} + +func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) ([]*queryableShard, error) { + shards, err := p.shardsFinder(ctx, mint, maxt) + if err != nil { + return nil, err + } + qBlocks := make([]*queryableShard, len(shards)) + for i, shard := range shards { + qb, err := newQueryableShard(shard, p.d) + if err != nil { + return nil, err + } + qBlocks[i] = qb + } + return qBlocks, nil +} + +type queryableShard struct { + shard *storage.ParquetShard + m *Materializer +} + +func newQueryableShard(block *storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder) (*queryableShard, error) { + s, err := block.TSDBSchema() + if err != nil { + return nil, err + } + m, err := NewMaterializer(s, d, block) + if err != nil { + return nil, err + } + + return &queryableShard{ + shard: block, + m: m, + }, nil +} + +func (b queryableShard) Query(ctx context.Context, sorted bool, mint, maxt int64, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { + cs, err := MatchersToConstraint(matchers...) + if err != nil { + return nil, err + } + err = Initialize(b.shard.LabelsFile(), cs...) + if err != nil { + return nil, err + } + + results := make([]prom_storage.ChunkSeries, 0, 1024) + for i, group := range b.shard.LabelsFile().RowGroups() { + rr, err := Filter(ctx, group, cs...) + if err != nil { + return nil, err + } + series, err := b.m.Materialize(ctx, i, mint, maxt, skipChunks, rr) + if err != nil { + return nil, err + } + results = append(results, series...) + } + + if sorted { + sort.Sort(byLabels(results)) + } + return convert.NewChunksSeriesSet(results), nil +} + +func (b queryableShard) LabelNames(ctx context.Context, matchers []*labels.Matcher) ([][]string, error) { + if len(matchers) == 0 { + return [][]string{b.m.MaterializeAllLabelNames()}, nil + } + cs, err := MatchersToConstraint(matchers...) + if err != nil { + return nil, err + } + err = Initialize(b.shard.LabelsFile(), cs...) + if err != nil { + return nil, err + } + + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + for i, group := range b.shard.LabelsFile().RowGroups() { + rr, err := Filter(ctx, group, cs...) 
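+		// Filter returns the simplified row ranges of this row group that satisfy
+		// every constraint; only those rows are materialized below.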
+		if err != nil {
+			return nil, err
+		}
+		series, err := b.m.MaterializeLabelNames(ctx, i, rr)
+		if err != nil {
+			return nil, err
+		}
+		results[i] = series
+	}
+
+	return results, nil
+}
+
+func (b queryableShard) LabelValues(ctx context.Context, name string, matchers []*labels.Matcher) ([][]string, error) {
+	if len(matchers) == 0 {
+		return b.allLabelValues(ctx, name)
+	}
+	cs, err := MatchersToConstraint(matchers...)
+	if err != nil {
+		return nil, err
+	}
+	err = Initialize(b.shard.LabelsFile(), cs...)
+	if err != nil {
+		return nil, err
+	}
+
+	results := make([][]string, len(b.shard.LabelsFile().RowGroups()))
+	for i, group := range b.shard.LabelsFile().RowGroups() {
+		rr, err := Filter(ctx, group, cs...)
+		if err != nil {
+			return nil, err
+		}
+		series, err := b.m.MaterializeLabelValues(ctx, name, i, rr)
+		if err != nil {
+			return nil, err
+		}
+		results[i] = series
+	}
+
+	return results, nil
+}
+
+func (b queryableShard) allLabelValues(ctx context.Context, name string) ([][]string, error) {
+	results := make([][]string, len(b.shard.LabelsFile().RowGroups()))
+	for i := range b.shard.LabelsFile().RowGroups() {
+		series, err := b.m.MaterializeAllLabelValues(ctx, name, i)
+		if err != nil {
+			return nil, err
+		}
+		results[i] = series
+	}
+
+	return results, nil
+}
+
+type byLabels []prom_storage.ChunkSeries
+
+func (b byLabels) Len() int           { return len(b) }
+func (b byLabels) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
+func (b byLabels) Less(i, j int) bool { return labels.Compare(b[i].Labels(), b[j].Labels()) < 0 }
diff --git a/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go b/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go
new file mode 100644
index 0000000000..10cf136136
--- /dev/null
+++ b/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go
@@ -0,0 +1,166 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package search
+
+import (
+	"sort"
+)
+
+type RowRange struct {
+	from  int64
+	count int64
+}
+
+func NewRowRange(from, count int64) *RowRange {
+	return &RowRange{
+		from:  from,
+		count: count,
+	}
+}
+
+// Overlaps returns true if the receiver and the given RowRange share any overlapping rows.
+// Both ranges are treated as half-open intervals: [from, from+count).
+func (rr RowRange) Overlaps(o RowRange) bool {
+	endA := rr.from + rr.count
+	endB := o.from + o.count
+	return rr.from < endB && o.from < endA
+}
+
+// intersectRowRanges intersects the row ranges from the left-hand side (lhs) with the row ranges from rhs.
+// It assumes that lhs and rhs are simplified and returns a simplified result.
+// It operates in O(l+r) time by cursoring through both range lists with a two-pointer approach.
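+//
+// For example (illustrative values):
+//
+//	lhs: [{from: 1, count: 4}] // rows 1,2,3,4
+//	rhs: [{from: 3, count: 4}] // rows 3,4,5,6
+//	intersectRowRanges(lhs, rhs) // [{from: 3, count: 2}], i.e. rows 3,4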
+func intersectRowRanges(lhs, rhs []RowRange) []RowRange {
+	res := make([]RowRange, 0)
+	for l, r := 0, 0; l < len(lhs) && r < len(rhs); {
+		al, bl := lhs[l].from, lhs[l].from+lhs[l].count
+		ar, br := rhs[r].from, rhs[r].from+rhs[r].count
+
+		// check if rows intersect
+		if al <= br && ar <= bl {
+			os, oe := max(al, ar), min(bl, br)
+			res = append(res, RowRange{from: os, count: oe - os})
+		}
+
+		// advance the cursor of the range that ends first
+		if bl <= br {
+			l++
+		} else {
+			r++
+		}
+	}
+	return simplify(res)
+}
+
+// complementRowRanges returns the ranges that are in rhs but not in lhs.
+// For example, if you have:
+// lhs: [{from: 1, count: 3}] // represents rows 1,2,3
+// rhs: [{from: 0, count: 5}] // represents rows 0,1,2,3,4
+// The complement would be [{from: 0, count: 1}, {from: 4, count: 1}] // represents rows 0,4
+// because these are the rows in rhs that are not in lhs.
+//
+// The function assumes that lhs and rhs are simplified (no overlapping ranges)
+// and returns a simplified result. It operates in O(l+r) time by using a two-pointer approach
+// to efficiently process both ranges.
+func complementRowRanges(lhs, rhs []RowRange) []RowRange {
+	res := make([]RowRange, 0)
+
+	l, r := 0, 0
+	for l < len(lhs) && r < len(rhs) {
+		al, bl := lhs[l].from, lhs[l].from+lhs[l].count
+		ar, br := rhs[r].from, rhs[r].from+rhs[r].count
+
+		// classify how the current lhs and rhs ranges overlap
+		switch {
+		case al > br || ar > bl:
+			// no intersection, advance cursor that ends first
+			if bl <= br {
+				l++
+			} else {
+				res = append(res, RowRange{from: ar, count: br - ar})
+				r++
+			}
+		case al < ar && bl > br:
+			// l contains r, complement of l in r is empty, advance r
+			r++
+		case al < ar && bl <= br:
+			// l covers r from left but has room on top
+			oe := min(bl, br)
+			rhs[r].from += oe - ar
+			rhs[r].count -= oe - ar
+			l++
+		case al >= ar && bl > br:
+			// l covers r from right but has room on bottom
+			os := max(al, ar)
+			res = append(res, RowRange{from: ar, count: os - ar})
+			r++
+		case al >= ar && bl <= br:
+			// l is contained in r
+			os, oe := max(al, ar), min(bl, br)
+			res = append(res, RowRange{from: rhs[r].from, count: os - rhs[r].from})
+			rhs[r].from = oe
+			rhs[r].count = br - oe
+			l++
+		}
+	}
+
+	for ; r < len(rhs); r++ {
+		res = append(res, rhs[r])
+	}
+
+	return simplify(res)
+}
+
+func simplify(rr []RowRange) []RowRange {
+	if len(rr) == 0 {
+		return nil
+	}
+
+	sort.Slice(rr, func(i, j int) bool {
+		return rr[i].from < rr[j].from
+	})
+
+	tmp := make([]RowRange, 0)
+	l := rr[0]
+	for i := 1; i < len(rr); i++ {
+		r := rr[i]
+		al, bl := l.from, l.from+l.count
+		ar, br := r.from, r.from+r.count
+		if bl < ar {
+			tmp = append(tmp, l)
+			l = r
+			continue
+		}
+
+		from := min(al, ar)
+		count := max(bl, br) - from
+		if count == 0 {
+			continue
+		}
+
+		l = RowRange{
+			from:  from,
+			count: count,
+		}
+	}
+
+	tmp = append(tmp, l)
+	res := make([]RowRange, 0, len(tmp))
+	for i := range tmp {
+		if tmp[i].count != 0 {
+			res = append(res, tmp[i])
+		}
+	}
+
+	return res
+}
diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go b/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go
new file mode 100644
index 0000000000..0f022ca800
--- /dev/null
+++ b/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go
@@ -0,0 +1,61 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "io" + + "github.com/thanos-io/objstore" +) + +type ReadAtWithContext interface { + io.ReaderAt + WithContext(ctx context.Context) io.ReaderAt +} + +type bReadAt struct { + path string + obj objstore.Bucket + ctx context.Context +} + +func NewBucketReadAt(ctx context.Context, path string, obj objstore.Bucket) ReadAtWithContext { + return &bReadAt{ + path: path, + obj: obj, + ctx: ctx, + } +} + +func (b *bReadAt) WithContext(ctx context.Context) io.ReaderAt { + return &bReadAt{ + path: b.path, + obj: b.obj, + ctx: ctx, + } +} + +func (b *bReadAt) ReadAt(p []byte, off int64) (n int, err error) { + rc, err := b.obj.GetRange(b.ctx, b.path, off, int64(len(p))) + if err != nil { + return 0, err + } + defer func() { _ = rc.Close() }() + n, err = rc.Read(p) + if err == io.EOF { + err = nil + } + return +} diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go new file mode 100644 index 0000000000..628ebc7579 --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go @@ -0,0 +1,96 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "sync" + + "github.com/parquet-go/parquet-go" + "github.com/thanos-io/objstore" + + "github.com/prometheus-community/parquet-common/schema" +) + +type ParquetFile struct { + *parquet.File + ReadAtWithContext +} + +func (f *ParquetFile) GetPages(ctx context.Context, cc parquet.ColumnChunk) *parquet.FilePages { + colChunk := cc.(*parquet.FileColumnChunk) + pages := colChunk.PagesFrom(f.WithContext(ctx)) + return pages +} + +func OpenFile(r ReadAtWithContext, size int64, options ...parquet.FileOption) (*ParquetFile, error) { + file, err := parquet.OpenFile(r, size, options...) + if err != nil { + return nil, err + } + return &ParquetFile{ + File: file, + ReadAtWithContext: r, + }, nil +} + +type ParquetShard struct { + labelsFile, chunksFile *ParquetFile + schema *schema.TSDBSchema + o sync.Once +} + +// OpenParquetShard opens the sharded parquet block, +// using the options param. 
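+//
+// A minimal usage sketch (bucket handle and block name are hypothetical):
+//
+//	shard, err := OpenParquetShard(ctx, bkt, "block-01", 0)
+//	if err != nil {
+//		return err
+//	}
+//	s, err := shard.TSDBSchema()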
+func OpenParquetShard(ctx context.Context, bkt objstore.Bucket, name string, shard int, options ...parquet.FileOption) (*ParquetShard, error) {
+	labelsFileName := schema.LabelsPfileNameForShard(name, shard)
+	chunksFileName := schema.ChunksPfileNameForShard(name, shard)
+	labelsAttr, err := bkt.Attributes(ctx, labelsFileName)
+	if err != nil {
+		return nil, err
+	}
+	labelsFile, err := OpenFile(NewBucketReadAt(ctx, labelsFileName, bkt), labelsAttr.Size, options...)
+	if err != nil {
+		return nil, err
+	}
+
+	chunksFileAttr, err := bkt.Attributes(ctx, chunksFileName)
+	if err != nil {
+		return nil, err
+	}
+	chunksFile, err := OpenFile(NewBucketReadAt(ctx, chunksFileName, bkt), chunksFileAttr.Size, options...)
+	if err != nil {
+		return nil, err
+	}
+	return &ParquetShard{
+		labelsFile: labelsFile,
+		chunksFile: chunksFile,
+	}, nil
+}
+
+func (b *ParquetShard) LabelsFile() *ParquetFile {
+	return b.labelsFile
+}
+
+func (b *ParquetShard) ChunksFile() *ParquetFile {
+	return b.chunksFile
+}
+
+func (b *ParquetShard) TSDBSchema() (*schema.TSDBSchema, error) {
+	var err error
+	b.o.Do(func() {
+		b.schema, err = schema.FromLabelsFile(b.labelsFile.File)
+	})
+	return b.schema, err
+}
diff --git a/vendor/github.com/prometheus-community/parquet-common/util/bucket_read_at.go b/vendor/github.com/prometheus-community/parquet-common/util/bucket_read_at.go
deleted file mode 100644
index d65c46ed50..0000000000
--- a/vendor/github.com/prometheus-community/parquet-common/util/bucket_read_at.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package util
-
-import (
-	"context"
-	"io"
-
-	"github.com/thanos-io/objstore"
-)
-
-type bReadAt struct {
-	path string
-	obj  objstore.Bucket
-	ctx  context.Context
-}
-
-func NewBucketReadAt(ctx context.Context, path string, obj objstore.Bucket) io.ReaderAt {
-	return &bReadAt{
-		path: path,
-		obj:  obj,
-		ctx:  ctx,
-	}
-}
-
-func (b *bReadAt) ReadAt(p []byte, off int64) (n int, err error) {
-	rc, err := b.obj.GetRange(b.ctx, b.path, off, int64(len(p)))
-	if err != nil {
-		return 0, err
-	}
-	defer func() { _ = rc.Close() }()
-	n, err = rc.Read(p)
-	if err == io.EOF {
-		err = nil
-	}
-	return
-}
diff --git a/vendor/github.com/prometheus-community/parquet-common/util/strutil.go b/vendor/github.com/prometheus-community/parquet-common/util/strutil.go
new file mode 100644
index 0000000000..f4e777ac83
--- /dev/null
+++ b/vendor/github.com/prometheus-community/parquet-common/util/strutil.go
@@ -0,0 +1,84 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+	"sort"
+	"strings"
+)
+
+// MergeSlices merges a set of sorted string slices into a single one
+// while removing all duplicates.
+// If limit is set, only the first limit results will be returned. 0 to disable.
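+//
+// For example (illustrative inputs):
+//
+//	MergeSlices(0, []string{"a", "c"}, []string{"b", "c"}) // ["a", "b", "c"]
+//	MergeSlices(2, []string{"a", "c"}, []string{"b", "c"}) // ["a", "b"]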
+func MergeSlices(limit int, a ...[]string) []string {
	if len(a) == 0 {
		return nil
	}
	if len(a) == 1 {
		return truncateToLimit(limit, a[0])
	}
	l := len(a) / 2
	return mergeTwoStringSlices(limit, MergeSlices(limit, a[:l]...), MergeSlices(limit, a[l:]...))
}

// MergeUnsortedSlices behaves like MergeSlices, but the input slices are
// checked for sortedness first and sorted if they are not ordered yet.
// If limit is set, only the first limit results will be returned. 0 to disable.
func MergeUnsortedSlices(limit int, a ...[]string) []string {
	for _, s := range a {
		if !sort.StringsAreSorted(s) {
			sort.Strings(s)
		}
	}
	return MergeSlices(limit, a...)
}

func mergeTwoStringSlices(limit int, a, b []string) []string {
	a = truncateToLimit(limit, a)
	b = truncateToLimit(limit, b)

	maxl := len(a)
	if len(b) > len(a) {
		maxl = len(b)
	}

	res := make([]string, 0, maxl*10/9)

	for len(a) > 0 && len(b) > 0 {
		d := strings.Compare(a[0], b[0])

		if d == 0 {
			res = append(res, a[0])
			a, b = a[1:], b[1:]
		} else if d < 0 {
			res = append(res, a[0])
			a = a[1:]
		} else if d > 0 {
			res = append(res, b[0])
			b = b[1:]
		}
	}
	// Append all remaining elements.
	res = append(res, a...)
	res = append(res, b...)
	res = truncateToLimit(limit, res)
	return res
}

func truncateToLimit(limit int, a []string) []string {
	if limit > 0 && len(a) > limit {
		return a[:limit]
	}
	return a
}
diff --git a/vendor/github.com/prometheus-community/parquet-common/util/util.go b/vendor/github.com/prometheus-community/parquet-common/util/util.go
index 16bcdc0fc1..4851a51a7a 100644
--- a/vendor/github.com/prometheus-community/parquet-common/util/util.go
+++ b/vendor/github.com/prometheus-community/parquet-common/util/util.go
@@ -1,6 +1,29 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package util
 
-import "github.com/parquet-go/parquet-go"
+import (
+	"unsafe"
+
+	"github.com/parquet-go/parquet-go"
+)
+
+// YoloString converts buf to a string without copying. The caller must ensure
+// buf is neither mutated nor reused while the returned string is still in use.
+func YoloString(buf []byte) string {
+	return *((*string)(unsafe.Pointer(&buf)))
+}
 
 func CloneRows(rows []parquet.Row) []parquet.Row {
 	rr := make([]parquet.Row, len(rows))
@@ -9,3 +32,62 @@ func CloneRows(rows []parquet.Row) []parquet.Row {
 	}
 	return rr
 }
+
+// Copied from thanos repository:
+// https://github.com/thanos-io/thanos/blob/2a5a856e34adb2653dda700c4d87637236afb2dd/pkg/store/bucket.go#L3466
+
+type Part struct {
+	Start uint64
+	End   uint64
+
+	ElemRng [2]int
+}
+
+type Partitioner interface {
+	// Partition partitions length entries into n <= length ranges that cover
+	// all input ranges.
+	// It supports overlapping ranges.
+	// NOTE: It expects ranges to be sorted by start time.
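+	//
+	// For example, a gap-based implementation with maxGapSize = 10 would
+	// partition the byte ranges (0, 5), (7, 12) and (50, 60) into two parts:
+	// [0, 12] (elements 0-1) and [50, 60] (element 2) (illustrative values).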
+ Partition(length int, rng func(int) (uint64, uint64)) []Part +} + +type gapBasedPartitioner struct { + maxGapSize uint64 +} + +func NewGapBasedPartitioner(maxGapSize uint64) Partitioner { + return gapBasedPartitioner{ + maxGapSize: maxGapSize, + } +} + +// Partition partitions length entries into n <= length ranges that cover all +// input ranges by combining entries that are separated by reasonably small gaps. +// It is used to combine multiple small ranges from object storage into bigger, more efficient/cheaper ones. +func (g gapBasedPartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []Part) { + j := 0 + k := 0 + for k < length { + j = k + k++ + + p := Part{} + p.Start, p.End = rng(j) + + // Keep growing the range until the end or we encounter a large gap. + for ; k < length; k++ { + s, e := rng(k) + + if p.End+g.maxGapSize < s { + break + } + + if p.End <= e { + p.End = e + } + } + p.ElemRng = [2]int{j, k} + parts = append(parts, p) + } + return parts +} diff --git a/vendor/modules.txt b/vendor/modules.txt index de7f7918ba..9a66e84ae6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -837,10 +837,12 @@ github.com/pkg/errors # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d +# github.com/prometheus-community/parquet-common v0.0.0-20250514003255-382b6ec8ae40 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/schema +github.com/prometheus-community/parquet-common/search +github.com/prometheus-community/parquet-common/storage github.com/prometheus-community/parquet-common/util # github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 ## explicit; go 1.20 From 7844a5bf5d82f7116bfda22e0260323e4a34b3f5 Mon Sep 17 00:00:00 2001 From: alanprot Date: Thu, 15 May 2025 08:53:23 -0700 Subject: [PATCH 2/2] changing modules to create the parquet queryable Signed-off-by: alanprot --- pkg/cortex/modules.go | 13 +++++++++++-- pkg/querier/parquet_queryable.go | 12 ++++++------ pkg/querier/querier.go | 4 ++++ 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 3002e64667..a390b989d6 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -400,11 +400,20 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) { var servs []services.Service //nolint:revive // I prefer this form over removing 'else', because it allows q to have smaller scope. 
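+	// When -querier.query-parquet-files is enabled, the blocks-store queryable
+	// created below is wrapped by the parquet queryable, which serves reads from
+	// parquet files and can fall back to the store-gateway path (see
+	// querier.NewParquetQueryable).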
+	var queryable prom_storage.Queryable
 	if q, err := initQueryableForEngine(t.Cfg, t.Overrides, prometheus.DefaultRegisterer); err != nil {
 		return nil, fmt.Errorf("failed to initialize querier: %v", err)
 	} else {
+		queryable = q
+		if t.Cfg.Querier.QueryParquetFiles {
+			pq, err := querier.NewParquetQueryable(t.Cfg.Querier, t.Cfg.BlocksStorage, t.Overrides, q, util_log.Logger, prometheus.DefaultRegisterer)
+			if err != nil {
+				return nil, fmt.Errorf("failed to initialize parquet querier: %v", err)
+			}
+			queryable = pq
+		}
 		t.StoreQueryables = append(t.StoreQueryables, querier.UseAlwaysQueryable(q))
-		if s, ok := q.(services.Service); ok {
+		if s, ok := queryable.(services.Service); ok {
 			servs = append(servs, s)
 		}
 	}
@@ -424,7 +433,7 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
 	}
 }
 
-func initQueryableForEngine(cfg Config, limits *validation.Overrides, reg prometheus.Registerer) (prom_storage.Queryable, error) {
+func initQueryableForEngine(cfg Config, limits *validation.Overrides, reg prometheus.Registerer) (*querier.BlocksStoreQueryable, error) {
 	// When running in single binary, if the blocks sharding is disabled and no custom
 	// store-gateway address has been configured, we can set it to the running process.
 	if cfg.isModuleEnabled(All) && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go
index 0ae35f364d..ef885b1857 100644
--- a/pkg/querier/parquet_queryable.go
+++ b/pkg/querier/parquet_queryable.go
@@ -35,7 +35,7 @@ type parquetQueryableWithFallback struct {
 
 	queryStoreAfter       time.Duration
 	parquetQueryable      storage.Queryable
-	blockStorageQueryable BlocksStoreQueryable
+	blockStorageQueryable *BlocksStoreQueryable
 
 	finder BlocksFinder
 
@@ -44,11 +44,11 @@ type parquetQueryableWithFallback struct {
 	subservicesWatcher *services.FailureWatcher
 }
 
-func newParquetQueryable(
+func NewParquetQueryable(
+	config Config,
 	storageCfg cortex_tsdb.BlocksStorageConfig,
 	limits BlocksStoreLimits,
-	config Config,
-	blockStorageQueryable BlocksStoreQueryable,
+	blockStorageQueryable *BlocksStoreQueryable,
 	logger log.Logger,
 	reg prometheus.Registerer,
 ) (storage.Queryable, error) {
@@ -112,7 +112,7 @@ func newParquetQueryable(
 		return shards, nil
 	})
 
-	q := &parquetQueryableWithFallback{
+	p := &parquetQueryableWithFallback{
 		subservices:           manager,
 		blockStorageQueryable: blockStorageQueryable,
 		parquetQueryable:      pq,
@@ -121,7 +121,7 @@ func newParquetQueryable(
 		finder: finder,
 	}
 
-	q.Service = services.NewBasicService(q.starting, q.running, q.stopping)
+	p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
 
-	return pq, nil
+	return p, nil
 }
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index f13121caf9..ce2e324312 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -87,6 +87,9 @@ type Config struct {
 	// Ignore max query length check at Querier.
 	IgnoreMaxQueryLength              bool `yaml:"ignore_max_query_length"`
 	EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`
+
+	// Query Parquet files if available.
+	QueryParquetFiles bool `yaml:"query_parquet_files" doc:"hidden"`
 }
 
 var (
@@ -130,6 +133,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. 
A value > 0 enables it.") f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.") f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") + f.BoolVar(&cfg.QueryParquetFiles, "querier.query-parquet-files", false, "[Experimental] If true, querier will try to query the parquet files if available.") } // Validate the config