Skip to content

Commit e10d464

Browse files
Improve Stream Closure Logs, Remove advertise logs (#4895)
* add reason for stream closure * fix p2p stream tests, add reason to test stream closure * adjust log level of advertise messages to debug for cleaner output * remove logs for stream removal cool down
1 parent 557d2ae commit e10d464

File tree

6 files changed

+13
-13
lines changed

6 files changed

+13
-13
lines changed

p2p/stream/common/requestmanager/interface_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (st *testStream) ProtoSpec() (sttypes.ProtoSpec, error) {
107107
return sttypes.ProtoIDToProtoSpec(testProtoID)
108108
}
109109

110-
func (st *testStream) Close() error {
110+
func (st *testStream) Close(reason string) error {
111111
return nil
112112
}
113113

p2p/stream/common/streammanager/interface_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (st *testStream) ResetFailedTimes() {
8383
return
8484
}
8585

86-
func (st *testStream) Close() error {
86+
func (st *testStream) Close(reason string) error {
8787
if st.closed {
8888
return errors.New("already closed")
8989
}

p2p/stream/common/streammanager/streammanager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func (sm *streamManager) loop() {
212212
// NewStream handles a new stream from stream handler protocol
213213
func (sm *streamManager) NewStream(stream sttypes.Stream) error {
214214
if err := sm.sanityCheckStream(stream); err != nil {
215-
stream.Close()
215+
stream.Close("stream sanity check failed")
216216
return errors.Wrap(err, "stream sanity check failed")
217217
}
218218
task := addStreamTask{

p2p/stream/protocols/sync/protocol.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func (p *Protocol) HandleStream(raw libp2p_network.Stream) {
200200
st := p.wrapStream(raw)
201201
if err := p.sm.NewStream(st); err != nil {
202202
// Possibly we have reach the hard limit of the stream
203-
if !errors.Is(err, streammanager.ErrStreamAlreadyExist) {
203+
if !errors.Is(err, streammanager.ErrStreamAlreadyExist) && !errors.Is(err, streammanager.ErrStreamRemovalNotExpired) {
204204
p.logger.Warn().Err(err).Str("stream ID", string(st.ID())).
205205
Msg("failed to add new stream")
206206
}
@@ -281,7 +281,7 @@ func (p *Protocol) advertise() time.Duration {
281281

282282
if err == nil {
283283
newPeersDiscovered = true
284-
p.logger.Info().
284+
p.logger.Debug().
285285
Str("protocol", string(pid)).
286286
Dur("elapsed(sec)", time.Duration(elapsed.Seconds())).
287287
Int("retry", retries).
@@ -319,7 +319,7 @@ func (p *Protocol) advertise() time.Duration {
319319
}
320320

321321
if err != nil {
322-
p.logger.Error().Err(err).
322+
p.logger.Debug().Err(err).
323323
Str("protocol", string(pid)).
324324
Msg("Advertise failed after retries")
325325
continue
@@ -379,7 +379,7 @@ func (p *Protocol) RemoveStream(stID sttypes.StreamID, reason string) {
379379
st, exist := p.sm.GetStreamByID(stID)
380380
if exist && st != nil {
381381
//TODO: log this incident with reason
382-
st.Close()
382+
st.Close(reason)
383383
p.logger.Info().
384384
Str("stream ID", string(stID)).
385385
Str("reason", reason).
@@ -397,7 +397,7 @@ func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) {
397397
Str("reason", reason).
398398
Msg("stream failed")
399399
if st.Failures() >= MaxStreamFailures {
400-
st.Close()
400+
st.Close("too many failures")
401401
p.logger.Warn().
402402
Str("stream ID", string(st.ID())).
403403
Str("reason", "too many failures").

p2p/stream/protocols/sync/stream.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (st *syncStream) readMsgLoop() {
7474
default:
7575
msg, err := st.readMsg()
7676
if err != nil {
77-
if err := st.Close(); err != nil {
77+
if err := st.Close("read msg failed"); err != nil {
7878
st.logger.Err(err).Msg("failed to close sync stream")
7979
}
8080
return
@@ -125,7 +125,7 @@ func (st *syncStream) handleReqLoop() {
125125
if err != nil {
126126
st.logger.Info().Err(err).Str("request", req.String()).
127127
Msg("handle request error. Closing stream")
128-
if err := st.Close(); err != nil {
128+
if err := st.Close("handle request error"); err != nil {
129129
st.logger.Err(err).Msg("failed to close sync stream")
130130
}
131131
return
@@ -150,13 +150,13 @@ func (st *syncStream) handleRespLoop() {
150150
}
151151

152152
// Close stops the stream handling and closes the underlying stream
153-
func (st *syncStream) Close() error {
153+
func (st *syncStream) Close(reason string) error {
154154
notClosed := atomic.CompareAndSwapUint32(&st.closeStat, 0, 1)
155155
if !notClosed {
156156
// Already closed by another goroutine. Directly return
157157
return nil
158158
}
159-
if err := st.protocol.sm.RemoveStream(st.ID(), "force close"); err != nil {
159+
if err := st.protocol.sm.RemoveStream(st.ID(), "force close: "+reason); err != nil {
160160
st.logger.Err(err).Str("stream ID", string(st.ID())).
161161
Msg("failed to remove sync stream on close")
162162
}

p2p/stream/types/stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type Stream interface {
2020
ProtoSpec() (ProtoSpec, error)
2121
WriteBytes([]byte) error
2222
ReadBytes() ([]byte, error)
23-
Close() error
23+
Close(reason string) error
2424
CloseOnExit() error
2525
Failures() int32
2626
AddFailedTimes(faultRecoveryThreshold time.Duration)

0 commit comments

Comments
 (0)