
Commit ce2bd4a

Fix(migrate): support migrating chunkserver and metaserver
Signed-off-by: caoxianfei1 <[email protected]>
1 parent 8d9c430 commit ce2bd4a

File tree

11 files changed (+293 -64 lines)

cli/command/deploy.go (+1 -3)

@@ -231,7 +231,6 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
         Name: options.poolset,
         Type: options.poolsetDiskType,
     }
-    diskType := options.poolsetDiskType
 
     pb := playbook.NewPlaybook(curveadm)
     for _, step := range steps {
@@ -255,8 +254,7 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
             options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
         } else if step == CREATE_LOGICAL_POOL {
             options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
-            options[comm.POOLSET] = poolset
-            options[comm.POOLSET_DISK_TYPE] = diskType
+            options[comm.KEY_POOLSET] = poolset
             options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
         }
 
cli/command/migrate.go (+68 -37)

@@ -30,7 +30,9 @@ import (
     "github.com/opencurve/curveadm/internal/configure/topology"
     "github.com/opencurve/curveadm/internal/errno"
     "github.com/opencurve/curveadm/internal/playbook"
-    tui "github.com/opencurve/curveadm/internal/tui/common"
+    "github.com/opencurve/curveadm/internal/task/task/common"
+    tuicomm "github.com/opencurve/curveadm/internal/tui/common"
+
     cliutil "github.com/opencurve/curveadm/internal/utils"
     "github.com/spf13/cobra"
 )
@@ -71,26 +73,23 @@
     // chunkserevr (curvebs)
     MIGRATE_CHUNKSERVER_STEPS = []int{
         playbook.BACKUP_ETCD_DATA,
-        playbook.STOP_SERVICE,
-        playbook.CLEAN_SERVICE, // only container
+        playbook.CREATE_PHYSICAL_POOL, // add machine that migrate to
         playbook.PULL_IMAGE,
         playbook.CREATE_CONTAINER,
         playbook.SYNC_CONFIG,
-        playbook.CREATE_PHYSICAL_POOL,
         playbook.START_CHUNKSERVER,
-        playbook.CREATE_LOGICAL_POOL,
+        playbook.MARK_SERVER_PENGDDING, // start migrate to new server
     }
 
     // metaserver (curvefs)
     MIGRATE_METASERVER_STEPS = []int{
         playbook.BACKUP_ETCD_DATA,
-        playbook.STOP_SERVICE, // only container
-        playbook.CLEAN_SERVICE,
+        playbook.CREATE_LOGICAL_POOL,
         playbook.PULL_IMAGE,
         playbook.CREATE_CONTAINER,
         playbook.SYNC_CONFIG,
         playbook.START_METASERVER,
-        playbook.CREATE_LOGICAL_POOL,
+        playbook.STOP_SERVICE, // start migrate to new server
     }
 
     MIGRATE_ROLE_STEPS = map[string][]int{
@@ -100,12 +99,21 @@
         topology.ROLE_SNAPSHOTCLONE: MIGRATE_SNAPSHOTCLONE_STEPS,
         topology.ROLE_METASERVER:    MIGRATE_METASERVER_STEPS,
     }
+
+    MIGRATE_POST_CLEAN_STEPS = []int{
+        playbook.STOP_SERVICE,         // bs
+        playbook.CLEAN_SERVICE,        // bs, fs
+        playbook.CREATE_PHYSICAL_POOL, // only for chunkserver, remove server that migrate from
+        playbook.CREATE_LOGICAL_POOL,  // only for metaserver, remove server that migrate from
+        playbook.UPDATE_TOPOLOGY,      // bs, fs
+    }
 )
 
 type migrateOptions struct {
     filename        string
     poolset         string
     poolsetDiskType string
+    clean           bool
 }
 
 func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
@@ -125,7 +133,7 @@ func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
     flags := cmd.Flags()
     flags.StringVar(&options.poolset, "poolset", "default", "Specify the poolset")
     flags.StringVar(&options.poolsetDiskType, "poolset-disktype", "ssd", "Specify the disk type of physical pool")
-
+    flags.BoolVar(&options.clean, "clean", false, "Clean migrated environment for chunkserver or metaserver")
     return cmd
 }
 
@@ -191,8 +199,21 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
     migrates := getMigrates(curveadm, data)
     role := migrates[0].From.GetRole()
     steps := MIGRATE_ROLE_STEPS[role]
-    poolset := options.poolset
-    poolsetDiskType := options.poolsetDiskType
+
+    // post clean
+    if options.clean {
+        steps = MIGRATE_POST_CLEAN_STEPS
+        if migrates[0].From.GetKind() == common.KIND_CURVEBS {
+            steps = append(steps[:3], steps[4:]...)
+        } else {
+            steps = append(steps[1:2], steps[3:]...)
+        }
+    }
+
+    poolset := configure.Poolset{
+        Name: options.poolset,
+        Type: options.poolsetDiskType,
+    }
 
     pb := playbook.NewPlaybook(curveadm)
     for _, step := range steps {
@@ -204,38 +225,40 @@
             config = dcs2del
         case playbook.BACKUP_ETCD_DATA:
             config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)
-        case CREATE_PHYSICAL_POOL,
-            CREATE_LOGICAL_POOL:
+        case
+            playbook.CREATE_PHYSICAL_POOL,
+            playbook.CREATE_LOGICAL_POOL,
+            playbook.MARK_SERVER_PENGDDING:
             config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)[:1]
         }
 
         // options
-        options := map[string]interface{}{}
+        optionsKV := map[string]interface{}{}
         switch step {
         case playbook.CLEAN_SERVICE:
-            options[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER}
-            options[comm.KEY_CLEAN_BY_RECYCLE] = true
+            optionsKV[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER}
+            optionsKV[comm.KEY_CLEAN_BY_RECYCLE] = true
+            optionsKV[comm.KEY_REMOVE_MIGRATED_SERVER] = true
         case playbook.CREATE_PHYSICAL_POOL:
-            options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL
-            options[comm.KEY_MIGRATE_SERVERS] = migrates
-            options[comm.POOLSET] = poolset
-            options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
+            optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL
+            optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
+            optionsKV[comm.KEY_POOLSET] = poolset
        case playbook.CREATE_LOGICAL_POOL:
-            options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
-            options[comm.KEY_MIGRATE_SERVERS] = migrates
-            options[comm.KEY_NEW_TOPOLOGY_DATA] = data
-            options[comm.POOLSET] = poolset
-            options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
+            optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
+            optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
+            optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
+            optionsKV[comm.KEY_IF_UPDATE_TOPOLOG] = false
+            optionsKV[comm.KEY_POOLSET] = poolset
         case playbook.UPDATE_TOPOLOGY:
-            options[comm.KEY_NEW_TOPOLOGY_DATA] = data
+            optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
         }
 
         pb.AddStep(&playbook.PlaybookStep{
-            Type:    step,
-            Configs: config,
-            Options: options,
+            Type:        step,
+            Configs:     config,
+            Options:     optionsKV,
             ExecOptions: playbook.ExecOptions{
-                SilentSubBar: step == playbook.UPDATE_TOPOLOGY,
+                // SilentSubBar: step == playbook.UPDATE_TOPOLOGY,
             },
         })
     }
@@ -261,7 +284,10 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
     }
 
     // 2) read topology from file
-    data, err := readTopology(curveadm, options.filename)
+    data, err := readTopology(curveadm,
+        options.filename,
+        options.clean,
+    )
     if err != nil {
         return err
     }
@@ -272,13 +298,15 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
         return err
     }
 
-    // 4) display title
-    displayMigrateTitle(curveadm, data)
+    if !options.clean {
+        // 4) display title
+        displayMigrateTitle(curveadm, data)
 
-    // 5) confirm by user
-    if pass := tui.ConfirmYes(tui.DEFAULT_CONFIRM_PROMPT); !pass {
-        curveadm.WriteOutln(tui.PromptCancelOpetation("migrate service"))
-        return errno.ERR_CANCEL_OPERATION
+        // 5) confirm by user
+        if pass := tuicomm.ConfirmYes(tuicomm.DEFAULT_CONFIRM_PROMPT); !pass {
+            curveadm.WriteOutln(tuicomm.PromptCancelOpetation("migrate service"))
+            return errno.ERR_CANCEL_OPERATION
+        }
     }
 
     // 6) generate migrate playbook
@@ -294,6 +322,9 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
     }
 
     // 9) print success prompt
+    if options.clean {
+        return nil
+    }
     curveadm.WriteOutln("")
     curveadm.WriteOutln(color.GreenString("Services successfully migrateed ^_^."))
     // TODO(P1): warning iff there is changed configs
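For reference, the `--clean` path above swaps in MIGRATE_POST_CLEAN_STEPS and then trims it by cluster kind. Below is a standalone sketch of that filtering; the step names are local placeholders for the playbook.* constants, and the slices are copied first only to keep the sketch side-effect-free (the commit slices the shared list in place):

```go
package main

import "fmt"

func main() {
    // Placeholder names standing in for the playbook step constants.
    postClean := []string{
        "STOP_SERVICE",         // 0: curvebs only
        "CLEAN_SERVICE",        // 1: curvebs and curvefs
        "CREATE_PHYSICAL_POOL", // 2: chunkserver only (remove server migrated from)
        "CREATE_LOGICAL_POOL",  // 3: metaserver only (remove server migrated from)
        "UPDATE_TOPOLOGY",      // 4: curvebs and curvefs
    }

    // curvebs (chunkserver): steps[:3] + steps[4:] drops CREATE_LOGICAL_POOL.
    bs := append(append([]string{}, postClean[:3]...), postClean[4:]...)

    // curvefs (metaserver): steps[1:2] + steps[3:] drops STOP_SERVICE and CREATE_PHYSICAL_POOL.
    fs := append(append([]string{}, postClean[1:2]...), postClean[3:]...)

    fmt.Println("curvebs:", bs) // [STOP_SERVICE CLEAN_SERVICE CREATE_PHYSICAL_POOL UPDATE_TOPOLOGY]
    fmt.Println("curvefs:", fs) // [CLEAN_SERVICE CREATE_LOGICAL_POOL UPDATE_TOPOLOGY]
}
```

Assuming the existing `curveadm migrate <topology-file>` syntax, the post-clean pass would then look something like `curveadm migrate topology.yaml --clean`, run after the regular migrate has finished.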

cli/command/scale_out.go (+5 -3)

@@ -144,7 +144,7 @@ func NewScaleOutCommand(curveadm *cli.CurveAdm) *cobra.Command {
     return cmd
 }
 
-func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
+func readTopology(curveadm *cli.CurveAdm, filename string, clean bool) (string, error) {
     if !utils.PathExist(filename) {
         return "", errno.ERR_TOPOLOGY_FILE_NOT_FOUND.
             F("%s: no such file", utils.AbsPath(filename))
@@ -156,7 +156,9 @@ func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
     }
 
     oldData := curveadm.ClusterTopologyData()
-    curveadm.WriteOut("%s", utils.Diff(oldData, data))
+    if !clean {
+        curveadm.WriteOut("%s", utils.Diff(oldData, data))
+    }
     return data, nil
 }
 
@@ -384,7 +386,7 @@ func runScaleOut(curveadm *cli.CurveAdm, options scaleOutOptions) error {
     }
 
     // 2) read topology from file
-    data, err := readTopology(curveadm, options.filename)
+    data, err := readTopology(curveadm, options.filename, false)
     if err != nil {
         return err
     }

internal/common/common.go (+12 -6)

@@ -54,6 +54,10 @@ const (
     // format
     KEY_ALL_FORMAT_STATUS = "ALL_FORMAT_STATUS"
 
+    // migrate
+    KEY_MIGRATE_STATUS        = "MIGRATE_STATUS"
+    KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS"
+
     // check
     KEY_CHECK_WITH_WEAK          = "CHECK_WITH_WEAK"
     KEY_CHECK_KERNEL_MODULE_NAME = "CHECK_KERNEL_MODULE_NAME"
@@ -64,6 +68,7 @@ const (
     KEY_SCALE_OUT_CLUSTER = "SCALE_OUT_CLUSTER"
     KEY_MIGRATE_SERVERS   = "MIGRATE_SERVERS"
     KEY_NEW_TOPOLOGY_DATA = "NEW_TOPOLOGY_DATA"
+    KEY_IF_UPDATE_TOPOLOG = "IF_UPDATE_TOPOTLOY"
 
     // status
     KEY_ALL_SERVICE_STATUS = "ALL_SERVICE_STATUS"
@@ -72,12 +77,13 @@ const (
     SERVICE_STATUS_UNKNOWN = "Unknown"
 
     // clean
-    KEY_CLEAN_ITEMS      = "CLEAN_ITEMS"
-    KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
-    CLEAN_ITEM_LOG       = "log"
-    CLEAN_ITEM_DATA      = "data"
-    CLEAN_ITEM_CONTAINER = "container"
-    CLEANED_CONTAINER_ID = "-"
+    KEY_CLEAN_ITEMS            = "CLEAN_ITEMS"
+    KEY_CLEAN_BY_RECYCLE       = "CLEAN_BY_RECYCLE"
+    CLEAN_ITEM_LOG             = "log"
+    CLEAN_ITEM_DATA            = "data"
+    CLEAN_ITEM_CONTAINER       = "container"
+    CLEANED_CONTAINER_ID       = "-"
+    KEY_REMOVE_MIGRATED_SERVER = "REMOVE_MIGRATED_SERVER"
 
     // client
     KEY_CLIENT_HOST = "CLIENT_HOST"
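These keys travel through the playbook step's Options map, which is a plain map[string]interface{}. A generic sketch of writing and reading such a flag with a type assertion (this only shows the underlying Go pattern, not curveadm's actual accessor API):

```go
package main

import "fmt"

const KEY_REMOVE_MIGRATED_SERVER = "REMOVE_MIGRATED_SERVER"

// getBool returns the boolean stored under key, or def when the key is absent
// or holds a value of another type.
func getBool(options map[string]interface{}, key string, def bool) bool {
    if v, ok := options[key]; ok {
        if b, ok := v.(bool); ok {
            return b
        }
    }
    return def
}

func main() {
    options := map[string]interface{}{}
    options[KEY_REMOVE_MIGRATED_SERVER] = true // as set for the CLEAN_SERVICE step in migrate.go

    fmt.Println(getBool(options, KEY_REMOVE_MIGRATED_SERVER, false)) // true
}
```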

internal/configure/pool.go (+26 -12)

@@ -263,26 +263,40 @@ func ScaleOutClusterPool(old *CurveClusterTopo, dcs []*topology.DeployConfig, po
     old.NPools = old.NPools + 1
 }
 
-func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer) {
+func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer, removeMigratedServer bool) {
     m := map[string]*topology.DeployConfig{} // key: from.Name, value: to.DeployConfig
     for _, migrate := range migrates {
         m[formatName(migrate.From)] = migrate.To
     }
 
-    for i, server := range old.Servers {
-        dc, ok := m[server.Name]
-        if !ok {
-            continue
+    // add server that will migrate to
+    for fromName, toDc := range m {
+        server := Server{}
+        server.InternalIp = toDc.GetListenIp()
+        server.ExternalIp = toDc.GetListenExternalIp()
+        server.InternalPort = toDc.GetListenPort()
+        server.ExternalPort = toDc.GetListenExternalPort()
+        server.Name = formatName(toDc)
+
+        for _, oldServer := range old.Servers {
+            if oldServer.Name == fromName {
+                server.PhysicalPool = oldServer.PhysicalPool
+                server.Poolset = oldServer.Poolset
+                server.Pool = oldServer.Pool
+                server.Zone = oldServer.Zone
+            }
         }
+        old.Servers = append(old.Servers, server)
+    }
 
-        server.InternalIp = dc.GetListenIp()
-        server.ExternalIp = dc.GetListenExternalIp()
-        server.Name = formatName(dc)
-        if server.InternalPort != 0 && server.ExternalPort != 0 {
-            server.InternalPort = dc.GetListenPort()
-            server.ExternalPort = dc.GetListenExternalPort()
+    // remove server that has migrated
+    if removeMigratedServer {
+        for i := 0; i < len(old.Servers); i++ {
+            _, ok := m[old.Servers[i].Name]
+            if ok {
+                old.Servers = append(old.Servers[:i], old.Servers[i+1:]...)
+            }
         }
-        old.Servers[i] = server
     }
 }
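MigrateClusterServer now runs in two passes: it appends a topology entry for every migrate target (copying pool/poolset/zone placement from the matching source entry) and, when removeMigratedServer is true, drops the source entries. The toy sketch below reproduces that add-then-remove flow with simplified local types; the removal uses a filter pass rather than the index-splice loop in the diff, and none of the types here are the real CurveClusterTopo/DeployConfig:

```go
package main

import "fmt"

// Simplified stand-ins for the real topology types.
type Server struct {
    Name         string
    InternalIp   string
    Zone         string
    PhysicalPool string
}

type Topo struct{ Servers []Server }

// migrateServers maps source-server name -> replacement entry (already carrying
// the new host's addresses); placement fields are inherited from the source.
func migrateServers(topo *Topo, m map[string]Server, removeMigrated bool) {
    // 1) add the servers being migrated to
    for fromName, to := range m {
        for _, old := range topo.Servers {
            if old.Name == fromName {
                to.Zone = old.Zone
                to.PhysicalPool = old.PhysicalPool
            }
        }
        topo.Servers = append(topo.Servers, to)
    }

    // 2) optionally drop the servers that were migrated from
    if removeMigrated {
        kept := topo.Servers[:0]
        for _, s := range topo.Servers {
            if _, gone := m[s.Name]; !gone {
                kept = append(kept, s)
            }
        }
        topo.Servers = kept
    }
}

func main() {
    topo := &Topo{Servers: []Server{
        {Name: "server-a", InternalIp: "10.0.0.1", Zone: "z1", PhysicalPool: "pool1"},
        {Name: "server-b", InternalIp: "10.0.0.2", Zone: "z2", PhysicalPool: "pool1"},
    }}
    migrateServers(topo, map[string]Server{
        "server-a": {Name: "server-c", InternalIp: "10.0.0.3"},
    }, true)
    fmt.Println(topo.Servers) // server-b stays; server-c inherits z1/pool1; server-a is gone
}
```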

internal/errno/errno.go (+5 -1)

@@ -399,7 +399,11 @@ var (
     ERR_ENCRYPT_FILE_FAILED     = EC(410021, "encrypt file failed")
     ERR_CLIENT_ID_NOT_FOUND     = EC(410022, "client id not found")
     ERR_ENABLE_ETCD_AUTH_FAILED = EC(410023, "enable etcd auth failed")
-
+    ERR_MARK_CHUNKSERVER_PENDDING = EC(410024, "mark chunkserver pendding status failed when migrate")
+    RRR_GET_CLUSTER_MDSADDR       = EC(410025, "failed to get cluster mds addr")
+    ERR_GET_CHUNKSERVER_COPYSET   = EC(410026, "failed to get chunkserver copyset")
+    ERR_GET_MIGRATE_COPYSET       = EC(410027, "migrate chunkserver copyset info must be 2")
+    ERR_CONTAINER_NOT_REMOVED     = EC(410027, "container not removed")
     // 420: common (curvebs client)
     ERR_VOLUME_ALREADY_MAPPED  = EC(420000, "volume already mapped")
     ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed")

internal/playbook/factory.go (+3)

@@ -94,6 +94,7 @@ const (
     CREATE_VOLUME
     MAP_IMAGE
     UNMAP_IMAGE
+    MARK_SERVER_PENGDDING
 
     // monitor
     PULL_MONITOR_IMAGE
@@ -278,6 +279,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) {
         t, err = bs.NewDeleteTargetTask(curveadm, nil)
     case LIST_TARGETS:
         t, err = bs.NewListTargetsTask(curveadm, nil)
+    case MARK_SERVER_PENGDDING:
+        t, err = bs.NewMarkServerPendding(curveadm, config.GetDC(i))
     // fs
     case CHECK_CLIENT_S3:
         t, err = checker.NewClientS3ConfigureTask(curveadm, config.GetCC(i))
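The new step constant is declared inside the iota block above and dispatched in createTasks. One detail worth keeping in mind, illustrated below with a trimmed-down standalone enum (not the real playbook package): inserting a constant mid-block shifts the numeric values of every constant after it, which is harmless as long as the values are only used symbolically within a single build.

```go
package main

import "fmt"

// A trimmed-down version of the playbook step enum.
const (
    CREATE_VOLUME = iota  // 0
    MAP_IMAGE             // 1
    UNMAP_IMAGE           // 2
    MARK_SERVER_PENGDDING // 3, new in this commit

    PULL_MONITOR_IMAGE // 4, shifted up by one compared to the previous build
)

func main() {
    fmt.Println(UNMAP_IMAGE, MARK_SERVER_PENGDDING, PULL_MONITOR_IMAGE) // 2 3 4
}
```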

internal/task/scripts/script.go (+3)

@@ -61,4 +61,7 @@ var (
 
     //go:embed shell/create_fs.sh
     CREATE_FS string
+
+    //go:embed shell/mark_server_pendding.sh
+    MARK_SERVER_PENDDING string
 )
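The new script is wired in with a go:embed directive, mirroring how create_fs.sh is embedded above. A minimal standalone version of the same pattern: the path is resolved relative to the Go source file, the file must exist at build time, and the blank embed import is required when embedding into a plain string.

```go
package scripts

import _ "embed" // required for //go:embed with string/[]byte variables

// The directive must sit directly above the variable it fills; the file's
// contents are baked into the binary at build time, and the build fails if
// shell/mark_server_pendding.sh is missing.
//
//go:embed shell/mark_server_pendding.sh
var MARK_SERVER_PENDDING string
```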
