Skip to content

Commit a023b5c

Browse files
authored
PRW2: Fix data corruption caused by shallow copy (#7337)
* Fix data corruption caused by shallow copy Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * fix test Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * fix lint Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> --------- Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent e62ef7a commit a023b5c

File tree

4 files changed

+209
-6
lines changed

4 files changed

+209
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
* [BUGFIX] Distributor: Return remote write V2 stats headers properly when the request is HA deduplicated. #7240
6363
* [BUGFIX] Cache: Fix Redis Cluster EXECABORT error in MSet by using individual SET commands instead of transactions for cluster mode. #7262
6464
* [BUGFIX] Distributor: Fix an `index out of range` panic in PRW2.0 handler caused by dirty metadata when reusing requests from `sync.Pool`. #7299
65+
* [BUGFIX] Distributor: Fix data corruption in the push handler caused by shallow copying `Samples` and `Histograms` when converting Remote Write V2 requests to V1. #7337
6566

6667

6768
## 1.20.1 2025-12-03

integration/remote_write_v2_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package integration
55
import (
66
"math/rand"
77
"path"
8+
"sync"
89
"testing"
910
"time"
1011

@@ -535,6 +536,150 @@ func Test_WriteStatWithReplication(t *testing.T) {
535536
testPushHeader(t, writeStats, 20, 0, 0)
536537
}
537538

539+
// This test verifies PRW1 and PRW2 memory pools do not interfere with each other.
540+
func TestIngest_PRW2_MemoryIndependence(t *testing.T) {
541+
const blockRangePeriod = 5 * time.Second
542+
543+
s, err := e2e.NewScenario(networkName)
544+
require.NoError(t, err)
545+
defer s.Close()
546+
547+
consul := e2edb.NewConsulWithName("consul")
548+
require.NoError(t, s.StartAndWaitReady(consul))
549+
550+
flags := mergeFlags(
551+
AlertmanagerLocalFlags(),
552+
map[string]string{
553+
"-store.engine": blocksStorageEngine,
554+
"-blocks-storage.backend": "filesystem",
555+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
556+
"-blocks-storage.bucket-store.sync-interval": "15m",
557+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
558+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
559+
"-blocks-storage.tsdb.ship-interval": "1s",
560+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
561+
// Ingester.
562+
"-ring.store": "consul",
563+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
564+
// Distributor.
565+
"-distributor.replication-factor": "1",
566+
"-distributor.remote-writev2-enabled": "true",
567+
// Store-gateway.
568+
"-store-gateway.sharding-enabled": "false",
569+
// alert manager
570+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
571+
},
572+
)
573+
574+
// make alert manager config dir
575+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
576+
577+
path := path.Join(s.SharedDir(), "cortex-1")
578+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
579+
580+
// Start Cortex
581+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
582+
require.NoError(t, s.StartAndWaitReady(cortex))
583+
584+
// Wait until Cortex replicas have updated the ring state.
585+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
586+
587+
cPRW1, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-prw1")
588+
require.NoError(t, err)
589+
590+
cPRW2, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-prw2")
591+
require.NoError(t, err)
592+
var wg sync.WaitGroup
593+
594+
scrapeInterval := 5 * time.Second
595+
end := time.Now()
596+
start := end.Add(-time.Hour * 2)
597+
598+
expectedPushesPerProtocol := int(end.Sub(start) / scrapeInterval)
599+
600+
// We will concurrently push two distinct metrics using two different protocols.
601+
// test_metric_prw1 is pushed via PRW1 with Value: 1.0
602+
// test_metric_prw2 is pushed via PRW2 with Value: 999.0
603+
// If the memory pool overlaps due to shallow copy during V2->V1 conversion,
604+
// test_metric_prw1 will occasionally read 999.0.
605+
wg.Add(2)
606+
607+
// Goroutine 1: Send PRW1 Requests
608+
go func() {
609+
defer wg.Done()
610+
// Iterate from start to end by scrapeInterval
611+
for t := start; t.Before(end); t = t.Add(scrapeInterval) {
612+
ts := t.UnixMilli()
613+
614+
seriesV1 := []prompb.TimeSeries{
615+
{
616+
Labels: []prompb.Label{
617+
{Name: "__name__", Value: "test_metric_prw1"},
618+
},
619+
Samples: []prompb.Sample{
620+
{Value: 1.0, Timestamp: ts},
621+
},
622+
},
623+
}
624+
_, _ = cPRW1.Push(seriesV1)
625+
}
626+
}()
627+
628+
// Goroutine 2: Send PRW2 Requests
629+
go func() {
630+
defer wg.Done()
631+
// Iterate from start to end by scrapeInterval
632+
for t := start; t.Before(end); t = t.Add(scrapeInterval) {
633+
ts := t.UnixMilli()
634+
635+
symbols := []string{"", "__name__", "test_metric_prw2"}
636+
seriesV2 := []writev2.TimeSeries{
637+
{
638+
LabelsRefs: []uint32{1, 2},
639+
Samples: []writev2.Sample{{Value: 999.0, Timestamp: ts}},
640+
},
641+
}
642+
_, _ = cPRW2.PushV2(symbols, seriesV2)
643+
}
644+
}()
645+
646+
// Wait for all concurrent pushes to finish
647+
wg.Wait()
648+
649+
// Check PRW1 requests
650+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(expectedPushesPerProtocol)), []string{"cortex_distributor_push_requests_total"}, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "type", "prw1"))))
651+
// Check PRW2 requests
652+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(expectedPushesPerProtocol)), []string{"cortex_distributor_push_requests_total"}, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "type", "prw2"))))
653+
654+
resultV1, err := cPRW1.QueryRange(`test_metric_prw1`, start, end, scrapeInterval)
655+
require.NoError(t, err)
656+
require.Equal(t, model.ValMatrix, resultV1.Type())
657+
658+
matrixV1, ok := resultV1.(model.Matrix)
659+
require.True(t, ok)
660+
require.NotEmpty(t, matrixV1)
661+
662+
// Validate no data pollution occurred.
663+
for _, series := range matrixV1 {
664+
for _, sample := range series.Values {
665+
assert.Equal(t, 1.0, float64(sample.Value), "Memory pool overlapped: PRW1 metric has been corrupted!")
666+
}
667+
}
668+
669+
resultV2, err := cPRW2.QueryRange(`test_metric_prw2`, start, end, scrapeInterval)
670+
require.NoError(t, err)
671+
matrixV2, ok := resultV2.(model.Matrix)
672+
require.True(t, ok)
673+
require.NotEmpty(t, matrixV2)
674+
675+
// Validate no data pollution occurred.
676+
for _, series := range matrixV2 {
677+
for _, sample := range series.Values {
678+
assert.Equal(t, 999.0, float64(sample.Value), "Memory pool overlapped: PRW2 metric has been corrupted!")
679+
}
680+
}
681+
}
682+
538683
func testPushHeader(t *testing.T, stats remoteapi.WriteResponseStats, expectedSamples, expectedHistogram, expectedExemplars int) {
539684
require.Equal(t, expectedSamples, stats.Samples)
540685
require.Equal(t, expectedHistogram, stats.Histograms)

pkg/util/push/push.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -251,13 +251,14 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni
251251
return v1Req, err
252252
}
253253

254+
ts := cortexpb.TimeseriesFromPool()
255+
ts.Labels = cortexpb.FromLabelsToLabelAdapters(lbs)
256+
ts.Samples = append(ts.Samples, v2Ts.Samples...)
257+
ts.Exemplars = exemplars
258+
ts.Histograms = append(ts.Histograms, v2Ts.Histograms...)
259+
254260
v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{
255-
TimeSeries: &cortexpb.TimeSeries{
256-
Labels: cortexpb.FromLabelsToLabelAdapters(lbs),
257-
Samples: v2Ts.Samples,
258-
Exemplars: exemplars,
259-
Histograms: v2Ts.Histograms,
260-
},
261+
TimeSeries: ts,
261262
})
262263

263264
if shouldConvertV2Metadata(v2Ts.Metadata) {

pkg/util/push/push_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,19 @@ func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) {
378378
for _, test := range tests {
379379
t.Run(test.desc, func(t *testing.T) {
380380
v1Req, err := convertV2RequestToV1(test.v2Req, test.enableTypeAndUnitLabels)
381+
382+
for i := range v1Req.Timeseries {
383+
if len(v1Req.Timeseries[i].Samples) == 0 {
384+
v1Req.Timeseries[i].Samples = nil
385+
}
386+
if len(v1Req.Timeseries[i].Exemplars) == 0 {
387+
v1Req.Timeseries[i].Exemplars = nil
388+
}
389+
if len(v1Req.Timeseries[i].Histograms) == 0 {
390+
v1Req.Timeseries[i].Histograms = nil
391+
}
392+
}
393+
381394
require.NoError(t, err)
382395
require.Equal(t, test.expectedV1Req, v1Req)
383396
})
@@ -1168,3 +1181,46 @@ func TestHandler_RemoteWriteV2_MetadataPoolReset(t *testing.T) {
11681181
resp2 := sendRequest(&req2Proto)
11691182
require.Equal(t, http.StatusNoContent, resp2.Code)
11701183
}
1184+
1185+
func Test_convertV2RequestToV1_DeepCopy(t *testing.T) {
1186+
fh := tsdbutil.GenerateTestFloatHistogram(1)
1187+
ph := cortexpb.FloatHistogramToHistogramProto(4, fh)
1188+
1189+
v2Req := &cortexpb.PreallocWriteRequestV2{
1190+
WriteRequestV2: cortexpb.WriteRequestV2{
1191+
Symbols: []string{"", "__name__", "test_metric"},
1192+
Timeseries: []cortexpb.PreallocTimeseriesV2{
1193+
{
1194+
TimeSeriesV2: &cortexpb.TimeSeriesV2{
1195+
LabelsRefs: []uint32{1, 2},
1196+
Samples: []cortexpb.Sample{
1197+
{Value: 1.0, TimestampMs: 1000},
1198+
},
1199+
Exemplars: []cortexpb.ExemplarV2{
1200+
{LabelsRefs: []uint32{1, 2}, Value: 2.0, Timestamp: 1000},
1201+
},
1202+
Histograms: []cortexpb.Histogram{
1203+
ph,
1204+
},
1205+
},
1206+
},
1207+
},
1208+
},
1209+
}
1210+
1211+
v1Req, err := convertV2RequestToV1(v2Req, false)
1212+
require.NoError(t, err)
1213+
require.Len(t, v1Req.Timeseries, 1)
1214+
1215+
v1Ts := v1Req.Timeseries[0]
1216+
v2Ts := v2Req.Timeseries[0]
1217+
1218+
require.True(t, len(v1Ts.Samples) > 0 && len(v2Ts.Samples) > 0)
1219+
require.NotSame(t, &v1Ts.Samples[0], &v2Ts.Samples[0], "Samples array must not share the same memory address")
1220+
1221+
require.True(t, len(v1Ts.Exemplars) > 0 && len(v2Ts.Exemplars) > 0)
1222+
require.NotSame(t, &v1Ts.Exemplars[0], &v2Ts.Exemplars[0], "Exemplars array must not share the same memory address")
1223+
1224+
require.True(t, len(v1Ts.Histograms) > 0 && len(v2Ts.Histograms) > 0)
1225+
require.NotSame(t, &v1Ts.Histograms[0], &v2Ts.Histograms[0], "Histograms array must not share the same memory address")
1226+
}

0 commit comments

Comments
 (0)