Skip to content

Commit

Permalink
fix: barrier mem leak
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Nov 28, 2023
1 parent 27be26a commit cc243d4
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 11 deletions.
4 changes: 4 additions & 0 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) {
for callback := latest.(*callbackNode); callback != nil; callback = callback.pre {
callback.fn(c)
}
c.ctx = nil
c.closeCallbacks.Store(nil)
c.onConnectCallback.Store(nil)
c.onRequestCallback.Store(nil)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (c *connection) closeBuffer() {
}
if c.outputBuffer.Len() == 0 || onConnect != nil || onRequest != nil {
c.outputBuffer.Close()
c.outputBarrier.reset()
barrierPool.Put(c.outputBarrier)
}
}
Expand Down
32 changes: 21 additions & 11 deletions netpoll_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ package netpoll
import (
"context"
"errors"
"log"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -35,11 +38,12 @@ func newServer(ln Listener, opts *options, onQuit func(err error)) *server {
}

type server struct {
operator FDOperator
ln Listener
opts *options
onQuit func(err error)
connections sync.Map // key=fd, value=connection
operator FDOperator
ln Listener
opts *options
onQuit func(err error)
connections sync.Map // key=fd, value=connection
connectionsNum int64
}

// Run this server.
Expand Down Expand Up @@ -106,20 +110,26 @@ func (s *server) OnRead(p Poll) error {
return nil
}
// store & register connection
var connection = &connection{}
connection.init(conn.(Conn), s.opts)
if !connection.IsActive() {
var nconn = &connection{}
runtime.SetFinalizer(nconn, func(cc *connection) {
log.Printf("netpoll finalized connection: %v", cc)
})
nconn.init(conn.(Conn), s.opts)
if !nconn.IsActive() {
return nil
}
var fd = conn.(Conn).Fd()
connection.AddCloseCallback(func(connection Connection) error {
nconn.AddCloseCallback(func(nconn Connection) error {
s.connections.Delete(fd)
atomic.AddInt64(&s.connectionsNum, -1)
log.Println("netpoll close connection")
return nil
})
s.connections.Store(fd, connection)
s.connections.Store(fd, nconn)
atomic.AddInt64(&s.connectionsNum, 1)

// trigger onConnect asynchronously
connection.onConnect()
nconn.onConnect()
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions sys_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ type barrier struct {
ivs []syscall.Iovec
}

func (b *barrier) reset() {
for i := range b.bs {
b.bs[i] = nil
}
for i := range b.ivs {
b.ivs[i].Base = nil
b.ivs[i].Len = 0
}
}

// writev wraps the writev system call.
func writev(fd int, bs [][]byte, ivs []syscall.Iovec) (n int, err error) {
iovLen := iovecs(bs, ivs)
Expand Down

0 comments on commit cc243d4

Please sign in to comment.