Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit b84c439

Browse files
committed
Refactored TestSeriesIterator.
* Simplified; merged seek and non seek cases together. Added explicit min/max only for chunk series iterator, where it is relevant. * Adjusted all seek implementation to match edge case requirement (double seek, failed seek + next). Signed-off-by: Bartek Plotka <[email protected]>
1 parent 220c3dd commit b84c439

File tree

4 files changed

+235
-267
lines changed

4 files changed

+235
-267
lines changed

chunkenc/chunk.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ type Appender interface {
5959
// Iterator iterates over the data of a time series.
6060
type Iterator interface {
6161
// Seek advances the iterator forward to the sample with the timestamp t or first value after t.
62-
// If the current iterator points to the sample with timestamp after t already,
63-
// Seek should not advance the iterator.
62+
// If the current iterator points to the sample with timestamp after t already, Seek should not advance the iterator.
6463
// Seek returns false if there is no such sample with the timestamp equal or larger than t.
64+
// Iterator can be exhausted when the Seek returns false.
6565
Seek(t int64) bool
6666
// At returns the current timestamp/value pair.
6767
At() (int64, float64)

chunkenc/chunk_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func testChunk(t *testing.T, c Chunk) {
107107
testutil.Ok(t, it3.Err())
108108
testutil.Equals(t, all[mid:], res3)
109109

110-
testutil.Equals(t, false, it3.Seek(all[len(all)-1].t + 1))
110+
testutil.Equals(t, false, it3.Seek(all[len(all)-1].t+1))
111111
}
112112

113113
func benchmarkIterator(b *testing.B, newChunk func() Chunk) {

querier.go

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,19 @@ type Series interface {
6060
ChunkIterator() ChunkIterator
6161
}
6262

63+
// ChunkIterator iterates over the chunk of a time series.
64+
type ChunkIterator interface {
65+
// Seek advances the iterator forward to the given timestamp.
66+
// It advances to the chunk with min time at t or first chunk with min time after t.
67+
Seek(t int64) bool
68+
// At returns the meta.
69+
At() chunks.Meta
70+
// Next advances the iterator by one.
71+
Next() bool
72+
// Err returns optional error if Next is false.
73+
Err() error
74+
}
75+
6376
// querier aggregates querying results from time blocks within
6477
// a single partition.
6578
type querier struct {
@@ -942,7 +955,7 @@ func (it *chainedSeriesIterator) Next() bool {
942955
if it.cur.Next() {
943956
return true
944957
}
945-
if err := it.cur.Err(); err != nil {
958+
if it.cur.Err() != nil {
946959
return false
947960
}
948961
if it.i == len(it.series)-1 {
@@ -1007,6 +1020,10 @@ func newVerticalMergeSeriesIterator(s ...Series) chunkenc.Iterator {
10071020
}
10081021

10091022
func (it *verticalMergeSeriesIterator) Seek(t int64) bool {
1023+
if it.initialized && it.curT >= t {
1024+
return true
1025+
}
1026+
10101027
it.aok, it.bok = it.a.Seek(t), it.b.Seek(t)
10111028
it.initialized = true
10121029
return it.Next()
@@ -1157,7 +1174,7 @@ func mergeOverlappingChunks(a, b chunks.Meta, aReuseIter, bReuseIter chunkenc.It
11571174
}
11581175

11591176
mint := int64(math.MaxInt64)
1160-
maxt := int64(0)
1177+
maxt := int64(math.MinInt64)
11611178

11621179
// TODO: This can end up being up to 240 samples per chunk, so we need to have a case to split to two.
11631180
for seriesIter.Next() {
@@ -1243,38 +1260,51 @@ func (it *chunkSeriesIterator) resetCurIterator() {
12431260
it.cur = it.bufDelIter
12441261
}
12451262

1246-
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
1247-
if t > it.maxt {
1263+
func (it *chunkSeriesIterator) Seek(t int64) bool {
1264+
if it.Err() != nil || t > it.maxt || it.i > len(it.chunks)-1 {
1265+
// Exhaust iterator.
1266+
it.i = len(it.chunks)
12481267
return false
12491268
}
12501269

1251-
// Seek to the first valid value after t.
12521270
if t < it.mint {
12531271
t = it.mint
12541272
}
12551273

1274+
currI := it.i
12561275
for ; it.chunks[it.i].MaxTime < t; it.i++ {
12571276
if it.i == len(it.chunks)-1 {
1277+
// Exhaust iterator.
1278+
it.i = len(it.chunks)
12581279
return false
12591280
}
12601281
}
12611282

1262-
it.resetCurIterator()
1283+
if currI != it.i {
1284+
it.resetCurIterator()
1285+
}
12631286

1264-
for it.cur.Next() {
1265-
t0, _ := it.cur.At()
1266-
if t0 >= t {
1267-
return true
1287+
tc, _ := it.cur.At()
1288+
for t > tc {
1289+
if !it.cur.Next() {
1290+
// Exhaust iterator.
1291+
it.i = len(it.chunks)
1292+
return false
12681293
}
1294+
tc, _ = it.cur.At()
12691295
}
1270-
return false
1296+
return true
12711297
}
12721298

12731299
func (it *chunkSeriesIterator) At() (t int64, v float64) {
12741300
return it.cur.At()
12751301
}
12761302

12771303
func (it *chunkSeriesIterator) Next() bool {
1304+
if it.Err() != nil || it.i > len(it.chunks)-1 {
1305+
return false
1306+
}
1307+
12781308
if it.cur.Next() {
12791309
t, _ := it.cur.At()
12801310

@@ -1369,19 +1399,6 @@ func (s errSeriesSet) Next() bool { return false }
13691399
func (s errSeriesSet) At() Series { return nil }
13701400
func (s errSeriesSet) Err() error { return s.err }
13711401

1372-
// ChunkIterator iterates over the chunk of a time series.
1373-
type ChunkIterator interface {
1374-
// Seek advances the iterator forward to the given timestamp.
1375-
// It advances to the chunk with min time at t or first chunk with min time after t.
1376-
Seek(t int64) bool
1377-
// At returns the meta.
1378-
At() chunks.Meta
1379-
// Next advances the iterator by one.
1380-
Next() bool
1381-
// Err returns optional error if Next is false.
1382-
Err() error
1383-
}
1384-
13851402
type chunkIterator struct {
13861403
chunks []chunks.Meta // series in time order
13871404
i int

0 commit comments

Comments
 (0)