Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit c407499

Browse files
committed
Added simple verticalMergeChunkIterator implementation - no tests so far.
TODO: tests & splitting into 2. Signed-off-by: Bartek Plotka <[email protected]>
1 parent d097d3f commit c407499

File tree

1 file changed

+139
-5
lines changed

1 file changed

+139
-5
lines changed

querier.go

Lines changed: 139 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package tsdb
1515

1616
import (
1717
"fmt"
18+
"math"
1819
"sort"
1920
"strings"
2021
"unicode/utf8"
@@ -55,7 +56,7 @@ type Series interface {
5556
// Iterator returns a new iterator of the data of the series.
5657
Iterator() SeriesIterator
5758

58-
// ChunkIterator returns a new iterator of the chunks of the series.
59+
// ChunkIterator returns a new iterator for the non-overlapping chunks of the series.
5960
ChunkIterator() ChunkIterator
6061
}
6162

@@ -990,13 +991,11 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator {
990991
return newVerticalMergeSeriesIterator(s.series...)
991992
}
992993

993-
// ChunkIterator is currently not implemented.
994-
// TODO(bwplotka): Implement once we will want to use chunks in vertical compaction.
995994
func (s *verticalChainedSeries) ChunkIterator() ChunkIterator {
996-
return errChunkIterator{err: errors.New("Not Implemented")}
995+
return newVerticalMergeChunkIterator(s.series...)
997996
}
998997

999-
// verticalMergeSeriesIterator implements a series iterater over a list
998+
// verticalMergeSeriesIterator implements a series iterator over a list
1000999
// of time-sorted, time-overlapping iterators.
10011000
type verticalMergeSeriesIterator struct {
10021001
a, b SeriesIterator
@@ -1076,6 +1075,141 @@ func (it *verticalMergeSeriesIterator) Err() error {
10761075
return it.b.Err()
10771076
}
10781077

1078+
type noSeekSeriesIterator struct {
1079+
chunkenc.Iterator
1080+
err error
1081+
}
1082+
1083+
func (it *noSeekSeriesIterator) Seek(t int64) bool {
1084+
it.err = errors.New("not implemented: Seek method invoked for noSeekSeriesIterator")
1085+
return false
1086+
}
1087+
1088+
func (it *noSeekSeriesIterator) Err() error {
1089+
if it.err != nil {
1090+
return it.err
1091+
}
1092+
return it.Iterator.Err()
1093+
}
1094+
1095+
// verticalMergeChunkIterator implements a ChunkIterator over a list
1096+
// of time-sorted, time-overlapping chunk iterators for the same labels (same series).
1097+
// Any overlap in chunks will be merged using verticalMergeSeriesIterator.
1098+
type verticalMergeChunkIterator struct {
1099+
a, b ChunkIterator
1100+
aok, bok, initialized bool
1101+
1102+
curMeta chunks.Meta
1103+
err error
1104+
1105+
aReuseIter, bReuseIter chunkenc.Iterator
1106+
}
1107+
1108+
func newVerticalMergeChunkIterator(s ...Series) ChunkIterator {
1109+
if len(s) == 1 {
1110+
return s[0].ChunkIterator()
1111+
} else if len(s) == 2 {
1112+
return &verticalMergeChunkIterator{
1113+
a: s[0].ChunkIterator(),
1114+
b: s[1].ChunkIterator(),
1115+
}
1116+
}
1117+
return &verticalMergeChunkIterator{
1118+
a: s[0].ChunkIterator(),
1119+
b: newVerticalMergeChunkIterator(s[1:]...),
1120+
}
1121+
}
1122+
1123+
func (it *verticalMergeChunkIterator) Next() bool {
1124+
if !it.initialized {
1125+
it.aok = it.a.Next()
1126+
it.bok = it.b.Next()
1127+
it.initialized = true
1128+
}
1129+
1130+
if !it.aok && !it.bok {
1131+
return false
1132+
}
1133+
1134+
if !it.aok {
1135+
it.curMeta = it.b.At()
1136+
it.bok = it.b.Next()
1137+
return true
1138+
}
1139+
if !it.bok {
1140+
it.curMeta = it.a.At()
1141+
it.aok = it.a.Next()
1142+
return true
1143+
}
1144+
1145+
aCurMeta := it.a.At()
1146+
bCurMeta := it.b.At()
1147+
1148+
if aCurMeta.MaxTime < bCurMeta.MinTime {
1149+
it.curMeta = aCurMeta
1150+
it.aok = it.a.Next()
1151+
return true
1152+
}
1153+
1154+
if bCurMeta.MaxTime < aCurMeta.MinTime {
1155+
it.curMeta = bCurMeta
1156+
it.bok = it.b.Next()
1157+
return true
1158+
}
1159+
1160+
chk := chunkenc.NewXORChunk()
1161+
app, err := chk.Appender()
1162+
if err != nil {
1163+
it.err = err
1164+
return false
1165+
}
1166+
seriesIter := &verticalMergeSeriesIterator{
1167+
a: &noSeekSeriesIterator{Iterator: aCurMeta.Chunk.Iterator(it.aReuseIter)},
1168+
b: &noSeekSeriesIterator{Iterator: bCurMeta.Chunk.Iterator(it.bReuseIter)},
1169+
}
1170+
1171+
mint := int64(math.MaxInt64)
1172+
maxt := int64(0)
1173+
1174+
// TODO: This can end up being up to 240 samples per chunk, so we need to have a case to split to two.
1175+
for seriesIter.Next() {
1176+
t, v := seriesIter.At()
1177+
app.Append(t, v)
1178+
1179+
maxt = t
1180+
if mint == math.MaxInt64 {
1181+
mint = t
1182+
}
1183+
}
1184+
if err := seriesIter.Err(); err != nil {
1185+
it.err = err
1186+
return false
1187+
}
1188+
1189+
it.curMeta = chunks.Meta{
1190+
MinTime: mint,
1191+
MaxTime: maxt,
1192+
Chunk: chk,
1193+
}
1194+
it.aok = it.a.Next()
1195+
it.bok = it.b.Next()
1196+
return true
1197+
}
1198+
1199+
func (it *verticalMergeChunkIterator) At() chunks.Meta {
1200+
return it.curMeta
1201+
}
1202+
1203+
func (it *verticalMergeChunkIterator) Err() error {
1204+
if it.err != nil {
1205+
return it.err
1206+
}
1207+
if it.a.Err() != nil {
1208+
return it.a.Err()
1209+
}
1210+
return it.b.Err()
1211+
}
1212+
10791213
// chunkSeriesIterator implements a series iterator on top
10801214
// of a list of time-sorted, non-overlapping chunks.
10811215
type chunkSeriesIterator struct {

0 commit comments

Comments
 (0)