@@ -1113,8 +1113,10 @@ type memSeries struct {
11131113
11141114 app chunkenc.Appender // Current appender for the chunk.
11151115
1116- // Write ids of most recent samples.
1117- writeIds []uint64
1116+ // Write ids of most recent samples. This is a ring buffer.
1117+ writeIds []uint64
1118+ writeIdFirst int // Position of first id in the ring.
1119+ writeIdCount int // How many ids in the ring.
11181120}
11191121
11201122func (s * memSeries ) minTime () int64 {
@@ -1151,7 +1153,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
11511153 ref : id ,
11521154 chunkRange : chunkRange ,
11531155 nextAt : math .MinInt64 ,
1154- writeIds : []uint64 {} ,
1156+ writeIds : make ( []uint64 , 4 ) ,
11551157 }
11561158 return s
11571159}
@@ -1242,26 +1244,39 @@ func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCr
12421244 s .sampleBuf [2 ] = s .sampleBuf [3 ]
12431245 s .sampleBuf [3 ] = sample {t : t , v : v }
12441246
1245- s .writeIds = append (s .writeIds , writeId )
1247+ if s .writeIdCount == len (s .writeIds ) {
1248+ // Ring buffer is full, expand by doubling.
1249+ newRing := make ([]uint64 , s .writeIdCount * 2 )
1250+ copy (newRing [s .writeIdCount + s .writeIdFirst :], s .writeIds [s .writeIdFirst :])
1251+ copy (newRing , s .writeIds [:s .writeIdFirst ])
1252+ s .writeIds = newRing
1253+ s .writeIdFirst += s .writeIdCount
1254+ }
1255+ s .writeIds [(s .writeIdFirst + s .writeIdCount )% len (s .writeIds )] = writeId
1256+ s .writeIdCount ++
12461257
12471258 return true , chunkCreated
12481259}
12491260
12501261// cleanupWriteIdsBelow cleans up older writeIds. Has to be called after acquiring
12511262// lock.
12521263func (s * memSeries ) cleanupWriteIdsBelow (bound uint64 ) {
1253- toRemove := 0
1254- for _ , id := range s .writeIds {
1255- if id < bound {
1256- toRemove ++
1264+ pos := s .writeIdFirst
1265+
1266+ for s .writeIdCount > 0 {
1267+ if s .writeIds [pos ] < bound {
1268+ s .writeIdFirst ++
1269+ s .writeIdCount --
12571270 } else {
12581271 break
12591272 }
1273+ pos ++
1274+ if pos == len (s .writeIds ) {
1275+ pos = 0
1276+ }
12601277 }
1261-
1262- if toRemove != 0 {
1263- // XXX This doesn't free the RAM.
1264- s .writeIds = s .writeIds [toRemove :]
1278+ if s .writeIdFirst >= len (s .writeIds ) {
1279+ s .writeIdFirst -= len (s .writeIds )
12651280 }
12661281}
12671282
@@ -1297,17 +1312,22 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato
12971312 previousSamples += d .chunk .NumSamples ()
12981313 }
12991314 }
1300- writeIdsToConsider := (previousSamples + c .chunk .NumSamples ()) - (totalSamples - len ( s . writeIds ) )
1301- if writeIdsToConsider > 0 {
1302- // writeIds extends back to at least this chunk .
1303- for index , writeId := range s . writeIds [: writeIdsToConsider ] {
1304- if _ , ok := isolation . incompleteWrites [ writeId ]; ok || writeId > isolation . maxWriteId {
1305- stopAfter = index - ( writeIdsToConsider - c . chunk . NumSamples ())
1306- if stopAfter < 0 {
1307- stopAfter = 0 // Stopped in a previous chunk.
1308- }
1309- break
1315+ writeIdsToConsider := (previousSamples + c .chunk .NumSamples ()) - (totalSamples - s . writeIdCount )
1316+ // Iterate over the ring, find the first one that the isolation state says not
1317+ // to return .
1318+ pos := s . writeIdFirst
1319+ for index := 0 ; index < writeIdsToConsider ; index ++ {
1320+ writeId := s . writeIds [ pos ]
1321+ if _ , ok := isolation . incompleteWrites [ writeId ]; ok || writeId > isolation . maxWriteId {
1322+ stopAfter = index - ( writeIdsToConsider - c . chunk .NumSamples ())
1323+ if stopAfter < 0 {
1324+ stopAfter = 0 // Stopped in a previous chunk.
13101325 }
1326+ break
1327+ }
1328+ pos ++
1329+ if pos == len (s .writeIds ) {
1330+ pos = 0
13111331 }
13121332 }
13131333 }
0 commit comments