Skip to content

Commit

Permalink
fix(federation): Fix client hanging issue
Browse files Browse the repository at this point in the history
  • Loading branch information
DrmagicE committed Jul 17, 2021
1 parent 794e429 commit 8f8990f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 16 deletions.
28 changes: 18 additions & 10 deletions plugin/federation/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"github.com/DrmagicE/gmqtt/persistence/subscription"
)

type peerState byte

const (
peerStateStopped = iota + 1
peerStateStopped peerState = iota + 1
peerStateStreaming
)

Expand All @@ -33,14 +35,16 @@ type peer struct {
// local session id
sessionID string
queue queue
// client-side stream
// stateMu guards the following fields
stateMu sync.Mutex
state int
stream *stream
state peerState
// client-side stream
stream *stream
}

type stream struct {
queue queue
conn *grpc.ClientConn
client Federation_EventStreamClient
close chan struct{}
errOnce sync.Once
Expand Down Expand Up @@ -177,12 +181,15 @@ func (p *peer) stop() {
close(p.exit)
}
p.stateMu.Lock()
if p.state == peerStateStreaming {
_ = p.stream.client.CloseSend()
state := p.state
if state == peerStateStreaming {
_ = p.stream.conn.Close()
}
p.state = peerStateStopped
p.stateMu.Unlock()
p.stream.wg.Wait()
if state == peerStateStreaming {
p.stream.wg.Wait()
}
}

func (p *peer) serveEventStream() {
Expand Down Expand Up @@ -210,7 +217,7 @@ func (p *peer) serveEventStream() {
}
}

func (p *peer) initStream(client FederationClient) (s *stream, err error) {
func (p *peer) initStream(client FederationClient, conn *grpc.ClientConn) (s *stream, err error) {
p.stateMu.Lock()
defer func() {
if err == nil {
Expand Down Expand Up @@ -265,6 +272,7 @@ func (p *peer) initStream(client FederationClient) (s *stream, err error) {
p.queue.open()
s = &stream{
queue: p.queue,
conn: conn,
client: c,
close: make(chan struct{}),
}
Expand All @@ -291,7 +299,7 @@ func (p *peer) serveStream(reconnectCount int, backoff *time.Timer) (err error)
return err
}
client := NewFederationClient(conn)
s, err := p.initStream(client)
s, err := p.initStream(client, conn)
if err != nil {
return err
}
Expand All @@ -309,7 +317,7 @@ func (s *stream) serve() error {
func (s *stream) setError(err error) {
s.errOnce.Do(func() {
s.queue.close()
s.client.CloseSend()
s.conn.Close()
close(s.close)
if err != nil && err != io.EOF {
log.Error("stream error", zap.Error(err))
Expand Down
8 changes: 2 additions & 6 deletions plugin/federation/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestPeer_initStream_CleanStart(t *testing.T) {
}).Times(4)

client.EXPECT().EventStream(gomock.Any())
_, err := p.initStream(client)
_, err := p.initStream(client, nil)

a.NoError(err)
for k, v := range msgEvents {
Expand Down Expand Up @@ -166,11 +166,7 @@ func TestPeer_initStream_CleanStartFalse(t *testing.T) {

client.EXPECT().EventStream(gomock.Any())

_, err := p.initStream(client)
_, err := p.initStream(client,nil)
a.NoError(err)

}

func TestEventQueue(t *testing.T) {

}

0 comments on commit 8f8990f

Please sign in to comment.