-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate_operator.go
343 lines (300 loc) · 12.4 KB
/
create_operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
// Copyright 2019 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package operator
import (
	"fmt"
	"math/rand"
	"sort"

	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/pingcap/log"
	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/errs"
	sche "github.com/tikv/pd/pkg/schedule/core"
	"github.com/tikv/pd/pkg/schedule/placement"
	"github.com/tikv/pd/pkg/utils/logutil"
	"go.uber.org/zap"
)
// CreateAddPeerOperator builds an operator of the given kind whose only
// requested change is adding peer to region.
func CreateAddPeerOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, peer *metapb.Peer, kind OpKind) (*Operator, error) {
	builder := NewBuilder(desc, ci, region)
	builder = builder.AddPeer(peer)
	return builder.Build(kind)
}
// CreateDemoteVoterOperator builds an operator that demotes the voter living
// on peer's store. No extra OpKind flags are requested (Build is called with 0).
func CreateDemoteVoterOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
	var noExtraKind OpKind
	storeID := peer.GetStoreId()
	builder := NewBuilder(desc, ci, region)
	builder = builder.DemoteVoter(storeID)
	return builder.Build(noExtraKind)
}
// CreatePromoteLearnerOperator builds an operator that promotes the learner on
// peer's store. No extra OpKind flags are requested (Build is called with 0).
func CreatePromoteLearnerOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
	var noExtraKind OpKind
	storeID := peer.GetStoreId()
	builder := NewBuilder(desc, ci, region)
	builder = builder.PromoteLearner(storeID)
	return builder.Build(noExtraKind)
}
// CreatePromoteLearnerOperatorAndRemovePeer builds a single operator that
// promotes the learner on toPromote's store and removes the peer on
// toRemove's store. No extra OpKind flags are requested (Build is called with 0).
func CreatePromoteLearnerOperatorAndRemovePeer(desc string, ci sche.SharedCluster, region *core.RegionInfo, toPromote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
	var noExtraKind OpKind
	promoteStore, removeStore := toPromote.GetStoreId(), toRemove.GetStoreId()
	builder := NewBuilder(desc, ci, region)
	builder = builder.PromoteLearner(promoteStore)
	builder = builder.RemovePeer(removeStore)
	return builder.Build(noExtraKind)
}
// CreateDemoteLearnerOperatorAndRemovePeer builds a single operator that
// demotes the voter on toDemote's store (via the builder's DemoteVoter) and
// removes the peer on toRemove's store.
//
// It refuses to build anything when the shared config reports that joint
// consensus is not in use.
func CreateDemoteLearnerOperatorAndRemovePeer(desc string, ci sche.SharedCluster, region *core.RegionInfo, toDemote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
	if useJoint := ci.GetSharedConfig().IsUseJointConsensus(); !useJoint {
		return nil, errors.Errorf("cannot build demote learner operator due to disabling using joint state")
	}
	var noExtraKind OpKind
	builder := NewBuilder(desc, ci, region)
	builder = builder.DemoteVoter(toDemote.GetStoreId())
	builder = builder.RemovePeer(toRemove.GetStoreId())
	return builder.Build(noExtraKind)
}
// CreateRemovePeerOperator builds an operator of the given kind that removes
// the region's peer located on storeID.
func CreateRemovePeerOperator(desc string, ci sche.SharedCluster, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
	builder := NewBuilder(desc, ci, region)
	builder = builder.RemovePeer(storeID)
	return builder.Build(kind)
}
// CreateTransferLeaderOperator builds an operator of the given kind that makes
// the peer on targetStoreID the region leader. The builder is created with
// SkipOriginJointStateCheck, so the origin region's joint state is not checked.
func CreateTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, kind OpKind) (*Operator, error) {
	builder := NewBuilder(desc, ci, region, SkipOriginJointStateCheck)
	builder = builder.SetLeader(targetStoreID)
	return builder.Build(kind)
}
// CreateForceTransferLeaderOperator builds an operator of the given kind that
// forcibly makes the peer on targetStoreID the region leader.
//
// Compared with CreateTransferLeaderOperator, it also skips the placement-rules
// check and enables the builder's force-target-leader mode.
func CreateForceTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, kind OpKind) (*Operator, error) {
	builder := NewBuilder(desc, ci, region, SkipOriginJointStateCheck, SkipPlacementRulesCheck)
	builder = builder.SetLeader(targetStoreID)
	builder = builder.EnableForceTargetLeader()
	return builder.Build(kind)
}
// CreateMoveRegionOperator creates an operator that moves a region to the
// stores listed in roles, giving each target peer the role mapped to its store.
//
// Witness flags are assigned deterministically:
//   - a store that already hosts a peer of the region keeps that peer's witness flag;
//   - witness flags of peers whose store is absent from roles are handed to newly
//     added stores in ascending store-ID order, so the region keeps its number of
//     witnesses whenever there are enough new stores to receive them.
//
// Copying flags positionally from region.GetPeers() while ranging over roles
// would pair old peers with arbitrary stores, because Go map iteration order
// is unspecified. That could flip the witness flag of a peer that does not
// move at all.
func CreateMoveRegionOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, kind OpKind, roles map[uint64]placement.PeerRoleType) (*Operator, error) {
	// Witnesses living on stores that are being dropped from the region; their
	// witness role is transferred to newly added stores below.
	spareWitnesses := 0
	for _, p := range region.GetPeers() {
		if _, kept := roles[p.GetStoreId()]; !kept && p.GetIsWitness() {
			spareWitnesses++
		}
	}

	// Visit target stores in a fixed order so the same input always yields
	// the same operator.
	storeIDs := make([]uint64, 0, len(roles))
	for storeID := range roles {
		storeIDs = append(storeIDs, storeID)
	}
	sort.Slice(storeIDs, func(i, j int) bool { return storeIDs[i] < storeIDs[j] })

	peers := make(map[uint64]*metapb.Peer, len(roles))
	for _, storeID := range storeIDs {
		isWitness := false
		if old := region.GetStorePeer(storeID); old != nil {
			isWitness = old.GetIsWitness()
		} else if spareWitnesses > 0 {
			isWitness = true
			spareWitnesses--
		}
		peers[storeID] = &metapb.Peer{
			StoreId:   storeID,
			Role:      roles[storeID].MetaPeerRole(),
			IsWitness: isWitness,
		}
	}

	builder := NewBuilder(desc, ci, region).SetPeers(peers).SetExpectedRoles(roles)
	return builder.Build(kind)
}
// CreateMovePeerOperator builds an operator of the given kind that removes the
// peer on oldStore and adds peer in its place.
func CreateMovePeerOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) {
	builder := NewBuilder(desc, ci, region)
	builder = builder.RemovePeer(oldStore)
	builder = builder.AddPeer(peer)
	return builder.Build(kind)
}
// CreateMoveWitnessOperator builds an OpWitness operator that moves the
// witness role from the peer on sourceStoreID to the peer on targetStoreID.
func CreateMoveWitnessOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, sourceStoreID uint64, targetStoreID uint64) (*Operator, error) {
	builder := NewBuilder(desc, ci, region)
	builder = builder.BecomeNonWitness(sourceStoreID)
	builder = builder.BecomeWitness(targetStoreID)
	return builder.Build(OpWitness)
}
// CreateReplaceLeaderPeerOperator builds an operator of the given kind that
// replaces the peer on oldStore with peer and makes the peer on leader's store
// the region leader. The relative ordering of the generated steps is decided by
// the builder (presumably the leader is moved off oldStore first — confirm in
// Builder).
func CreateReplaceLeaderPeerOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer, leader *metapb.Peer) (*Operator, error) {
	newLeaderStore := leader.GetStoreId()
	builder := NewBuilder(desc, ci, region)
	builder = builder.RemovePeer(oldStore)
	builder = builder.AddPeer(peer)
	builder = builder.SetLeader(newLeaderStore)
	return builder.Build(kind)
}
// CreateMoveLeaderOperator builds an operator of the given kind that replaces
// the peer on oldStore with peer and makes the newly added peer the leader.
func CreateMoveLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, kind OpKind, oldStore uint64, peer *metapb.Peer) (*Operator, error) {
	newLeaderStore := peer.GetStoreId()
	builder := NewBuilder(desc, ci, region)
	builder = builder.RemovePeer(oldStore)
	builder = builder.AddPeer(peer)
	builder = builder.SetLeader(newLeaderStore)
	return builder.Build(kind)
}
// CreateSplitRegionOperator creates a one-step operator that splits region
// using policy and the optional explicit split keys.
//
// Regions in joint state are rejected. The operator's brief lists the region
// ID, the policy and, when keys are given, the redacted hex-encoded keys. The
// redacted hex start/end keys are also attached as additional info.
func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind, policy pdpb.CheckPolicy, keys [][]byte) (*Operator, error) {
	if core.IsInJointState(region.GetPeers()...) {
		return nil, errors.Errorf("cannot split region which is in joint state")
	}

	regionID := region.GetID()
	startKey, endKey := region.GetStartKey(), region.GetEndKey()

	brief := fmt.Sprintf("split: region %v use policy %s", regionID, policy)
	if len(keys) != 0 {
		// Redact before logging so user keys are not exposed verbatim.
		redacted := make([]string, 0, len(keys))
		for _, key := range keys {
			redacted = append(redacted, core.HexRegionKeyStr(logutil.RedactBytes(key)))
		}
		brief += fmt.Sprintf(" and keys %v", redacted)
	}

	split := SplitRegion{
		StartKey:  startKey,
		EndKey:    endKey,
		Policy:    policy,
		SplitKeys: keys,
	}
	op := NewOperator(desc, brief, regionID, region.GetRegionEpoch(), kind|OpSplit, region.GetApproximateSize(), split)
	op.SetAdditionalInfo("region-start-key", core.HexRegionKeyStr(logutil.RedactBytes(startKey)))
	op.SetAdditionalInfo("region-end-key", core.HexRegionKeyStr(logutil.RedactBytes(endKey)))
	return op, nil
}
// CreateMergeRegionOperator creates the pair of operators that merge source
// into target.
//
// If either region is in joint state, an error is returned. When the two
// regions' peers do not match (see isRegionMatch), the source operator first
// runs builder-generated steps that give source the same peer layout as target,
// and the kind reported by that builder is used for both operators.
//
// The first operator runs on source and ends with the active MergeRegion step.
// The second runs on target with a passive MergeRegion step and is synced to
// the first.
func CreateMergeRegionOperator(desc string, ci sche.SharedCluster, source *core.RegionInfo, target *core.RegionInfo, kind OpKind) ([]*Operator, error) {
	for _, r := range []*core.RegionInfo{source, target} {
		if core.IsInJointState(r.GetPeers()...) {
			return nil, errors.Errorf("cannot merge regions which are in joint state")
		}
	}

	var sourceSteps []OpStep
	if !isRegionMatch(source, target) {
		// Mirror target's peer layout (store, role, witness) onto source.
		layout := make(map[uint64]*metapb.Peer)
		for _, tp := range target.GetPeers() {
			storeID := tp.GetStoreId()
			layout[storeID] = &metapb.Peer{
				StoreId:   storeID,
				Role:      tp.GetRole(),
				IsWitness: tp.GetIsWitness(),
			}
		}
		alignOp, err := NewBuilder("", ci, source).SetPeers(layout).Build(kind)
		if err != nil {
			return nil, err
		}
		sourceSteps = append(sourceSteps, alignOp.steps...)
		kind = alignOp.Kind()
	}

	fromMeta, toMeta := source.GetMeta(), target.GetMeta()
	sourceSteps = append(sourceSteps, MergeRegion{FromRegion: fromMeta, ToRegion: toMeta, IsPassive: false})
	passiveStep := MergeRegion{FromRegion: fromMeta, ToRegion: toMeta, IsPassive: true}

	brief := fmt.Sprintf("merge: region %v to %v", source.GetID(), target.GetID())
	mergeKind := kind | OpMerge
	active := NewOperator(desc, brief, source.GetID(), source.GetRegionEpoch(), mergeKind, source.GetApproximateSize(), sourceSteps...)
	passive := NewOperator(desc, brief, target.GetID(), target.GetRegionEpoch(), mergeKind, target.GetApproximateSize(), passiveStep)
	passive.Sync(active)
	return []*Operator{active, passive}, nil
}
// isRegionMatch reports whether a and b have the same number of peers and, for
// every peer of a, b has a peer on the same store with the same learner and
// witness status.
func isRegionMatch(a, b *core.RegionInfo) bool {
	peersOfA := a.GetPeers()
	if len(peersOfA) != len(b.GetPeers()) {
		return false
	}
	for _, peer := range peersOfA {
		other := b.GetStorePeer(peer.GetStoreId())
		switch {
		case other == nil:
			return false
		case core.IsLearner(peer) != core.IsLearner(other):
			return false
		case core.IsWitness(peer) != core.IsWitness(other):
			return false
		}
	}
	return true
}
// CreateScatterRegionOperator creates an OpAdmin operator that moves origin's
// peers to targetPeers (keyed by store ID).
//
// The leader is targetLeader when it is non-zero. Otherwise it is a random
// non-learner store from targetPeers, or 0 if there is none. Light add-peer
// mode is always enabled, and light remove-peer mode is enabled when
// skipLimitCheck is true.
func CreateScatterRegionOperator(desc string, ci sche.SharedCluster, origin *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64, skipLimitCheck bool) (*Operator, error) {
	candidates := make([]uint64, 0, len(targetPeers))
	for storeID, p := range targetPeers {
		if core.IsLearner(p) {
			continue
		}
		candidates = append(candidates, storeID)
	}

	// A random candidate is drawn whenever one exists (even if targetLeader
	// overrides it), keeping the random-source usage identical across callers.
	leader := targetLeader
	if n := len(candidates); n > 0 {
		picked := candidates[rand.Intn(n)]
		if leader == 0 {
			leader = picked
		}
	}

	builder := NewBuilder(desc, ci, origin)
	if skipLimitCheck {
		builder.SetRemoveLightPeer()
	}
	builder = builder.SetPeers(targetPeers).SetLeader(leader).SetAddLightPeer()
	// Forcing the target leader lets the operator ignore the leader schedule limit.
	builder = builder.EnableForceTargetLeader()
	return builder.Build(OpAdmin)
}
// OpDescLeaveJointState is the desc string expected for operators that make a
// region leave joint state (presumably what callers pass as desc to
// CreateLeaveJointStateOperator — verify at call sites).
const OpDescLeaveJointState = "leave-joint-state"
// CreateLeaveJointStateOperator creates an operator that lets origin leave the
// joint state.
//
// It returns an error if the builder reports one or if origin is not in joint
// state. Otherwise it does the following:
//   - IncomingVoter peers are scheduled for promotion and DemotingVoter peers
//     for demotion.
//   - The current leader is kept as target leader when b.allowLeader accepts
//     it. If no target leader can be found, a second search is made with
//     forceTargetLeader enabled, because TiKV rejects demoting the leader.
//   - The returned operator carries OpLeader when the leader changes.
func CreateLeaveJointStateOperator(desc string, ci sche.SharedCluster, origin *core.RegionInfo) (*Operator, error) {
	b := NewBuilder(desc, ci, origin, SkipOriginJointStateCheck, SkipPlacementRulesCheck)
	if b.err == nil && !core.IsInJointState(origin.GetPeers()...) {
		b.err = errors.Errorf("cannot build leave joint state operator for region which is not in joint state")
	}
	if b.err != nil {
		return nil, b.err
	}

	// prepareBuild: classify peers by their joint-state role.
	b.toDemote = newPeersMap()
	b.toPromote = newPeersMap()
	for _, o := range b.originPeers {
		switch o.GetRole() {
		case metapb.PeerRole_IncomingVoter:
			b.toPromote.set(o)
		case metapb.PeerRole_DemotingVoter:
			b.toDemote.set(o)
		}
	}

	// Keep the current leader if it may stay leader; 0 means "not chosen yet".
	leader := b.originPeers[b.originLeaderStoreID]
	if leader == nil || !b.allowLeader(leader, true) {
		b.targetLeaderStoreID = 0
	} else {
		b.targetLeaderStoreID = b.originLeaderStoreID
	}

	b.currentPeers, b.currentLeaderStoreID = b.originPeers.copy(), b.originLeaderStoreID
	b.peerAddStep = make(map[uint64]int)
	brief := b.brief()

	// buildStepsWithJointConsensus
	var kind OpKind

	b.setTargetLeaderIfNotExist()
	if b.targetLeaderStoreID == 0 {
		// Because the demote leader will be rejected by TiKV,
		// when the target leader cannot be found, we need to force a target to be found.
		b.forceTargetLeader = true
		b.setTargetLeaderIfNotExist()
	}

	if b.targetLeaderStoreID == 0 {
		log.Error(
			"unable to find target leader",
			zap.Reflect("region", origin),
			errs.ZapError(errs.ErrCreateOperator.FastGenByArgs("no target leader")))
		b.originLeaderStoreID = 0
	} else if b.originLeaderStoreID != b.targetLeaderStoreID {
		kind |= OpLeader
	}

	// NOTE(review): the meaning of the two boolean flags is defined by
	// execChangePeerV2 elsewhere in the package — confirm there.
	b.execChangePeerV2(false, true)
	return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, origin.GetApproximateSize(), b.steps...), nil
}
// CreateWitnessPeerOperator builds an OpWitness operator that turns the peer
// on peer's store into a witness.
func CreateWitnessPeerOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
	storeID := peer.GetStoreId()
	builder := NewBuilder(desc, ci, region)
	builder = builder.BecomeWitness(storeID)
	return builder.Build(OpWitness)
}
// CreateNonWitnessPeerOperator builds an OpWitness operator that turns the
// peer on peer's store into a non-witness.
func CreateNonWitnessPeerOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
	storeID := peer.GetStoreId()
	builder := NewBuilder(desc, ci, region)
	builder = builder.BecomeNonWitness(storeID)
	return builder.Build(OpWitness)
}