Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit 7feb231

Browse files
tomwilkiebwplotka
authored andcommitted
Add ChunksIterator method to Series interface.
Signed-off-by: Bartek Plotka <[email protected]>
1 parent 3df36b4 commit 7feb231

File tree

2 files changed

+140
-4
lines changed

2 files changed

+140
-4
lines changed

querier.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ type Series interface {
5454

5555
// Iterator returns a new iterator of the data of the series.
5656
Iterator() SeriesIterator
57+
58+
// ChunkIterator returns a new iterator of the chunks of the series.
59+
ChunkIterator() ChunkIterator
5760
}
5861

5962
// querier aggregates querying results from time blocks within
@@ -876,6 +879,10 @@ func (s *chunkSeries) Iterator() SeriesIterator {
876879
return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
877880
}
878881

882+
func (s *chunkSeries) ChunkIterator() ChunkIterator {
883+
return &chunkIterator{chunks: s.chunks}
884+
}
885+
879886
// SeriesIterator iterates over the data of a time series.
880887
type SeriesIterator interface {
881888
// Seek advances the iterator forward to the given timestamp.
@@ -904,6 +911,14 @@ func (s *chainedSeries) Iterator() SeriesIterator {
904911
return newChainedSeriesIterator(s.series...)
905912
}
906913

914+
func (s *chainedSeries) ChunkIterator() ChunkIterator {
915+
ch := &chainedChunkIterator{}
916+
for _, s := range s.series {
917+
ch.chain = append(ch.chain, s.ChunkIterator())
918+
}
919+
return ch
920+
}
921+
907922
// chainedSeriesIterator implements a series iterater over a list
908923
// of time-sorted, non-overlapping iterators.
909924
type chainedSeriesIterator struct {
@@ -975,6 +990,14 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator {
975990
return newVerticalMergeSeriesIterator(s.series...)
976991
}
977992

993+
func (s *verticalChainedSeries) ChunkIterator() ChunkIterator {
994+
ch := &chainedChunkIterator{}
995+
for _, s := range s.series {
996+
ch.chain = append(ch.chain, s.ChunkIterator())
997+
}
998+
return ch
999+
}
1000+
9781001
// verticalMergeSeriesIterator implements a series iterater over a list
9791002
// of time-sorted, time-overlapping iterators.
9801003
type verticalMergeSeriesIterator struct {
@@ -1210,3 +1233,67 @@ type errSeriesSet struct {
12101233
func (s errSeriesSet) Next() bool { return false }
12111234
func (s errSeriesSet) At() Series { return nil }
12121235
func (s errSeriesSet) Err() error { return s.err }
1236+
1237+
// ChunkIterator iterates over the chunk of a time series.
1238+
type ChunkIterator interface {
1239+
// At returns the meta.
1240+
At() chunks.Meta
1241+
// Next advances the iterator by one.
1242+
Next() bool
1243+
// Err returns optional error if Next is false.
1244+
Err() error
1245+
}
1246+
1247+
type chunkIterator struct {
1248+
chunks []chunks.Meta
1249+
i int
1250+
}
1251+
1252+
func (c *chunkIterator) Next() bool {
1253+
if c.i >= len(c.chunks) {
1254+
return false
1255+
}
1256+
c.i++
1257+
return true
1258+
}
1259+
1260+
func (c *chunkIterator) At() chunks.Meta {
1261+
return c.chunks[c.i-1]
1262+
}
1263+
1264+
func (c *chunkIterator) Err() error { return nil }
1265+
1266+
type chainedChunkIterator struct {
1267+
chain []ChunkIterator
1268+
i int
1269+
err error
1270+
}
1271+
1272+
func (c *chainedChunkIterator) Next() bool {
1273+
if c.i == 0 {
1274+
if len(c.chain) == 0 {
1275+
return false
1276+
}
1277+
c.i++
1278+
}
1279+
1280+
for {
1281+
if c.chain[c.i-1].Next() {
1282+
return true
1283+
}
1284+
if err := c.chain[c.i-1].Err(); err != nil {
1285+
c.err = err
1286+
return false
1287+
}
1288+
if c.i >= len(c.chain) {
1289+
return false
1290+
}
1291+
c.i++
1292+
}
1293+
}
1294+
1295+
func (c *chainedChunkIterator) At() chunks.Meta {
1296+
return c.chain[c.i-1].At()
1297+
}
1298+
1299+
func (c *chainedChunkIterator) Err() error { return c.err }

querier_test.go

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,13 @@ func expandSeriesIterator(it SeriesIterator) (r []tsdbutil.Sample, err error) {
188188
return r, it.Err()
189189
}
190190

191+
func expandChunkIterator(it ChunkIterator) (chks []chunks.Meta) {
192+
for it.Next() {
193+
chks = append(chks, it.At())
194+
}
195+
return chks
196+
}
197+
191198
type seriesSamples struct {
192199
lset map[string]string
193200
chunks [][]sample
@@ -661,8 +668,9 @@ type itSeries struct {
661668
si SeriesIterator
662669
}
663670

664-
func (s itSeries) Iterator() SeriesIterator { return s.si }
665-
func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
671+
func (s itSeries) Iterator() SeriesIterator { return s.si }
672+
func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
673+
func (s itSeries) ChunkIterator() ChunkIterator { return nil }
666674

667675
func TestSeriesIterator(t *testing.T) {
668676
itcases := []struct {
@@ -1044,6 +1052,46 @@ func TestSeriesIterator(t *testing.T) {
10441052
})
10451053
}
10461054

1055+
func TestChunkIterator(t *testing.T) {
1056+
it := &chunkIterator{}
1057+
testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it))
1058+
testutil.Equals(t, false, it.Next())
1059+
1060+
chks := []chunks.Meta{
1061+
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}),
1062+
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}),
1063+
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}),
1064+
}
1065+
it = &chunkIterator{chunks: chks}
1066+
testutil.Equals(t, chks, expandChunkIterator(it))
1067+
testutil.Equals(t, false, it.Next())
1068+
}
1069+
1070+
func TestChainedChunkIterator(t *testing.T) {
1071+
it := &chainedChunkIterator{}
1072+
testutil.Equals(t, []chunks.Meta(nil), expandChunkIterator(it))
1073+
testutil.Equals(t, false, it.Next())
1074+
1075+
chks1 := []chunks.Meta{
1076+
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}, sample{1, 2}}),
1077+
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{2, 1}, sample{2, 2}}),
1078+
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{3, 1}, sample{3, 2}}),
1079+
}
1080+
chks2 := []chunks.Meta(nil)
1081+
chks3 := []chunks.Meta{
1082+
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{4, 1}, sample{4, 2}}),
1083+
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{5, 1}, sample{5, 2}}),
1084+
}
1085+
1086+
it = &chainedChunkIterator{chain: []ChunkIterator{
1087+
&chunkIterator{chunks: chks1},
1088+
&chunkIterator{chunks: chks2},
1089+
&chunkIterator{chunks: chks3},
1090+
}}
1091+
testutil.Equals(t, append(chks1, chks3...), expandChunkIterator(it))
1092+
testutil.Equals(t, false, it.Next())
1093+
}
1094+
10471095
// Regression for: https://github.com/prometheus/tsdb/pull/97
10481096
func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
10491097
chkMetas := []chunks.Meta{
@@ -1439,8 +1487,9 @@ func newSeries(l map[string]string, s []tsdbutil.Sample) Series {
14391487
iterator: func() SeriesIterator { return newListSeriesIterator(s) },
14401488
}
14411489
}
1442-
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
1443-
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
1490+
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
1491+
func (m *mockSeries) Iterator() SeriesIterator { return m.iterator() }
1492+
func (m *mockSeries) ChunkIterator() ChunkIterator { return nil }
14441493

14451494
type listSeriesIterator struct {
14461495
list []tsdbutil.Sample

0 commit comments

Comments
 (0)