Skip to content

Commit e905172

Browse files
committed
db: change MaxConcurrentCompactions() to return a range
`MaxConcurrentCompactions()` returns an upper limit on the compaction concurrency. Within this upper limit, Pebble decides what the concurrency limit is (depending on L0 amplification and compaction debt). There are cases where we want more concurrency for other reasons (like space amp). Having a knob can be useful to tweak things in a production environment. This change changes `MaxConcurrentCompactions()` to `CompactionConcurrencyRange()` which returns both a lower and upper value. The lower limit is the "baseline" value for the concurrency, which Pebble can increase dynamically up to the upper limit. Prior to this change, the (implicit) lower limit was always 1.
1 parent 7c06a36 commit e905172

20 files changed

+178
-85
lines changed

cmd/pebble/db.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ func newPebbleDB(dir string) DB {
7272
Merger: &pebble.Merger{
7373
Name: "cockroach_merge_operator",
7474
},
75-
MaxConcurrentCompactions: func() int {
76-
return 3
75+
CompactionConcurrencyRange: func() (int, int) {
76+
return 1, 3
7777
},
7878
}
7979
// In FormatColumnarBlocks (the value of FormatNewest at the time of

cmd/pebble/replay_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,15 @@ func TestParseOptionsStr(t *testing.T) {
2121
testCases := []testCase{
2222
{
2323
c: replayConfig{optionsString: `[Options] max_concurrent_compactions=9`},
24-
options: &pebble.Options{MaxConcurrentCompactions: func() int { return 9 }},
24+
options: &pebble.Options{CompactionConcurrencyRange: func() (int, int) { return 1, 9 }},
25+
},
26+
{
27+
c: replayConfig{optionsString: `[Options] concurrent_compactions=4`},
28+
options: &pebble.Options{CompactionConcurrencyRange: func() (int, int) { return 4, 4 }},
29+
},
30+
{
31+
c: replayConfig{optionsString: `[Options] concurrent_compactions=4 max_concurrent_compactions=9`},
32+
options: &pebble.Options{CompactionConcurrencyRange: func() (int, int) { return 4, 9 }},
2533
},
2634
{
2735
c: replayConfig{optionsString: `[Options] bytes_per_sync=90000`},

compaction.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64
5757
// than this threshold before expansion.
5858
//
5959
// NB: this heuristic is an approximation since we may run more compactions
60-
// than MaxConcurrentCompactions.
61-
diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions())
60+
// than the upper concurrency limit.
61+
_, maxConcurrency := opts.CompactionConcurrencyRange()
62+
diskMax := (availBytes / 2) / uint64(maxConcurrency)
6263
if v > diskMax {
6364
v = diskMax
6465
}
@@ -1918,7 +1919,7 @@ func (d *DB) GetAllowedWithoutPermission() int {
19181919
allowedBasedOnManual := 0
19191920
manualBacklog := int(d.mu.compact.manualLen.Load())
19201921
if manualBacklog > 0 {
1921-
maxAllowed := d.opts.MaxConcurrentCompactions()
1922+
_, maxAllowed := d.opts.CompactionConcurrencyRange()
19221923
allowedBasedOnManual = min(maxAllowed, manualBacklog+allowedBasedOnBacklog)
19231924
}
19241925
return max(allowedBasedOnBacklog, allowedBasedOnManual)
@@ -1982,10 +1983,12 @@ func (d *DB) pickManualCompaction(env compactionEnv) (pc *pickedCompaction) {
19821983
// Returns true iff a compaction was started.
19831984
func (d *DB) tryScheduleDeleteOnlyCompaction() bool {
19841985
if d.opts.private.disableDeleteOnlyCompactions || d.opts.DisableAutomaticCompactions ||
1985-
d.mu.compact.compactingCount >= d.opts.MaxConcurrentCompactions() ||
19861986
len(d.mu.compact.deletionHints) == 0 {
19871987
return false
19881988
}
1989+
if _, maxConcurrency := d.opts.CompactionConcurrencyRange(); d.mu.compact.compactingCount >= maxConcurrency {
1990+
return false
1991+
}
19891992
v := d.mu.versions.currentVersion()
19901993
snapshots := d.mu.snapshots.toSlice()
19911994
// We need to save the value of exciseEnabled in the compaction itself, as

compaction_picker.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1221,7 +1221,10 @@ func responsibleForGarbageBytes(
12211221
}
12221222

12231223
func (p *compactionPickerByScore) getCompactionConcurrency() int {
1224-
maxConcurrentCompactions := p.opts.MaxConcurrentCompactions()
1224+
lower, upper := p.opts.CompactionConcurrencyRange()
1225+
if lower >= upper {
1226+
return upper
1227+
}
12251228
// Compaction concurrency is controlled by L0 read-amp. We allow one
12261229
// additional compaction per L0CompactionConcurrency sublevels, as well as
12271230
// one additional compaction per CompactionDebtConcurrency bytes of
@@ -1238,23 +1241,25 @@ func (p *compactionPickerByScore) getCompactionConcurrency() int {
12381241
// Rearranging,
12391242
// n <= l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency.
12401243
// So we can run up to
1241-
// l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency + 1 compactions
1242-
l0ReadAmpCompactions := 1
1244+
// l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency extra compactions.
1245+
l0ReadAmpCompactions := 0
12431246
if p.opts.Experimental.L0CompactionConcurrency > 0 {
12441247
l0ReadAmp := p.l0Organizer.MaxDepthAfterOngoingCompactions()
1245-
l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency) + 1
1248+
l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency)
12461249
}
12471250
// compactionDebt >= ccSignal2 then can run another compaction, where
12481251
// ccSignal2 = uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
12491252
// Rearranging,
12501253
// n <= compactionDebt / p.opts.Experimental.CompactionDebtConcurrency
12511254
// So we can run up to
1252-
// compactionDebt / p.opts.Experimental.CompactionDebtConcurrency + 1 compactions.
1253-
compactionDebtCompactions := 1
1255+
// compactionDebt / p.opts.Experimental.CompactionDebtConcurrency extra
1256+
// compactions.
1257+
compactionDebtCompactions := 0
12541258
if p.opts.Experimental.CompactionDebtConcurrency > 0 {
12551259
compactionDebt := p.estimatedCompactionDebt(0)
1256-
compactionDebtCompactions = int(compactionDebt/p.opts.Experimental.CompactionDebtConcurrency) + 1
1260+
compactionDebtCompactions = int(compactionDebt / p.opts.Experimental.CompactionDebtConcurrency)
12571261
}
1262+
12581263
compactableGarbageCompactions := 0
12591264
garbageFractionLimit := p.opts.Experimental.CompactionGarbageFractionForMaxConcurrency()
12601265
if garbageFractionLimit > 0 && p.dbSizeBytes > 0 {
@@ -1263,11 +1268,12 @@ func (p *compactionPickerByScore) getCompactionConcurrency() int {
12631268
*rangeDeletionsBytesEstimateAnnotator.MultiLevelAnnotation(p.vers.Levels[:])
12641269
garbageFraction := float64(compactableGarbageBytes) / float64(p.dbSizeBytes)
12651270
compactableGarbageCompactions =
1266-
int((garbageFraction / garbageFractionLimit) * float64(maxConcurrentCompactions))
1271+
int((garbageFraction / garbageFractionLimit) * float64(upper-lower))
12671272
}
1268-
concurrencyBasedOnSignals :=
1269-
max(l0ReadAmpCompactions, compactionDebtCompactions, compactableGarbageCompactions)
1270-
return min(maxConcurrentCompactions, max(concurrencyBasedOnSignals, 1))
1273+
1274+
extraCompactions := max(l0ReadAmpCompactions, compactionDebtCompactions, compactableGarbageCompactions, 0)
1275+
1276+
return min(lower+extraCompactions, upper)
12711277
}
12721278

12731279
// TODO(sumeer): remove unless someone actually finds this useful.

compaction_picker_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,8 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
218218
if d.HasArg("max-concurrent-compactions") {
219219
d.ScanArgs(t, "max-concurrent-compactions", &maxConcurrentCompactions)
220220
}
221-
opts.MaxConcurrentCompactions = func() int {
222-
return maxConcurrentCompactions
221+
opts.CompactionConcurrencyRange = func() (int, int) {
222+
return 1, maxConcurrentCompactions
223223
}
224224
if d.HasArg("compaction-debt-concurrency") {
225225
d.ScanArgs(t, "compaction-debt-concurrency", &opts.Experimental.CompactionDebtConcurrency)
@@ -647,7 +647,8 @@ func TestCompactionPickerL0(t *testing.T) {
647647
func TestCompactionPickerConcurrency(t *testing.T) {
648648
opts := DefaultOptions()
649649
opts.Experimental.L0CompactionConcurrency = 1
650-
opts.MaxConcurrentCompactions = func() int { return 4 }
650+
lowerConcurrencyLimit, upperConcurrencyLimit := 1, 4
651+
opts.CompactionConcurrencyRange = func() (int, int) { return lowerConcurrencyLimit, upperConcurrencyLimit }
651652

652653
parseMeta := func(s string) (*tableMetadata, error) {
653654
parts := strings.Split(s, ":")
@@ -815,6 +816,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
815816
td.MaybeScanArgs(t, "l0_compaction_threshold", &opts.L0CompactionThreshold)
816817
td.MaybeScanArgs(t, "l0_compaction_concurrency", &opts.Experimental.L0CompactionConcurrency)
817818
td.MaybeScanArgs(t, "compaction_debt_concurrency", &opts.Experimental.CompactionDebtConcurrency)
819+
td.MaybeScanArgs(t, "concurrency", &lowerConcurrencyLimit, &upperConcurrencyLimit)
818820

819821
env := compactionEnv{
820822
earliestUnflushedSeqNum: math.MaxUint64,
@@ -842,8 +844,10 @@ func TestCompactionPickerConcurrency(t *testing.T) {
842844
fmt.Fprintf(&result, "nil")
843845
}
844846
return result.String()
847+
848+
default:
849+
return fmt.Sprintf("unrecognized command: %s", td.Cmd)
845850
}
846-
return fmt.Sprintf("unrecognized command: %s", td.Cmd)
847851
})
848852
}
849853

compaction_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,10 +1157,12 @@ func TestCompaction(t *testing.T) {
11571157
return ""
11581158

11591159
case "set-concurrent-compactions":
1160-
var concurrentCompactions int
1161-
td.ScanArgs(t, "num", &concurrentCompactions)
1162-
d.opts.MaxConcurrentCompactions = func() int {
1163-
return concurrentCompactions
1160+
lower := 1
1161+
upper := 1
1162+
td.MaybeScanArgs(t, "max", &upper)
1163+
td.MaybeScanArgs(t, "range", &lower, upper)
1164+
d.opts.CompactionConcurrencyRange = func() (int, int) {
1165+
return lower, upper
11641166
}
11651167
return ""
11661168

db_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,8 +1187,8 @@ func TestDBConcurrentCompactClose(t *testing.T) {
11871187
for i := 0; i < 100; i++ {
11881188
opts := &Options{
11891189
FS: mem,
1190-
MaxConcurrentCompactions: func() int {
1191-
return 2
1190+
CompactionConcurrencyRange: func() (int, int) {
1191+
return 1, 2
11921192
},
11931193
}
11941194
d, err := Open("", testingRandomized(t, opts))
@@ -1522,8 +1522,8 @@ func TestMemtableIngestInversion(t *testing.T) {
15221522
MemTableStopWritesThreshold: 1000,
15231523
L0StopWritesThreshold: 1000,
15241524
L0CompactionThreshold: 2,
1525-
MaxConcurrentCompactions: func() int {
1526-
return 1000
1525+
CompactionConcurrencyRange: func() (int, int) {
1526+
return 1, 1000
15271527
},
15281528
}
15291529

error_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ func TestDBCompactionCrash(t *testing.T) {
460460
FS: fs,
461461
Logger: testLogger{t: t},
462462
MemTableSize: 128 << 10,
463-
MaxConcurrentCompactions: func() int { return maxConcurrentCompactions },
463+
CompactionConcurrencyRange: func() (int, int) { return 1, maxConcurrentCompactions },
464464
LBaseMaxBytes: 64 << 10,
465465
L0CompactionThreshold: 2,
466466
L0CompactionFileThreshold: 2,

iterator_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2130,10 +2130,10 @@ func TestRangeKeyMaskingRandomized(t *testing.T) {
21302130
maxProcs := runtime.GOMAXPROCS(0)
21312131

21322132
opts1 := &Options{
2133-
FS: vfs.NewCrashableMem(),
2134-
Comparer: testkeys.Comparer,
2135-
FormatMajorVersion: FormatNewest,
2136-
MaxConcurrentCompactions: func() int { return maxProcs/2 + 1 },
2133+
FS: vfs.NewCrashableMem(),
2134+
Comparer: testkeys.Comparer,
2135+
FormatMajorVersion: FormatNewest,
2136+
CompactionConcurrencyRange: func() (int, int) { return 1, maxProcs/2 + 1 },
21372137
BlockPropertyCollectors: []func() BlockPropertyCollector{
21382138
sstable.NewTestKeysBlockPropertyCollector,
21392139
},
@@ -2143,10 +2143,10 @@ func TestRangeKeyMaskingRandomized(t *testing.T) {
21432143
require.NoError(t, err)
21442144

21452145
opts2 := &Options{
2146-
FS: vfs.NewCrashableMem(),
2147-
Comparer: testkeys.Comparer,
2148-
FormatMajorVersion: FormatNewest,
2149-
MaxConcurrentCompactions: func() int { return maxProcs/2 + 1 },
2146+
FS: vfs.NewCrashableMem(),
2147+
Comparer: testkeys.Comparer,
2148+
FormatMajorVersion: FormatNewest,
2149+
CompactionConcurrencyRange: func() (int, int) { return 1, maxProcs/2 + 1 },
21502150
BlockPropertyCollectors: []func() BlockPropertyCollector{
21512151
sstable.NewTestKeysBlockPropertyCollector,
21522152
},
@@ -2305,10 +2305,10 @@ func BenchmarkIterator_RangeKeyMasking(b *testing.B) {
23052305
mem := vfs.NewMem()
23062306
maxProcs := runtime.GOMAXPROCS(0)
23072307
opts := &Options{
2308-
FS: mem,
2309-
Comparer: testkeys.Comparer,
2310-
FormatMajorVersion: FormatNewest,
2311-
MaxConcurrentCompactions: func() int { return maxProcs/2 + 1 },
2308+
FS: mem,
2309+
Comparer: testkeys.Comparer,
2310+
FormatMajorVersion: FormatNewest,
2311+
CompactionConcurrencyRange: func() (int, int) { return 1, maxProcs/2 + 1 },
23122312
BlockPropertyCollectors: []func() BlockPropertyCollector{
23132313
sstable.NewTestKeysBlockPropertyCollector,
23142314
},

metamorphic/options.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,8 +687,12 @@ func RandomOptions(
687687
}
688688
opts.LBaseMaxBytes = 1 << uint(rng.IntN(30)) // 1B - 1GB
689689
maxConcurrentCompactions := rng.IntN(3) + 1 // 1-3
690-
opts.MaxConcurrentCompactions = func() int {
691-
return maxConcurrentCompactions
690+
minConcurrentCompactions := 1
691+
if rng.IntN(4) == 0 {
692+
minConcurrentCompactions = rng.IntN(maxConcurrentCompactions) + 1
693+
}
694+
opts.CompactionConcurrencyRange = func() (int, int) {
695+
return minConcurrentCompactions, maxConcurrentCompactions
692696
}
693697
// [-0.2, 0.4], in steps of 0.2.
694698
garbageFrac := float64(rng.IntN(5))/5.0 - 0.2

metamorphic/options_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func TestOptionsRoundtrip(t *testing.T) {
7171
// Function pointers
7272
"BlockPropertyCollectors:",
7373
"EventListener:",
74-
"MaxConcurrentCompactions:",
74+
"CompactionConcurrencyRange:",
7575
"MaxConcurrentDownloads:",
7676
"Experimental.CompactionGarbageFractionForMaxConcurrency:",
7777
"Experimental.DisableIngestAsFlushable:",
@@ -118,13 +118,19 @@ func TestOptionsRoundtrip(t *testing.T) {
118118
if o.Opts.Experimental.IngestSplit != nil && o.Opts.Experimental.IngestSplit() {
119119
require.Equal(t, o.Opts.Experimental.IngestSplit(), parsed.Opts.Experimental.IngestSplit())
120120
}
121+
121122
require.Equal(t, o.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency == nil,
122123
parsed.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency == nil)
123124
if o.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency != nil {
124125
require.InDelta(t, o.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency(),
125126
parsed.Opts.Experimental.CompactionGarbageFractionForMaxConcurrency(), 1e-5)
126127
}
127-
require.Equal(t, o.Opts.MaxConcurrentCompactions(), parsed.Opts.MaxConcurrentCompactions())
128+
129+
expBaseline, expUpper := o.Opts.CompactionConcurrencyRange()
130+
parsedBaseline, parsedUpper := parsed.Opts.CompactionConcurrencyRange()
131+
require.Equal(t, expBaseline, parsedBaseline)
132+
require.Equal(t, expUpper, parsedUpper)
133+
128134
require.Equal(t, o.Opts.MaxConcurrentDownloads(), parsed.Opts.MaxConcurrentDownloads())
129135
require.Equal(t, len(o.Opts.BlockPropertyCollectors), len(parsed.Opts.BlockPropertyCollectors))
130136

0 commit comments

Comments
 (0)