@@ -44,8 +44,8 @@ var v3_6 = semver.Version{Major: 3, Minor: 6}
44
44
var (
45
45
forever = time.Time {}
46
46
47
- // maximum number of leases to revoke per second; configurable for tests
48
- leaseRevokeRate = 1000
47
+ // default number of leases to revoke per second; configurable for tests
48
+ defaultLeaseRevokeRate = 1000
49
49
50
50
// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
51
51
leaseCheckpointRate = 1000
@@ -172,6 +172,9 @@ type lessor struct {
172
172
// requests for shorter TTLs are extended to the minimum TTL.
173
173
minLeaseTTL int64
174
174
175
+ // maximum number of leases to revoke per second
176
+ leaseRevokeRate int
177
+
175
178
expiredC chan []* Lease
176
179
// stopC is a channel whose closure indicates that the lessor should be stopped.
177
180
stopC chan struct {}
@@ -200,6 +203,8 @@ type LessorConfig struct {
200
203
CheckpointInterval time.Duration
201
204
ExpiredLeasesRetryInterval time.Duration
202
205
CheckpointPersist bool
206
+
207
+ leaseRevokeRate int
203
208
}
204
209
205
210
func NewLessor (lg * zap.Logger , b backend.Backend , cluster cluster , cfg LessorConfig ) Lessor {
@@ -209,19 +214,24 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon
209
214
func newLessor (lg * zap.Logger , b backend.Backend , cluster cluster , cfg LessorConfig ) * lessor {
210
215
checkpointInterval := cfg .CheckpointInterval
211
216
expiredLeaseRetryInterval := cfg .ExpiredLeasesRetryInterval
217
+ leaseRevokeRate := cfg .leaseRevokeRate
212
218
if checkpointInterval == 0 {
213
219
checkpointInterval = defaultLeaseCheckpointInterval
214
220
}
215
221
if expiredLeaseRetryInterval == 0 {
216
222
expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
217
223
}
224
+ if leaseRevokeRate == 0 {
225
+ leaseRevokeRate = defaultLeaseRevokeRate
226
+ }
218
227
l := & lessor {
219
228
leaseMap : make (map [LeaseID ]* Lease ),
220
229
itemMap : make (map [LeaseItem ]LeaseID ),
221
230
leaseExpiredNotifier : newLeaseExpiredNotifier (),
222
231
leaseCheckpointHeap : make (LeaseQueue , 0 ),
223
232
b : b ,
224
233
minLeaseTTL : cfg .MinLeaseTTL ,
234
+ leaseRevokeRate : leaseRevokeRate ,
225
235
checkpointInterval : checkpointInterval ,
226
236
expiredLeaseRetryInterval : expiredLeaseRetryInterval ,
227
237
checkpointPersist : cfg .CheckpointPersist ,
@@ -474,7 +484,7 @@ func (le *lessor) Promote(extend time.Duration) {
474
484
le .scheduleCheckpointIfNeeded (l )
475
485
}
476
486
477
- if len (le .leaseMap ) < leaseRevokeRate {
487
+ if len (le .leaseMap ) < le . leaseRevokeRate {
478
488
// no possibility of lease pile-up
479
489
return
480
490
}
@@ -488,7 +498,7 @@ func (le *lessor) Promote(extend time.Duration) {
488
498
expires := 0
489
499
// have fewer expires than the total revoke rate so piled up leases
490
500
// don't consume the entire revoke limit
491
- targetExpiresPerSecond := (3 * leaseRevokeRate ) / 4
501
+ targetExpiresPerSecond := (3 * le . leaseRevokeRate ) / 4
492
502
for _ , l := range leases {
493
503
remaining := l .Remaining ()
494
504
if remaining > nextWindow {
@@ -627,7 +637,7 @@ func (le *lessor) revokeExpiredLeases() {
627
637
var ls []* Lease
628
638
629
639
// rate limit
630
- revokeLimit := leaseRevokeRate / 2
640
+ revokeLimit := le . leaseRevokeRate / 2
631
641
632
642
le .mu .RLock ()
633
643
if le .isPrimary () {
0 commit comments