-
Notifications
You must be signed in to change notification settings - Fork 177
Add ChunksIterator method to Series interface. #665
base: master
Are you sure you want to change the base?
Changes from 3 commits
64eeed9
4ed00e1
d097d3f
c407499
af8fb41
6f6dd39
78e9e36
f09fb89
220c3dd
b84c439
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ package tsdb | |
|
|
||
| import ( | ||
| "fmt" | ||
| "math" | ||
| "sort" | ||
| "strings" | ||
| "unicode/utf8" | ||
|
|
@@ -55,7 +56,7 @@ type Series interface { | |
| // Iterator returns a new iterator of the data of the series. | ||
| Iterator() SeriesIterator | ||
|
|
||
| // ChunkIterator returns a new iterator of the chunks of the series. | ||
| // ChunkIterator returns a new iterator for the non-overlapping chunks of the series. | ||
| ChunkIterator() ChunkIterator | ||
| } | ||
|
|
||
|
|
@@ -919,7 +920,7 @@ func (s *chainedSeries) ChunkIterator() ChunkIterator { | |
| return ch | ||
| } | ||
|
|
||
| // chainedSeriesIterator implements a series iterater over a list | ||
| // chainedSeriesIterator implements a series iterator over a list | ||
| // of time-sorted, non-overlapping iterators. | ||
| type chainedSeriesIterator struct { | ||
| series []Series // series in time order | ||
|
|
@@ -991,14 +992,10 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator { | |
| } | ||
|
|
||
| func (s *verticalChainedSeries) ChunkIterator() ChunkIterator { | ||
|
Contributor
I think it would be useful to also add a test for this.
Contributor
Is there a test that covers this?
||
| ch := &chainedChunkIterator{chain: make([]ChunkIterator, 0, len(s.series))} | ||
| for _, s := range s.series { | ||
| ch.chain = append(ch.chain, s.ChunkIterator()) | ||
| } | ||
| return ch | ||
| return newVerticalMergeChunkIterator(s.series...) | ||
| } | ||
|
|
||
| // verticalMergeSeriesIterator implements a series iterater over a list | ||
| // verticalMergeSeriesIterator implements a series iterator over a list | ||
| // of time-sorted, time-overlapping iterators. | ||
| type verticalMergeSeriesIterator struct { | ||
| a, b SeriesIterator | ||
|
|
@@ -1078,6 +1075,141 @@ func (it *verticalMergeSeriesIterator) Err() error { | |
| return it.b.Err() | ||
| } | ||
|
|
||
| type noSeekSeriesIterator struct { | ||
| chunkenc.Iterator | ||
| err error | ||
| } | ||
|
|
||
| func (it *noSeekSeriesIterator) Seek(t int64) bool { | ||
| it.err = errors.New("not implemented: Seek method invoked for noSeekSeriesIterator") | ||
|
||
| return false | ||
| } | ||
|
|
||
| func (it *noSeekSeriesIterator) Err() error { | ||
| if it.err != nil { | ||
| return it.err | ||
| } | ||
| return it.Iterator.Err() | ||
| } | ||
|
|
||
| // verticalMergeChunkIterator implements a ChunkIterator over a list | ||
| // of time-sorted, time-overlapping chunk iterators for the same labels (same series). | ||
| // Any overlap in chunks will be merged using verticalMergeSeriesIterator. | ||
| type verticalMergeChunkIterator struct { | ||
| a, b ChunkIterator | ||
| aok, bok, initialized bool | ||
|
|
||
| curMeta chunks.Meta | ||
| err error | ||
|
|
||
| aReuseIter, bReuseIter chunkenc.Iterator | ||
| } | ||
|
|
||
| func newVerticalMergeChunkIterator(s ...Series) ChunkIterator { | ||
| if len(s) == 1 { | ||
| return s[0].ChunkIterator() | ||
| } else if len(s) == 2 { | ||
| return &verticalMergeChunkIterator{ | ||
| a: s[0].ChunkIterator(), | ||
| b: s[1].ChunkIterator(), | ||
| } | ||
| } | ||
| return &verticalMergeChunkIterator{ | ||
| a: s[0].ChunkIterator(), | ||
| b: newVerticalMergeChunkIterator(s[1:]...), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find this really hard to understand. I know it is used in the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The recursive approach is sometimes hard to follow indeed. Let's postpone cleaning this though, as we do this everywhere.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did run a debug session and the final struct is really bizarre, sort of deeply nested. But yeah a conversation for another PR. |
||
| } | ||
| } | ||
|
|
||
| func (it *verticalMergeChunkIterator) Next() bool { | ||
bwplotka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if !it.initialized { | ||
| it.aok = it.a.Next() | ||
| it.bok = it.b.Next() | ||
| it.initialized = true | ||
| } | ||
|
|
||
| if !it.aok && !it.bok { | ||
| return false | ||
| } | ||
|
|
||
| if !it.aok { | ||
| it.curMeta = it.b.At() | ||
| it.bok = it.b.Next() | ||
| return true | ||
| } | ||
| if !it.bok { | ||
| it.curMeta = it.a.At() | ||
| it.aok = it.a.Next() | ||
| return true | ||
| } | ||
|
|
||
| aCurMeta := it.a.At() | ||
| bCurMeta := it.b.At() | ||
|
|
||
| if aCurMeta.MaxTime < bCurMeta.MinTime { | ||
| it.curMeta = aCurMeta | ||
| it.aok = it.a.Next() | ||
| return true | ||
| } | ||
|
|
||
| if bCurMeta.MaxTime < aCurMeta.MinTime { | ||
| it.curMeta = bCurMeta | ||
| it.bok = it.b.Next() | ||
| return true | ||
| } | ||
|
|
||
| chk := chunkenc.NewXORChunk() | ||
| app, err := chk.Appender() | ||
| if err != nil { | ||
| it.err = err | ||
| return false | ||
| } | ||
| seriesIter := &verticalMergeSeriesIterator{ | ||
| a: &noSeekSeriesIterator{Iterator: aCurMeta.Chunk.Iterator(it.aReuseIter)}, | ||
| b: &noSeekSeriesIterator{Iterator: bCurMeta.Chunk.Iterator(it.bReuseIter)}, | ||
| } | ||
|
|
||
| mint := int64(math.MaxInt64) | ||
| maxt := int64(0) | ||
krasi-georgiev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // TODO: This can end up being up to 240 samples per chunk, so we need to have a case to split to two. | ||
bwplotka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| for seriesIter.Next() { | ||
| t, v := seriesIter.At() | ||
| app.Append(t, v) | ||
|
|
||
| maxt = t | ||
| if mint == math.MaxInt64 { | ||
| mint = t | ||
| } | ||
| } | ||
| if err := seriesIter.Err(); err != nil { | ||
| it.err = err | ||
| return false | ||
| } | ||
|
|
||
| it.curMeta = chunks.Meta{ | ||
| MinTime: mint, | ||
| MaxTime: maxt, | ||
| Chunk: chk, | ||
| } | ||
| it.aok = it.a.Next() | ||
| it.bok = it.b.Next() | ||
| return true | ||
| } | ||
|
|
||
| func (it *verticalMergeChunkIterator) At() chunks.Meta { | ||
| return it.curMeta | ||
| } | ||
|
|
||
| func (it *verticalMergeChunkIterator) Err() error { | ||
| if it.err != nil { | ||
| return it.err | ||
| } | ||
| if it.a.Err() != nil { | ||
| return it.a.Err() | ||
| } | ||
| return it.b.Err() | ||
| } | ||
|
|
||
| // chunkSeriesIterator implements a series iterator on top | ||
| // of a list of time-sorted, non-overlapping chunks. | ||
| type chunkSeriesIterator struct { | ||
|
|
@@ -1244,8 +1376,16 @@ type ChunkIterator interface { | |
| Err() error | ||
| } | ||
|
|
||
| type errChunkIterator struct { | ||
| err error | ||
| } | ||
|
|
||
| func (s errChunkIterator) Next() bool { return false } | ||
| func (s errChunkIterator) At() chunks.Meta { return chunks.Meta{} } | ||
| func (s errChunkIterator) Err() error { return s.err } | ||
|
|
||
| type chunkIterator struct { | ||
| chunks []chunks.Meta | ||
| chunks []chunks.Meta // chunks in time order | ||
| i int | ||
| } | ||
|
|
||
|
|
@@ -1263,8 +1403,10 @@ func (c *chunkIterator) At() chunks.Meta { | |
|
|
||
| func (c *chunkIterator) Err() error { return nil } | ||
|
|
||
| // chainedChunkIterator implements flat iteration for chunks iterated over a list | ||
| // of time-sorted, non-overlapping iterators for each series. | ||
| type chainedChunkIterator struct { | ||
| chain []ChunkIterator | ||
| chain []ChunkIterator // chunk iterators for each series in time order | ||
| i int | ||
| err error | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.