
Commit 3cc5c12

craig[bot], kev-cao, and andy-kimball committed
143930: backup: fix elide skipped layers to account for new incremental path suffix r=msbutler a=kev-cao

Due to the changes introduced in #143226, the order of the backup manifests passed to `ElideSkippedLayers` has changed. Whereas previously a compacted backup would always appear *after* any backup with the same end time, compacted backups now appear *before*. As a result, during restore, compacted backups were always skipped and never used.

To fix this, before running the `ElideSkippedLayers` algorithm, we now remove duplicate backups (i.e. backups with the same end time) and keep the backup with the earliest start time (i.e. the most compacted backup).

Epic: None
Release note: None

143963: cspann: fix race conditions in index split operation r=drewkimball a=andy-kimball

This commit fixes two race conditions in the index split operation:

1. After setting the state of the left sub-partition to Ready, say that the split unexpectedly fails. Now say that the left sub-partition itself splits and is deleted. When the original split resumes, it will not be able to get the centroid for the left sub-partition, which is needed to run the K-means clustering algorithm.
2. As described in 1, it is possible for a splitting partition to reference target sub-partitions that are now missing from the index. This triggers PartitionNotFound errors in insert code paths.

The fixes are:

1. Update the logic so that vectors are first copied to the left and right sub-partitions before either sub-partition's state is updated from Updating to Ready. Only Ready sub-partitions can be split, so this prevents race condition 1.
2. Update the insert logic so that searches of non-root partitions return multiple results, making it extremely likely that a suitable insert partition will be found. For root partitions, check split target sub-partitions instead, since the splitting partition does not share a parent with its sub-partitions.

Epic: CRDB-42943
Release note: None

Co-authored-by: Kevin Cao <[email protected]>
Co-authored-by: Andrew Kimball <[email protected]>
3 parents 5f91990 + 4731465 + c8605bb commit 3cc5c12
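The dedup rule described in the first change above is small enough to sketch in isolation. Below is a minimal, hypothetical Go sketch (toy `manifest` type and `dedupEndTimes` name, not the actual `backuppb.BackupManifest` type or the `elideDuplicateEndTimes` helper added by this commit) of keeping, for each end time, the layer with the earliest start time, assuming the input is already sorted by end time with ties broken by ascending start time:

package main

import "fmt"

// manifest is a toy stand-in for backuppb.BackupManifest; start/end are toy
// stand-ins for hlc timestamps.
type manifest struct{ start, end int }

// dedupEndTimes keeps, for each end time, the first (earliest-start) layer.
// It assumes the input is sorted by end time, with ties broken by ascending
// start time, which is the ordering the new incremental path suffix produces.
func dedupEndTimes(backups []manifest) []manifest {
	var out []manifest
	for _, b := range backups {
		if len(out) > 0 && out[len(out)-1].end == b.end {
			continue // same end time but later start time: elide it
		}
		out = append(out, b)
	}
	return out
}

func main() {
	chain := []manifest{{0, 1}, {1, 2}, {1, 3}, {2, 3}, {3, 5}, {5, 8}}
	fmt.Println(dedupEndTimes(chain)) // [{0 1} {1 2} {1 3} {3 5} {5 8}]
}

On the chain (0,1), (1,2), (1,3), (2,3), (3,5), (5,8) this keeps (1,3) and drops (2,3); the elision pass that runs afterwards also removes (1,2), which matches the new "compaction of two backups" test case below.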

6 files changed: +441 -158 lines changed


Diff for: pkg/backup/backupinfo/manifest_handling.go

+80 -4
@@ -887,11 +887,19 @@ func ValidateEndTimeAndTruncate(
 	endTime hlc.Timestamp,
 	includeSkipped bool,
 ) ([]string, []backuppb.BackupManifest, []jobspb.RestoreDetails_BackupLocalityInfo, error) {
+
 	if endTime.IsEmpty() {
 		if includeSkipped {
 			return defaultURIs, mainBackupManifests, localityInfo, nil
 		}
-		return ElideSkippedLayers(defaultURIs, mainBackupManifests, localityInfo)
+		uris, manifests, locality, err := ElideSkippedLayers(defaultURIs, mainBackupManifests, localityInfo)
+		if err != nil {
+			return nil, nil, nil, err
+		}
+		if err := validateContinuity(manifests, endTime); err != nil {
+			return nil, nil, nil, err
+		}
+		return uris, manifests, locality, nil
 	}
 	for i, b := range mainBackupManifests {
 		// Find the backup that covers the requested time.
@@ -930,7 +938,16 @@ func ValidateEndTimeAndTruncate(
 		if includeSkipped {
 			return defaultURIs[:i+1], mainBackupManifests[:i+1], localityInfo[:i+1], nil
 		}
-		return ElideSkippedLayers(defaultURIs[:i+1], mainBackupManifests[:i+1], localityInfo[:i+1])
+		uris, manifests, locality, err := ElideSkippedLayers(
+			defaultURIs[:i+1], mainBackupManifests[:i+1], localityInfo[:i+1],
+		)
+		if err != nil {
+			return nil, nil, nil, err
+		}
+		if err := validateContinuity(manifests, endTime); err != nil {
+			return nil, nil, nil, err
+		}
+		return uris, manifests, locality, nil
 
 	}
 
@@ -939,10 +956,42 @@ func ValidateEndTimeAndTruncate(
 	)
 }
 
-// ElideSkippedLayers removes backups that are skipped in the backup chain.
+// ValidateContinuity checks that the backups are continuous and cover the
+// requested end time.
+func validateContinuity(manifests []backuppb.BackupManifest, endTime hlc.Timestamp) error {
+	if len(manifests) == 0 {
+		return errors.AssertionFailedf("an empty chain of backups cannot cover an end time")
+	}
+	for i := range len(manifests) - 1 {
+		if !manifests[i].EndTime.Equal(manifests[i+1].StartTime) {
+			return errors.AssertionFailedf(
+				"backups are not continuous: %dth backup ends at %+v, %dth backup starts at %+v",
+				i, manifests[i].EndTime,
+				i+1, manifests[i+1].StartTime,
+			)
+		}
+	}
+	if !endTime.IsEmpty() {
+		lastManifest := manifests[len(manifests)-1]
+		if !lastManifest.StartTime.Less(endTime) || !endTime.LessEq(lastManifest.EndTime) {
+			return errors.AssertionFailedf(
+				"requested time %s is not covered by the last backup",
+				endTime,
+			)
+		}
+	}
+	return nil
+}
+
+// ElideSkippedLayers removes backups that are skipped in the backup chain and
+// ensures only backups that will be used in the restore are returned.
+//
+// Note: This assumes that the provided backups are sorted in increasing order
+// by end time, and then sorted in increasing order by start time to break ties.
 func ElideSkippedLayers(
 	uris []string, backups []backuppb.BackupManifest, loc []jobspb.RestoreDetails_BackupLocalityInfo,
 ) ([]string, []backuppb.BackupManifest, []jobspb.RestoreDetails_BackupLocalityInfo, error) {
+	uris, backups, loc = elideDuplicateEndTimes(uris, backups, loc)
 	i := len(backups) - 1
 	for i > 0 {
 		// Find j such that backups[j] is parent of backups[i].
@@ -959,10 +1008,37 @@ func ElideSkippedLayers(
 		// Move up to check the chain from j now.
 		i = j
 	}
-
 	return uris, backups, loc, nil
 }
 
+// elideDuplicateEndTimes ensures that backups in a list of backups are
+// functionally unique by removing any duplicates that have the same end time,
+// choosing backups with earlier start times and eliding the rest.
+//
+// Note: This assumes that the provided backups are sorted in increasing order
+// by end time, and then sorted in increasing order by start time to break ties.
+// This is the case for backups being returned by storage clients due to us
+// encoding backup paths in a way that ensures this order.
+func elideDuplicateEndTimes(
+	uris []string, backups []backuppb.BackupManifest, loc []jobspb.RestoreDetails_BackupLocalityInfo,
+) ([]string, []backuppb.BackupManifest, []jobspb.RestoreDetails_BackupLocalityInfo) {
+	for i := range len(backups) - 1 {
+		j := i + 1
+		// Find j such that backups[j] no longer shares the same end time as
+		// backups[i].
+		for j < len(backups) && backups[i].EndTime.Equal(backups[j].EndTime) {
+			j++
+		}
+		// If there exists backups between i and j, remove them.
+		if j > i+1 {
+			uris = slices.Delete(uris, i+1, j)
+			backups = slices.Delete(backups, i+1, j)
+			loc = slices.Delete(loc, i+1, j)
+		}
+	}
+	return uris, backups, loc
+}
+
 // GetBackupIndexAtTime returns the index of the latest backup in
 // `backupManifests` with a StartTime >= asOf.
 func GetBackupIndexAtTime(
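
To make the new continuity check concrete, here is a simplified sketch of the invariant validateContinuity enforces, reusing the toy `manifest` type (and `fmt` import) from the earlier sketch; integers stand in for hlc timestamps and 0 stands in for an empty end time, so this is an illustration rather than the production check:

// checkContinuity mirrors validateContinuity on toy types: each layer must
// start exactly where the previous one ends, and a non-zero requested end
// time must fall in (last.start, last.end].
func checkContinuity(chain []manifest, endTime int) error {
	if len(chain) == 0 {
		return fmt.Errorf("an empty chain of backups cannot cover an end time")
	}
	for i := 0; i < len(chain)-1; i++ {
		if chain[i].end != chain[i+1].start {
			return fmt.Errorf("backups are not continuous: backup %d ends at %d, backup %d starts at %d",
				i, chain[i].end, i+1, chain[i+1].start)
		}
	}
	last := chain[len(chain)-1]
	if endTime != 0 && (endTime <= last.start || endTime > last.end) {
		return fmt.Errorf("requested time %d is not covered by the last backup", endTime)
	}
	return nil
}

With this check in place after elision, a chain that no longer lines up end-to-start, or whose last layer does not cover the requested time, surfaces as an assertion failure rather than going unnoticed.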

Diff for: pkg/backup/backupinfo/manifest_handling_test.go

+38 -9
@@ -379,18 +379,46 @@ func TestMakeBackupCodec(t *testing.T) {
 func TestElideSkippedLayers(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
+	// Note: The tests here work under the assumption that the input lists are
+	// always sorted in ascending order by end time, and then sorted in ascending
+	// order by start time.
 	for _, tc := range []struct {
 		name     string
 		times    [][]int // len 2 slices of start and end time.
-		expected []int   // expected end times.
+		expected [][]int // expected start and end times
 	}{
-		{"single", [][]int{{0, 1}}, []int{1}},
-		{"double", [][]int{{0, 1}, {1, 2}}, []int{1, 2}},
-		{"simple chain", [][]int{{0, 1}, {1, 2}, {2, 3}, {3, 5}, {5, 8}}, []int{1, 2, 3, 5, 8}},
-		{"skip one", [][]int{{0, 1}, {1, 2}, {1, 3}, {3, 5}, {5, 8}}, []int{1, 3, 5, 8}},
-		{"skip all", [][]int{{0, 1}, {1, 2}, {1, 3}, {3, 5}, {1, 8}}, []int{1, 8}},
-		{"skip twice to first", [][]int{{0, 1}, {1, 2}, {1, 3}, {3, 5}, {3, 8}}, []int{1, 3, 8}},
-		{"skip twice to second", [][]int{{0, 1}, {1, 2}, {1, 3}, {3, 5}, {2, 8}}, []int{1, 2, 8}},
+		{"single", [][]int{{0, 1}}, [][]int{{0, 1}}},
+		{"double", [][]int{{0, 1}, {1, 2}}, [][]int{{0, 1}, {1, 2}}},
+		{
+			"simple chain, no skips",
+			[][]int{{0, 1}, {1, 2}, {2, 3}, {3, 5}, {5, 8}},
+			[][]int{{0, 1}, {1, 2}, {2, 3}, {3, 5}, {5, 8}},
+		},
+		{
+			"compaction of two backups",
+			[][]int{{0, 1}, {1, 2}, {1, 3}, {2, 3}, {3, 5}, {5, 8}},
+			[][]int{{0, 1}, {1, 3}, {3, 5}, {5, 8}},
+		},
+		{
+			"compaction of entire chain",
+			[][]int{{0, 1}, {1, 2}, {2, 3}, {3, 5}, {0, 8}, {5, 8}},
+			[][]int{{0, 8}},
+		},
+		{
+			"two compactions of two backups",
+			[][]int{{0, 1}, {1, 2}, {1, 3}, {2, 3}, {3, 5}, {3, 8}, {5, 8}},
+			[][]int{{0, 1}, {1, 3}, {3, 8}},
+		},
+		{
+			"compaction includes a compacted backup in the middle",
+			[][]int{{0, 1}, {1, 2}, {1, 3}, {2, 3}, {3, 5}, {1, 8}, {5, 8}},
+			[][]int{{0, 1}, {1, 8}},
+		},
+		{
+			"two compactions with the same end time",
+			[][]int{{0, 1}, {1, 2}, {2, 3}, {3, 5}, {1, 8}, {3, 8}, {5, 8}},
+			[][]int{{0, 1}, {1, 8}},
+		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			chain := make([]backuppb.BackupManifest, len(tc.times))
@@ -408,7 +436,8 @@ func TestElideSkippedLayers(t *testing.T) {
 			require.Equal(t, len(tc.expected), len(locs))
 			require.Equal(t, len(tc.expected), len(res))
 			for i := range tc.expected {
-				require.Equal(t, tc.expected[i], int(res[i].EndTime.WallTime), "expected %q\ngot: %q")
+				actual := []int{int(res[i].StartTime.WallTime), int(res[i].EndTime.WallTime)}
+				require.Equal(t, tc.expected[i], actual)
 			}
 		})
 	}

Diff for: pkg/sql/vecindex/cspann/fixup_split.go

+43 -56
@@ -39,11 +39,11 @@ import (
 // but instead are "forwarded" to the closest target sub-partition.
 // 7. Reload the splitting partition's vectors and copy the "left" subset to
 //    the left sub-partition.
-// 8. Update the left sub-partition's state from Updating to Ready.
-// 9. Copy the "right" subset of vectors to the right sub-partition.
-// 10. Update the right sub-partition's state from Updating to Ready. At this
+// 8. Copy the "right" subset of vectors to the right sub-partition. At this
 //    point, the splitting vectors are duplicated in the index. Any searches
 //    will filter out duplicates.
+// 9. Update the left sub-partition's state from Updating to Ready.
+// 10. Update the right sub-partition's state from Updating to Ready.
 // 11. Remove the splitting partition from its parent. The duplicates are no
 //     longer visible to searches.
 // 12. Delete the splitting partition from the index.
@@ -227,10 +227,30 @@ func (fw *fixupWorker) splitPartition(
 		return err
 	}
 
-	// Add vectors to nearest partition.
-	err = fw.copyToSplitSubPartitions(ctx, partition, vectors, leftMetadata, rightMetadata)
-	if err != nil {
-		return err
+	// If still updating the sub-partitions, then distribute vectors among them.
+	if leftMetadata.StateDetails.State == UpdatingState {
+		err = fw.copyToSplitSubPartitions(ctx, partition, vectors, leftMetadata, rightMetadata)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Update sub-partition states from Updating to Ready.
+	if leftMetadata.StateDetails.State == UpdatingState {
+		expected := leftMetadata
+		leftMetadata.StateDetails = MakeReadyDetails()
+		err = fw.updateMetadata(ctx, leftPartitionKey, leftMetadata, expected)
+		if err != nil {
+			return err
+		}
+	}
+	if rightMetadata.StateDetails.State == UpdatingState {
+		expected := rightMetadata
+		rightMetadata.StateDetails = MakeReadyDetails()
+		err = fw.updateMetadata(ctx, rightPartitionKey, rightMetadata, expected)
+		if err != nil {
+			return err
+		}
 	}
 
 	// Check whether the splitting partition is the root.
@@ -615,8 +635,7 @@ func (fw *fixupWorker) deletePartition(
 }
 
 // copyToSplitSubPartitions copies the given set of vectors to left and right
-// sub-partitions, based on which centroid they're closer to. It also updates
-// the state of each sub-partition from Updating to Ready.
+// sub-partitions, based on which centroid they're closer to.
 func (fw *fixupWorker) copyToSplitSubPartitions(
 	ctx context.Context,
 	sourcePartition *Partition,
@@ -659,57 +678,25 @@ func (fw *fixupWorker) copyToSplitSubPartitions(
 	// transactional; if an error occurs, any vectors already added may not be
 	// rolled back. This is OK, since the vectors are still present in the
 	// source partition.
-	if leftMetadata.StateDetails.State == UpdatingState {
-		leftPartitionKey := sourceState.Target1
-		err = fw.copyVectorsToSubPartition(ctx,
-			leftPartitionKey, leftMetadata, leftVectors, leftChildKeys, leftValueBytes)
-		if err != nil {
-			return err
-		}
-	}
-	if rightMetadata.StateDetails.State == UpdatingState {
-		if sourcePartition.Level() != LeafLevel && vectors.Count == 1 {
-			// This should have been a merge, not a split, but we're too far into the
-			// split operation to back out now, so avoid an empty non-root partition by
-			// duplicating the last remaining vector in both partitions.
-			rightVectors = leftVectors
-			rightChildKeys = leftChildKeys
-			rightValueBytes = leftValueBytes
-		}
-
-		rightPartitionKey := sourceState.Target2
-		err = fw.copyVectorsToSubPartition(ctx,
-			rightPartitionKey, rightMetadata, rightVectors, rightChildKeys, rightValueBytes)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-// copyVectorsToSubPartition copies the given set of vectors, along with
-// associated keys and values, to a split sub-partition with the given key. The
-// vectors will only be added if the partition's metadata matches the expected
-// value. It also updates the state of the partition from Updating to Ready.
-func (fw *fixupWorker) copyVectorsToSubPartition(
-	ctx context.Context,
-	partitionKey PartitionKey,
-	metadata PartitionMetadata,
-	vectors vector.Set,
-	childKeys []ChildKey,
-	valueBytes []ValueBytes,
-) error {
-	// Add vectors to sub-partition, as long as metadata matches.
-	err := fw.addToPartition(ctx, partitionKey, vectors, childKeys, valueBytes, metadata)
+	leftPartitionKey := sourceState.Target1
+	err = fw.addToPartition(ctx,
+		leftPartitionKey, leftVectors, leftChildKeys, leftValueBytes, leftMetadata)
 	if err != nil {
 		return err
 	}
 
-	// Update partition state from Updating to Ready.
-	expected := metadata
-	metadata.StateDetails = MakeReadyDetails()
-	err = fw.updateMetadata(ctx, partitionKey, metadata, expected)
+	if sourcePartition.Level() != LeafLevel && vectors.Count == 1 {
+		// This should have been a merge, not a split, but we're too far into the
+		// split operation to back out now, so avoid an empty non-root partition by
+		// duplicating the last remaining vector in both partitions.
+		rightVectors = leftVectors
+		rightChildKeys = leftChildKeys
+		rightValueBytes = leftValueBytes
+	}
+
+	rightPartitionKey := sourceState.Target2
+	err = fw.addToPartition(ctx,
+		rightPartitionKey, rightVectors, rightChildKeys, rightValueBytes, rightMetadata)
 	if err != nil {
 		return err
 	}
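
The reordering above means a sub-partition is only marked Ready after both target sub-partitions have received their vectors, and each metadata write is guarded by an expected prior value. The following is a minimal, self-contained sketch of that guarded Updating-to-Ready transition, using hypothetical toy types rather than the cspann `fixupWorker`/`PartitionMetadata` API:

package main

import (
	"errors"
	"fmt"
)

type partitionState int

const (
	updating partitionState = iota
	ready
)

// metaStore is a toy stand-in for the index's metadata storage.
type metaStore struct{ states map[string]partitionState }

// updateMetadata applies the new state only if the stored state still matches
// the expected one, mirroring the expected-value check in fw.updateMetadata.
func (m *metaStore) updateMetadata(key string, expected, next partitionState) error {
	if m.states[key] != expected {
		return errors.New("metadata changed concurrently; abort and retry the split fixup")
	}
	m.states[key] = next
	return nil
}

func main() {
	store := &metaStore{states: map[string]partitionState{"left": updating, "right": updating}}

	// Steps 7-8: copy vectors to both sub-partitions while they are still in
	// the Updating state (omitted here). Only then do steps 9-10 flip them to
	// Ready, so a Ready (and therefore splittable) sub-partition always has
	// its vectors in place.
	for _, key := range []string{"left", "right"} {
		if err := store.updateMetadata(key, updating, ready); err != nil {
			fmt.Println(err)
			return
		}
	}
	fmt.Println("both sub-partitions Ready; splitting partition can now be removed")
}

Because only Ready sub-partitions are eligible to split, delaying both transitions until after the copies complete closes race condition 1 from the commit message.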
