Commit e8c7bdc

craig[bot] and tbg committed
Merge #97779
97779: kvserver: avoid reusing proposal in tryReproposeWithNewLeaseIndex r=erikgrinaker a=tbg

The previous reproposal mechanism is quite complex. We can side-step a good amount of the complexity by not re-using proposals. Instead, when the entry associated to a local proposal gets rejected by the `LeaseAppliedIndex`, we mint a "new" proposal (identical to the old but with a new command ID) which is now responsible for notifying the caller.

This is introduced piecemeal, and the "old way" is preserved for continued metamorphic testing (and a guaranteed way to be able to fall back, should we see fallout over the next couple of weeks). Phasing out the gating var `useReproposalsV2` is tracked separately[^1].

[^1]: #105625

Epic: CRDB-25287

Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
2 parents 45be076 + 120684d commit e8c7bdc

9 files changed (+436 -14 lines)
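
(For orientation only, not part of the commit: the idea in the message above is that when a proposal's entry is rejected, the waiting caller is handed off to a freshly minted proposal with a new command ID, while the rejected proposal finishes without signaling anyone. Below is a minimal, hypothetical Go sketch of that handoff; `proposal`, `makeCmdID`, and `register` are invented stand-ins, not the real kvserver types.)

package main

import (
	"fmt"
	"math/rand"
)

// proposal is a stand-in for kvserver's ProposalData: an ID, the caller's
// completion channel, and the (in V2, immutable) command payload.
type proposal struct {
	id      string
	doneCh  chan error // signaled when the command applies or fails
	command string
}

func makeCmdID() string { return fmt.Sprintf("%08x", rand.Uint32()) }

// reproposeV2 mints a new proposal with a new command ID and moves the
// caller's doneCh over to it; the old proposal keeps a throwaway channel,
// so finishing it signals nobody.
func reproposeV2(old *proposal, register func(*proposal)) *proposal {
	newP := &proposal{
		id:      makeCmdID(),
		doneCh:  old.doneCh,
		command: old.command, // payload is copied, never mutated in place
	}
	old.doneCh = make(chan error, 1) // throwaway; old proposal fails quietly
	register(newP)                   // re-enters the proposal pipeline
	return newP
}

func main() {
	inFlight := map[string]*proposal{}
	orig := &proposal{id: makeCmdID(), doneCh: make(chan error, 1), command: "Put(k, v)"}

	// Simulate a LeaseAppliedIndex rejection of orig: hand the caller off.
	newP := reproposeV2(orig, func(p *proposal) { inFlight[p.id] = p })
	fmt.Println("caller now waits on", newP.id, "instead of", orig.id)
}

(The actual implementation is tryReproposeWithNewLeaseIndexV2 in the diff of pkg/kv/kvserver/replica_application_result.go below.)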

pkg/kv/kvserver/BUILD.bazel

+1
@@ -297,6 +297,7 @@ go_test(
         "range_log_test.go",
         "rebalance_objective_test.go",
         "replica_application_cmd_buf_test.go",
+        "replica_application_result_test.go",
         "replica_application_state_machine_test.go",
         "replica_batch_updates_test.go",
         "replica_circuit_breaker_test.go",

pkg/kv/kvserver/replica_application_cmd.go

+3
@@ -52,6 +52,9 @@ type replicatedCmd struct {
 	// finishTracingSpan. This span "follows from" the proposer's span (even
 	// when the proposer is remote; we marshall tracing info through the
 	// proposal).
+	//
+	// For local commands, we also have a `ProposalData` which may have a span
+	// as well, and if it does, this span will follow from it.
 	sp *tracing.Span

 	// splitMergeUnlock is acquired for splits and merges when they are staged

pkg/kv/kvserver/replica_application_decoder.go

+69
@@ -17,8 +17,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/errors"
 	"go.etcd.io/raft/v3/raftpb"
 )

@@ -73,10 +75,77 @@ func (d *replicaDecoder) decode(ctx context.Context, ents []raftpb.Entry) error
 	return nil
 }

+// retrieveLocalProposalsV2 is used with useReproposalsV2, replacing a call
+// to retrieveLocalProposals. The V2 implementation is simpler because a log
+// entry that comes up for a local proposal can always consume that proposal
+// from the map because V2 never mutates the MaxLeaseIndex for the same proposal.
+// In contrast, with V1, we can only remove the proposal from the map once we
+// have found a log entry that had a matching MaxLeaseIndex. This led to the
+// complexity of having multiple entries associated to the same proposal during
+// application.
+func (d *replicaDecoder) retrieveLocalProposalsV2() (anyLocal bool) {
+	d.r.mu.Lock()
+	defer d.r.mu.Unlock()
+
+	var it replicatedCmdBufSlice
+
+	for it.init(&d.cmdBuf); it.Valid(); it.Next() {
+		cmd := it.cur()
+		cmd.proposal = d.r.mu.proposals[cmd.ID]
+		var alloc *quotapool.IntAlloc
+		if cmd.proposal != nil {
+			// INVARIANT: a proposal is consumed (i.e. removed from the proposals map)
+			// the first time it comes up for application. (If the proposal fails due
+			// to an illegal LeaseAppliedIndex, a new proposal might be spawned to
+			// retry but that proposal will be unrelated as far as log application is
+			// concerned).
+			//
+			// INVARIANT: local proposals are not in the proposals map while being
+			// applied, and they never re-enter the proposals map either during or
+			// afterwards.
+			//
+			// (propBuf.{Insert,ReinsertLocked} ignores proposals that have
+			// v2SeenDuringApplication set to make this true).
+			if cmd.proposal.v2SeenDuringApplication {
+				err := errors.AssertionFailedf("ProposalData seen twice during application: %+v", cmd.proposal)
+				logcrash.ReportOrPanic(d.r.AnnotateCtx(cmd.ctx), &d.r.store.ClusterSettings().SV, "%v", err)
+				// If we didn't panic, treat the proposal as non-local. This makes sure
+				// we don't repropose it under a new lease index.
+				cmd.proposal = nil
+			} else {
+				cmd.proposal.v2SeenDuringApplication = true
+				anyLocal = true
+				delete(d.r.mu.proposals, cmd.ID)
+				if d.r.mu.proposalQuota != nil {
+					alloc = cmd.proposal.quotaAlloc
+					cmd.proposal.quotaAlloc = nil
+				}
+			}
+		}
+
+		// NB: this may append nil. It's intentional. The quota release queue
+		// needs to have one slice entry per entry applied (even if the entry
+		// is rejected).
+		//
+		// TODO(tbg): there used to be an optimization where we'd elide mutating
+		// this slice until we saw a local proposal under a populated
+		// b.r.mu.proposalQuota. We can bring it back.
+		if d.r.mu.proposalQuota != nil {
+			d.r.mu.quotaReleaseQueue = append(d.r.mu.quotaReleaseQueue, alloc)
+		}
+	}
+	return anyLocal
+}
+
 // retrieveLocalProposals binds each of the decoder's commands to their local
 // proposals if they were proposed locally. The method also sets the ctx fields
 // on all commands.
 func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal bool) {
+	if useReproposalsV2 {
+		// NB: we *must* use this new code for correctness, since we have an invariant
+		// described within.
+		return d.retrieveLocalProposalsV2()
+	}
 	d.r.mu.Lock()
 	defer d.r.mu.Unlock()
 	// Assign all the local proposals first then delete all of them from the map
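
(Aside, not part of the diff: the invariant the new decoder relies on, namely that a local proposal is consumed from the map the first time it comes up for application and a defensive flag catches any second sighting, can be restated as a tiny hypothetical sketch; `proposalData`, `seen`, and `consumeOnce` are simplified stand-ins for the real types.)

package main

import "fmt"

type proposalData struct {
	id   string
	seen bool // mirrors v2SeenDuringApplication
}

// consumeOnce models the handling of a single entry: look the proposal up,
// delete it from the map immediately, and flag it so a second sighting can
// be detected as an invariant violation.
func consumeOnce(proposals map[string]*proposalData, id string) (*proposalData, error) {
	p, ok := proposals[id]
	if !ok {
		return nil, nil // not a local proposal; nothing to bind
	}
	if p.seen {
		return nil, fmt.Errorf("proposal %s seen twice during application", p.id)
	}
	p.seen = true
	delete(proposals, id)
	return p, nil
}

func main() {
	m := map[string]*proposalData{"a1": {id: "a1"}}
	p, _ := consumeOnce(m, "a1")
	fmt.Println("bound:", p.id, "remaining in map:", len(m)) // bound: a1 remaining in map: 0
}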

pkg/kv/kvserver/replica_application_result.go

+182 -3

@@ -16,16 +16,27 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/errors"
 	"go.etcd.io/raft/v3"
 )

+// useReproposalsV2 activates prototype code that instead of reproposing using a
+// modified lease index makes a new proposal (different CmdID), transfers the
+// waiting caller (if any) to it, and proposes that. With this strategy, the
+// *RaftCommand associated to a proposal becomes immutable, which simplifies the
+// mental model and allows various simplifications in the proposal pipeline. For
+// now, the old and new behavior coexist, and we want to keep exercising both.
+// Once we have confidence, we'll hard-code true and remove all old code paths.
+var useReproposalsV2 = util.ConstantWithMetamorphicTestBool("reproposals-v2", true)
+
 // replica_application_*.go files provide concrete implementations of
 // the interfaces defined in the storage/apply package:
 //
@@ -105,6 +116,16 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
 	case kvserverbase.ProposalRejectionPermanent:
 		cmd.response.Err = pErr
 	case kvserverbase.ProposalRejectionIllegalLeaseIndex:
+		if useReproposalsV2 {
+			// If we're using V2 reproposals, this proposal is actually going to
+			// be fully rejected, but the client won't be listening to it at that
+			// point any more. But we should set the error. (This ends up being
+			// inconsequential but it's the right thing to do).
+			//
+			// TODO(tbg): once useReproposalsV2 is baked in, set the error unconditionally
+			// above the `switch`.
+			cmd.response.Err = pErr
+		}
 		// Reset the error as it's now going to be determined by the outcome of
 		// reproposing (or not); note that tryReproposeWithNewLeaseIndex will
 		// return `nil` if the entry is not eligible for reproposals.
@@ -171,7 +192,18 @@
 		}
 	}
 	if pErr == nil { // since we might have injected an error
-		pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
+		if useReproposalsV2 {
+			pErr = kvpb.NewError(r.tryReproposeWithNewLeaseIndexV2(ctx, cmd))
+			if pErr == nil {
+				// Avoid falling through below. We managed to repropose, but this
+				// proposal is still erroring out. We don't want to assign to
+				// localResult. If there is an error though, we do fall through into
+				// the existing tangle of correct but unreadable handling below.
+				return
+			}
+		} else {
+			pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
+		}
 	}

 	if pErr != nil {
@@ -210,10 +242,16 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
 		// https://github.com/cockroachdb/cockroach/issues/97633
 		log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr)
 		cmd.response.Err = pErr
-	} else {
+		// Fall through.
+	} else if !useReproposalsV2 {
 		// Unbind the entry's local proposal because we just succeeded
 		// in reproposing it and we don't want to acknowledge the client
 		// yet.
+		//
+		// NB: in v2, reproposing already moved the waiting caller over to a new
+		// proposal, and by design we don't change the "Localness" of the old
+		// proposal mid-application but instead let it fail as a local proposal
+		// (which signals into a throwaway channel).
 		cmd.proposal = nil
 		return
 	}
@@ -225,6 +263,14 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
 	} else {
 		log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal)
 	}
+
+	// The current proposal has no error (and wasn't reproposed successfully or we
+	// would've early returned already) OR it has an error AND we failed to
+	// repropose it.
+	//
+	// TODO(tbg): it doesn't make sense to assign to `cmd.response` unconditionally.
+	// We're returning an error; the response should be nil. The error tracking in
+	// this method should be cleaned up.
 	cmd.response.EncounteredIntents = cmd.proposal.Local.DetachEncounteredIntents()
 	cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
 	if pErr == nil {
@@ -234,6 +280,134 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
 	}
 }

+func (r *Replica) tryReproposeWithNewLeaseIndexV2(
+	ctx context.Context, origCmd *replicatedCmd,
+) error {
+	// NB: `origCmd` remains "Local". It's just not going to signal anyone
+	// or release any latches.
+
+	origP := origCmd.proposal
+
+	// We want to move a few items from origCmd to the new command, but only if we
+	// managed to propose the new command. For example, if we move the latches
+	// over too early but then fail to actually get the new proposal started, the
+	// old proposal will not release the latches. This would result in a lost
+	// latch.
+	var success bool
+
+	// Go through the original proposal field by field and decide what transfers
+	// to the new proposal (and how that affects the old proposal). The overall
+	// goal is that the old proposal remains a local proposal (switching it to
+	// non-local now invites logic bugs) but not bound to the caller.
+
+	// NB: quotaAlloc is always nil here, because we already
+	// released the quota unconditionally in retrieveLocalProposalsV2.
+	// So the below is a no-op.
+	//
+	// TODO(tbg): if we shifted the release of proposal quota to *after*
+	// successful application, we could move the quota over instead of
+	// prematurely releasing it here.
+	newQuotaAlloc := origP.quotaAlloc
+	defer func() {
+		if success {
+			origP.quotaAlloc = nil
+		}
+	}()
+
+	newCommand := kvserverpb.RaftCommand{
+		ProposerLeaseSequence:   origP.command.ProposerLeaseSequence,
+		DeprecatedProposerLease: origP.command.DeprecatedProposerLease,
+		ReplicatedEvalResult:    origP.command.ReplicatedEvalResult,
+		WriteBatch:              origP.command.WriteBatch,
+		LogicalOpLog:            origP.command.LogicalOpLog,
+		TraceData:               origP.command.TraceData,
+
+		MaxLeaseIndex:       0,   // assigned on flush
+		ClosedTimestamp:     nil, // assigned on flush
+		AdmissionPriority:   0,   // assigned on flush
+		AdmissionCreateTime: 0,   // assigned on flush
+		AdmissionOriginNode: 0,   // assigned on flush
+	}
+
+	// Now we construct the remainder of the ProposalData. First, the pieces
+	// that actively "move over", i.e. those that have to do with the latches
+	// held and the caller waiting to be signaled.
+
+	// `ec` (latches, etc) transfers to the new proposal.
+	newEC := origP.ec
+	defer func() {
+		if success {
+			origP.ec = endCmds{}
+		}
+	}()
+
+	// Ditto doneCh (signal to proposer).
+	newDoneCh := origP.doneCh
+	defer func() {
+		if success {
+			origP.doneCh = nil
+		}
+	}()
+
+	r.mu.RLock()
+	ticks := r.mu.ticks
+	r.mu.RUnlock()
+
+	// TODO(tbg): work on the lifecycle of ProposalData. This struct (and the
+	// surrounding replicatedCmd) are populated in an overly ad-hoc manner.
+	// TODO(tbg): the fields are spelled out here to make explicit what is being copied
+	// here. Add a unit test that fails on addition of a new field and points at the
+	// need to double check what the intended behavior of the new field in this method
+	// is.
+	newProposal := &ProposalData{
+		// The proposal's context and span carry over. Recall that they are *not*
+		// used for command application; `cmd.{ctx,sp}` are; and since this last
+		// span "follows from" the proposal's span, if the proposal sticks around
+		// for (some reincarnation of) the command to eventually apply, its trace
+		// will reflect the reproposal as well.
+		ctx:             origP.ctx,
+		sp:              origP.sp, // NB: special handling below
+		idKey:           raftlog.MakeCmdIDKey(),
+		proposedAtTicks: 0, // set in registerProposalLocked
+		createdAtTicks:  ticks,
+		command:         &newCommand,
+		quotaAlloc:      newQuotaAlloc,
+		ec:              newEC,
+		applied:         false,
+		doneCh:          newDoneCh,
+		// Local is copied over. It won't be used on the old proposal (since that
+		// proposal got rejected), but since it's still "local" we don't want to put
+		// it into an undefined state by removing its response. The same goes for
+		// Request.
+		Local:                   origP.Local,
+		Request:                 origP.Request,
+		leaseStatus:             origP.leaseStatus,
+		tok:                     TrackedRequestToken{}, // filled in in `propose`
+		encodedCommand:          nil,
+		raftAdmissionMeta:       nil,
+		v2SeenDuringApplication: false,
+	}
+	// If the original proposal had an explicit span, it's an async consensus
+	// proposal and the span would be finished momentarily (when we return to
+	// the caller) if we didn't unlink it here, but we want it to continue
+	// tracking newProposal. We leave it in `origP.ctx` though, since that
+	// context will become unused once the application of this (soft-failed)
+	// proposal concludes, i.e. soon after this method returns, in case there
+	// is anything left to log into it.
+	defer func() {
+		if success {
+			origP.sp = nil
+		}
+	}()
+
+	if err := r.tryReproposeWithNewLeaseIndexShared(ctx, newProposal).GoError(); err != nil {
+		return err
+	}
+
+	success = true
+	return nil
+}
+
 // tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
 // commands that have gotten an illegal lease index error, and that we know
 // could not have applied while their lease index was valid (that is, we
@@ -267,7 +441,12 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(
 		// succeeding in the Raft log for a given command.
 		return nil
 	}
+	return r.tryReproposeWithNewLeaseIndexShared(ctx, cmd.proposal)
+}

+func (r *Replica) tryReproposeWithNewLeaseIndexShared(
+	ctx context.Context, p *ProposalData,
+) *kvpb.Error {
 	// We need to track the request again in order to protect its timestamp until
 	// it gets reproposed.
 	// TODO(andrei): Only track if the request consults the ts cache. Some
@@ -299,7 +478,7 @@
 	if pErr != nil {
 		return pErr
 	}
-	log.VEventf(ctx, 2, "reproposed command %x", cmd.ID)
+	log.VEventf(ctx, 2, "reproposed command %x", p.idKey)
 	return nil
 }
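(Aside, not part of the diff: tryReproposeWithNewLeaseIndexV2 above copies the latches, done channel, and span into the new proposal up front, but only strips them from the old proposal via deferred closures gated on a success flag, so a failed reproposal leaves the old proposal able to release everything. Here is a minimal, hypothetical sketch of that success-gated handoff; `proposal`, `latches`, and `handoff` are invented names, not CockroachDB code.)

package main

import "fmt"

// latches stands in for the resources (latches, done channel, span) that must
// end up owned by exactly one proposal.
type latches struct{ held bool }

type proposal struct {
	id      string
	latches *latches
}

// handoff copies the resources into the new proposal up front, but only strips
// them from the old proposal (via a deferred closure) once the new proposal was
// successfully submitted. If submission fails, the old proposal keeps ownership
// and can still release the resources itself.
func handoff(old *proposal, submit func(*proposal) error) (*proposal, error) {
	var success bool

	newP := &proposal{id: old.id + "-v2", latches: old.latches}
	defer func() {
		if success {
			old.latches = nil // ownership moved; old proposal must not release them
		}
	}()

	if err := submit(newP); err != nil {
		return nil, err // old proposal still owns the latches
	}
	success = true
	return newP, nil
}

func main() {
	old := &proposal{id: "abc", latches: &latches{held: true}}
	if _, err := handoff(old, func(*proposal) error { return nil }); err == nil {
		fmt.Println("old latches after successful handoff:", old.latches) // <nil>
	}
}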