@@ -1116,8 +1116,10 @@ type memSeries struct {
11161116
11171117 app chunkenc.Appender // Current appender for the chunk.
11181118
1119- // Write ids of most recent samples.
1120- writeIds []uint64
1119+ // Write ids of most recent samples. This is a ring buffer.
1120+ writeIds []uint64
1121+ writeIdFirst int // Position of first id in the ring.
1122+ writeIdCount int // How many ids in the ring.
11211123}
11221124
11231125func (s * memSeries ) minTime () int64 {
@@ -1154,7 +1156,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
11541156 ref : id ,
11551157 chunkRange : chunkRange ,
11561158 nextAt : math .MinInt64 ,
1157- writeIds : []uint64 {} ,
1159+ writeIds : make ( []uint64 , 4 ) ,
11581160 }
11591161 return s
11601162}
@@ -1245,26 +1247,39 @@ func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCr
12451247 s .sampleBuf [2 ] = s .sampleBuf [3 ]
12461248 s .sampleBuf [3 ] = sample {t : t , v : v }
12471249
1248- s .writeIds = append (s .writeIds , writeId )
1250+ if s .writeIdCount == len (s .writeIds ) {
1251+ // Ring buffer is full, expand by doubling.
1252+ newRing := make ([]uint64 , s .writeIdCount * 2 )
1253+ copy (newRing [s .writeIdCount + s .writeIdFirst :], s .writeIds [s .writeIdFirst :])
1254+ copy (newRing , s .writeIds [:s .writeIdFirst ])
1255+ s .writeIds = newRing
1256+ s .writeIdFirst += s .writeIdCount
1257+ }
1258+ s .writeIds [(s .writeIdFirst + s .writeIdCount )% len (s .writeIds )] = writeId
1259+ s .writeIdCount ++
12491260
12501261 return true , chunkCreated
12511262}
12521263
12531264// cleanupWriteIdsBelow cleans up older writeIds. Has to be called after acquiring
12541265// lock.
12551266func (s * memSeries ) cleanupWriteIdsBelow (bound uint64 ) {
1256- toRemove := 0
1257- for _ , id := range s .writeIds {
1258- if id < bound {
1259- toRemove ++
1267+ pos := s .writeIdFirst
1268+
1269+ for s .writeIdCount > 0 {
1270+ if s .writeIds [pos ] < bound {
1271+ s .writeIdFirst ++
1272+ s .writeIdCount --
12601273 } else {
12611274 break
12621275 }
1276+ pos ++
1277+ if pos == len (s .writeIds ) {
1278+ pos = 0
1279+ }
12631280 }
1264-
1265- if toRemove != 0 {
1266- // XXX This doesn't free the RAM.
1267- s .writeIds = s .writeIds [toRemove :]
1281+ if s .writeIdFirst >= len (s .writeIds ) {
1282+ s .writeIdFirst -= len (s .writeIds )
12681283 }
12691284}
12701285
@@ -1300,17 +1315,22 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato
13001315 previousSamples += d .chunk .NumSamples ()
13011316 }
13021317 }
1303- writeIdsToConsider := (previousSamples + c .chunk .NumSamples ()) - (totalSamples - len ( s . writeIds ) )
1304- if writeIdsToConsider > 0 {
1305- // writeIds extends back to at least this chunk .
1306- for index , writeId := range s . writeIds [: writeIdsToConsider ] {
1307- if _ , ok := isolation . incompleteWrites [ writeId ]; ok || writeId > isolation . maxWriteId {
1308- stopAfter = index - ( writeIdsToConsider - c . chunk . NumSamples ())
1309- if stopAfter < 0 {
1310- stopAfter = 0 // Stopped in a previous chunk.
1311- }
1312- break
1318+ writeIdsToConsider := (previousSamples + c .chunk .NumSamples ()) - (totalSamples - s . writeIdCount )
1319+ // Iterate over the ring, find the first one that the isolation state says not
1320+ // to return .
1321+ pos := s . writeIdFirst
1322+ for index := 0 ; index < writeIdsToConsider ; index ++ {
1323+ writeId := s . writeIds [ pos ]
1324+ if _ , ok := isolation . incompleteWrites [ writeId ]; ok || writeId > isolation . maxWriteId {
1325+ stopAfter = index - ( writeIdsToConsider - c . chunk .NumSamples ())
1326+ if stopAfter < 0 {
1327+ stopAfter = 0 // Stopped in a previous chunk.
13131328 }
1329+ break
1330+ }
1331+ pos ++
1332+ if pos == len (s .writeIds ) {
1333+ pos = 0
13141334 }
13151335 }
13161336 }
0 commit comments