Skip to content

Commit c164be5

Browse files
authored
dm-worker: use run ctx instead of global ctx to fix double write (#7661) (#7745)
close #7658
1 parent 784dd7a commit c164be5

File tree

1 file changed

+21
-11
lines changed

1 file changed

+21
-11
lines changed

dm/dm/worker/server.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,14 @@ type Server struct {
7070
wg sync.WaitGroup
7171
kaWg sync.WaitGroup
7272
httpWg sync.WaitGroup
73+
runWg sync.WaitGroup
7374

7475
ctx context.Context
7576
cancel context.CancelFunc
7677

78+
runCtx context.Context
79+
runCancel context.CancelFunc
80+
7781
kaCtx context.Context
7882
kaCancel context.CancelFunc
7983

@@ -105,6 +109,8 @@ func (s *Server) Start() error {
105109

106110
var m cmux.CMux
107111

112+
s.runCtx, s.runCancel = context.WithCancel(s.ctx)
113+
108114
// protect member from data race. some functions below like GetRelayConfig,
109115
// GetSourceBoundConfig have a built-in timeout so they will not be stuck for a
110116
// long time.
@@ -141,10 +147,10 @@ func (s *Server) Start() error {
141147

142148
s.setWorker(nil, true)
143149

144-
s.wg.Add(1)
150+
s.runWg.Add(1)
145151
go func() {
146-
s.runBackgroundJob(s.ctx)
147-
s.wg.Done()
152+
s.runBackgroundJob(s.runCtx)
153+
s.runWg.Done()
148154
}()
149155

150156
s.startKeepAlive()
@@ -160,13 +166,13 @@ func (s *Server) Start() error {
160166
}
161167
}
162168

163-
s.wg.Add(1)
169+
s.runWg.Add(1)
164170
go func(ctx context.Context) {
165-
defer s.wg.Done()
171+
defer s.runWg.Done()
166172
// TODO: handle fatal error from observeRelayConfig
167173
//nolint:errcheck
168174
s.observeRelayConfig(ctx, revRelay)
169-
}(s.ctx)
175+
}(s.runCtx)
170176

171177
bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
172178
if err != nil {
@@ -180,17 +186,17 @@ func (s *Server) Start() error {
180186
log.L().Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String()))
181187
}
182188

183-
s.wg.Add(1)
189+
s.runWg.Add(1)
184190
go func(ctx context.Context) {
185-
defer s.wg.Done()
191+
defer s.runWg.Done()
186192
for {
187193
err1 := s.observeSourceBound(ctx, revBound)
188194
if err1 == nil {
189195
return
190196
}
191197
s.restartKeepAlive()
192198
}
193-
}(s.ctx)
199+
}(s.runCtx)
194200

195201
// create a cmux
196202
m = cmux.New(s.rootLis)
@@ -468,8 +474,8 @@ func (s *Server) doClose() {
468474
return
469475
}
470476
// stop server in advance, stop receiving source bound and relay bound
471-
s.cancel()
472-
s.wg.Wait()
477+
s.runCancel()
478+
s.runWg.Wait()
473479

474480
// stop worker and wait for it to return (we already lock the whole Server, so there is no need to use a lock to get the source worker)
475481
if w := s.getSourceWorker(true); w != nil {
@@ -493,6 +499,10 @@ func (s *Server) doClose() {
493499
func (s *Server) Close() {
494500
s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing
495501
s.stopKeepAlive()
502+
503+
s.cancel()
504+
s.wg.Wait()
505+
496506
if s.etcdClient != nil {
497507
s.etcdClient.Close()
498508
}

0 commit comments

Comments
 (0)