@@ -90,6 +90,7 @@ type pipe struct {
90
90
r2ps bool // identify this pipe is used for resp2 pubsub or not
91
91
noNoDelay bool
92
92
optin bool
93
+ pingTimer * time.Timer
93
94
}
94
95
95
96
type pipeFn func (ctx context.Context , connFn func (ctx context.Context ) (net.Conn , error ), option * ClientOption ) (p * pipe , err error )
@@ -630,17 +631,18 @@ func (p *pipe) _backgroundRead() (err error) {
630
631
631
632
func (p * pipe ) backgroundPing () {
632
633
var prev , recv int32
633
- var timer * time.Timer
634
634
635
635
prev = atomic .LoadInt32 (& p .recvs )
636
- timer = time .AfterFunc (p .pinggap , func () {
636
+ p . pingTimer = time .AfterFunc (p .pinggap , func () {
637
637
var err error
638
638
recv = atomic .LoadInt32 (& p .recvs )
639
639
reset := false
640
640
defer func (){
641
641
prev = atomic .LoadInt32 (& p .recvs )
642
- if reset && timer != nil {
643
- timer .Reset (p .pinggap )
642
+ if reset {
643
+ p .pingTimer .Reset (p .pinggap )
644
+ } else {
645
+ p .pingTimer .Stop ()
644
646
}
645
647
}()
646
648
if recv != prev || atomic .LoadInt32 (& p .blcksig ) != 0 || (atomic .LoadInt32 (& p .state ) == 0 && atomic .LoadInt32 (& p .waits ) != 0 ) {
@@ -654,7 +656,7 @@ func (p *pipe) backgroundPing() {
654
656
case <- tm .C :
655
657
err = os .ErrDeadlineExceeded
656
658
case err = <- ch :
657
- // noop
659
+ tm . Stop ()
658
660
}
659
661
if err != nil && atomic .LoadInt32 (& p .blcksig ) != 0 {
660
662
err = nil
@@ -1595,6 +1597,9 @@ func (p *pipe) Close() {
1595
1597
}
1596
1598
atomic .AddInt32 (& p .waits , - 1 )
1597
1599
atomic .AddInt32 (& p .blcksig , - 1 )
1600
+ if p .pingTimer != nil {
1601
+ p .pingTimer .Stop ()
1602
+ }
1598
1603
if p .conn != nil {
1599
1604
p .conn .Close ()
1600
1605
}
0 commit comments