@@ -2843,38 +2843,48 @@ func TestCompactionErrorStats(t *testing.T) {
2843
2843
func TestCompactionCorruption (t * testing.T ) {
2844
2844
mem := vfs .NewMem ()
2845
2845
var numFinishedCompactions atomic.Int32
2846
+ var once sync.Once
2846
2847
opts := & Options {
2847
2848
FS : mem ,
2848
2849
FormatMajorVersion : FormatNewest ,
2849
2850
EventListener : & EventListener {
2851
+ BackgroundError : func (error ) {},
2850
2852
DataCorruption : func (info DataCorruptionInfo ) {
2851
2853
if testing .Verbose () {
2852
- fmt .Printf ("got expected data corruption: %s\n " , info .Path )
2854
+ once .Do (func () { fmt .Printf ("got expected data corruption: %s\n " , info .Path ) })
2855
+ }
2856
+ },
2857
+ CompactionBegin : func (info CompactionInfo ) {
2858
+ if testing .Verbose () {
2859
+ fmt .Printf ("%d: compaction begin (L%d)\n " , info .JobID , info .Output .Level )
2853
2860
}
2854
2861
},
2855
2862
CompactionEnd : func (info CompactionInfo ) {
2863
+ if testing .Verbose () {
2864
+ fmt .Printf ("%d: compaction end (L%d)\n " , info .JobID , info .Output .Level )
2865
+ }
2856
2866
if info .Err == nil {
2857
2867
numFinishedCompactions .Add (1 )
2858
2868
}
2859
2869
},
2860
2870
},
2871
+ L0CompactionThreshold : 1 ,
2872
+ L0CompactionFileThreshold : 10 ,
2861
2873
}
2862
2874
opts .WithFSDefaults ()
2863
2875
remoteStorage := remote .NewInMem ()
2864
2876
opts .Experimental .RemoteStorage = remote .MakeSimpleFactory (map [remote.Locator ]remote.Storage {
2865
2877
"external-locator" : remoteStorage ,
2866
2878
})
2879
+ opts .EnsureDefaults ()
2880
+ opts .Levels [0 ].TargetFileSize = 8192
2867
2881
d , err := Open ("" , opts )
2868
2882
require .NoError (t , err )
2869
2883
2870
2884
var now crtime.AtomicMono
2871
2885
now .Store (1 )
2872
2886
d .problemSpans .InitForTesting (manifest .NumLevels , d .cmp , func () crtime.Mono { return now .Load () })
2873
2887
2874
- randKey := func () []byte {
2875
- return []byte {'a' + byte (rand .IntN (26 ))}
2876
- }
2877
-
2878
2888
var workloadWG sync.WaitGroup
2879
2889
var stopWorkload atomic.Bool
2880
2890
defer stopWorkload .Store (true )
@@ -2885,12 +2895,14 @@ func TestCompactionCorruption(t *testing.T) {
2885
2895
defer workloadWG .Done ()
2886
2896
for ! stopWorkload .Load () {
2887
2897
b := d .NewBatch ()
2888
- v := make ([]byte , 100 + rand .IntN (1000 ))
2889
- for i := range v {
2890
- v [i ] = byte (rand .Uint32 ())
2891
- }
2898
+ // Write some random keys of the form a012345.
2892
2899
for i := 0 ; i < 100 ; i ++ {
2893
- if err := b .Set (randKey (), v , nil ); err != nil {
2900
+ v := make ([]byte , 100 + rand .IntN (100 ))
2901
+ for i := range v {
2902
+ v [i ] = byte (rand .Uint32 ())
2903
+ }
2904
+ key := fmt .Sprintf ("%c%06d" , 'a' + byte (rand .IntN (int ('z' - 'a' + 1 ))), rand .IntN (1000000 ))
2905
+ if err := b .Set ([]byte (key ), v , nil ); err != nil {
2894
2906
panic (err )
2895
2907
}
2896
2908
}
@@ -2900,12 +2912,24 @@ func TestCompactionCorruption(t *testing.T) {
2900
2912
if err := d .Flush (); err != nil {
2901
2913
panic (err )
2902
2914
}
2903
- time .Sleep (10 * time .Microsecond )
2915
+ time .Sleep (10 * time .Millisecond )
2904
2916
}
2905
2917
}()
2906
2918
}
2907
2919
2908
2920
datadriven .RunTest (t , "testdata/compaction_corruption" , func (t * testing.T , td * datadriven.TestData ) string {
2921
+ // wait until fn() returns true.
2922
+ wait := func (what string , fn func () bool ) {
2923
+ const timeout = 2 * time .Minute
2924
+ start := time .Now ()
2925
+ for ! fn () {
2926
+ if time .Since (start ) > timeout {
2927
+ td .Fatalf (t , "timeout waiting for %s\n %s\n " , what , d .DebugString ())
2928
+ }
2929
+ time .Sleep (10 * time .Millisecond )
2930
+ }
2931
+ }
2932
+
2909
2933
switch td .Cmd {
2910
2934
case "build-remote" :
2911
2935
require .NoError (t , runBuildRemoteCmd (td , d , remoteStorage ))
@@ -2942,37 +2966,26 @@ func TestCompactionCorruption(t *testing.T) {
2942
2966
workloadWG .Wait ()
2943
2967
2944
2968
case "wait-for-problem-span" :
2945
- timeout := time .Now ().Add (100 * time .Second )
2946
- for d .problemSpans .IsEmpty () {
2947
- if timeout .Before (time .Now ()) {
2948
- td .Fatalf (t , "timeout waiting for problem span" )
2949
- }
2950
- time .Sleep (10 * time .Millisecond )
2951
- }
2969
+ wait ("problem span" , func () bool {
2970
+ return ! d .problemSpans .IsEmpty ()
2971
+ })
2952
2972
if testing .Verbose () {
2953
2973
fmt .Printf ("%s: wait-for-problem-span:\n %s" , td .Pos , d .problemSpans .String ())
2954
2974
}
2955
2975
2956
2976
case "wait-for-compactions" :
2957
2977
target := numFinishedCompactions .Load () + 5
2958
- timeout := time .Now ().Add (10 * time .Second )
2959
- for numFinishedCompactions .Load () < target {
2960
- if timeout .Before (time .Now ()) {
2961
- td .Fatalf (t , "timeout waiting for compactions" )
2962
- }
2963
- time .Sleep (10 * time .Millisecond )
2964
- }
2978
+ wait ("compactions" , func () bool {
2979
+ return numFinishedCompactions .Load () >= target
2980
+ })
2965
2981
2966
2982
case "expire-spans" :
2967
2983
now .Store (now .Load () + crtime .Mono (30 * time .Minute ))
2968
2984
2969
2985
case "wait-for-no-external-files" :
2970
- timeout := time .Now ().Add (10 * time .Second )
2971
- for hasExternalFiles (d ) {
2972
- if timeout .Before (time .Now ()) {
2973
- td .Fatalf (t , "timeout waiting for compactions" )
2974
- }
2975
- }
2986
+ wait ("no external files" , func () bool {
2987
+ return ! hasExternalFiles (d )
2988
+ })
2976
2989
2977
2990
default :
2978
2991
return fmt .Sprintf ("unknown command: %s" , td .Cmd )
0 commit comments