Skip to content

Commit a3f111d

Browse files
authored
Merge pull request #135 from DrmagicE/dev
fix(federation): fix deadlock
2 parents 8d33b56 + 699b302 commit a3f111d

File tree

2 files changed

+56
-50
lines changed

2 files changed

+56
-50
lines changed

plugin/federation/federation.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,10 +399,12 @@ func (f *Federation) Hello(ctx context.Context, req *ClientHello) (resp *ServerH
399399
return nil, err
400400
}
401401
f.memberMu.Lock()
402-
if f.peers[nodeName] == nil {
402+
p := f.peers[nodeName]
403+
f.memberMu.Unlock()
404+
if p == nil {
403405
return nil, status.Errorf(codes.Internal, "Hello: the node [%s] has not yet joined", nodeName)
404406
}
405-
f.memberMu.Unlock()
407+
406408
cleanStart, nextID := f.sessionMgr.add(nodeName, req.SessionId)
407409
if cleanStart {
408410
_ = f.fedSubStore.UnsubscribeAll(nodeName)

server/server.go

Lines changed: 52 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ func (c *clientService) TerminateSession(clientID string) {
163163
type server struct {
164164
wg sync.WaitGroup
165165
initOnce sync.Once
166+
stopOnce sync.Once
166167
mu sync.RWMutex // guard clients & offlineClients map
167168
status int32 //server status
168169
// clients stores the online clients
@@ -1419,58 +1420,61 @@ func (srv *server) Run() (err error) {
14191420
// 3. Waiting for all connections to be closed
14201421
// 4. Triggering OnStop()
14211422
func (srv *server) Stop(ctx context.Context) error {
1422-
zaplog.Info("stopping gmqtt server")
1423-
defer func() {
1424-
defer close(srv.exitedChan)
1425-
zaplog.Info("server stopped")
1426-
}()
1427-
srv.exit()
1423+
var err error
1424+
srv.stopOnce.Do(func() {
1425+
zaplog.Info("stopping gmqtt server")
1426+
defer func() {
1427+
defer close(srv.exitedChan)
1428+
zaplog.Info("server stopped")
1429+
}()
1430+
srv.exit()
14281431

1429-
for _, l := range srv.tcpListener {
1430-
l.Close()
1431-
}
1432-
for _, ws := range srv.websocketServer {
1433-
ws.Server.Shutdown(ctx)
1434-
}
1435-
// close all idle clients
1436-
srv.mu.Lock()
1437-
chs := make([]chan struct{}, len(srv.clients))
1438-
i := 0
1439-
for _, c := range srv.clients {
1440-
chs[i] = c.closed
1441-
i++
1442-
c.Close()
1443-
}
1444-
srv.mu.Unlock()
1432+
for _, l := range srv.tcpListener {
1433+
l.Close()
1434+
}
1435+
for _, ws := range srv.websocketServer {
1436+
ws.Server.Shutdown(ctx)
1437+
}
1438+
// close all idle clients
1439+
srv.mu.Lock()
1440+
chs := make([]chan struct{}, len(srv.clients))
1441+
i := 0
1442+
for _, c := range srv.clients {
1443+
chs[i] = c.closed
1444+
i++
1445+
c.Close()
1446+
}
1447+
srv.mu.Unlock()
14451448

1446-
done := make(chan struct{})
1447-
if len(chs) != 0 {
1448-
go func() {
1449-
for _, v := range chs {
1450-
<-v
1451-
}
1449+
done := make(chan struct{})
1450+
if len(chs) != 0 {
1451+
go func() {
1452+
for _, v := range chs {
1453+
<-v
1454+
}
1455+
close(done)
1456+
}()
1457+
} else {
14521458
close(done)
1453-
}()
1454-
} else {
1455-
close(done)
1456-
}
1459+
}
14571460

1458-
select {
1459-
case <-ctx.Done():
1460-
zaplog.Warn("server stop timeout, force exit", zap.String("error", ctx.Err().Error()))
1461-
return ctx.Err()
1462-
case <-done:
1463-
for _, v := range srv.plugins {
1464-
zaplog.Info("unloading plugin", zap.String("name", v.Name()))
1465-
err := v.Unload()
1466-
if err != nil {
1467-
zaplog.Warn("plugin unload error", zap.String("error", err.Error()))
1461+
select {
1462+
case <-ctx.Done():
1463+
zaplog.Warn("server stop timeout, force exit", zap.String("error", ctx.Err().Error()))
1464+
err = ctx.Err()
1465+
return
1466+
case <-done:
1467+
for _, v := range srv.plugins {
1468+
zaplog.Info("unloading plugin", zap.String("name", v.Name()))
1469+
err := v.Unload()
1470+
if err != nil {
1471+
zaplog.Warn("plugin unload error", zap.String("error", err.Error()))
1472+
}
1473+
}
1474+
if srv.hooks.OnStop != nil {
1475+
srv.hooks.OnStop(context.Background())
14681476
}
14691477
}
1470-
if srv.hooks.OnStop != nil {
1471-
srv.hooks.OnStop(context.Background())
1472-
}
1473-
return nil
1474-
}
1475-
1478+
})
1479+
return err
14761480
}

0 commit comments

Comments
 (0)