Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,21 @@ func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock boo
}()
}
timeSpentWaiting := time.Duration(0)
for len(server.liveSockets)-len(server.unusedSockets) >= info.PoolLimit && !timeoutHit {

/*
* @vinllen modification.
* Before:
* len(server.liveSockets)-len(server.unusedSockets) >= poolLimit
* Explanation:
* This is a bug that may cause the connection full. "liveSockets" means
* unused connections and used connections, "unusedSockets" means unused connections.
* However, a connection won't be released even if it's an "unused" connection.
* So this comparision only restrict the used connections but not the total connections.
* After:
* len(server.liveSockets) >= poolLimit && len(server.unusedSockets) == 0
*/
// for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
for len(server.liveSockets) >= info.PoolLimit && len(server.unusedSockets) == 0 && !timeoutHit {
// We only count time spent in Wait(), and not time evaluating the entire loop,
// so that in the happy non-blocking path where the condition above evaluates true
// first time, we record a nice round zero wait time.
Expand All @@ -185,14 +199,18 @@ func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock boo
// Record that we fetched a connection of of a socket list and how long we spent waiting
stats.noticeSocketAcquisition(timeSpentWaiting)
} else {
if len(server.liveSockets)-len(server.unusedSockets) >= info.PoolLimit {
// no socket available
// if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
if len(server.liveSockets) >= info.PoolLimit && len(server.unusedSockets) == 0 {
server.Unlock()
return nil, false, errPoolLimit
}
}
}

n := len(server.unusedSockets)
if n > 0 {
// use unused socket
socket = server.unusedSockets[n-1]
server.unusedSockets[n-1] = nil // Help GC.
server.unusedSockets = server.unusedSockets[:n-1]
Expand All @@ -203,6 +221,7 @@ func (server *mongoServer) acquireSocketInternal(info *DialInfo, shouldBlock boo
continue
}
} else {
// build new one
server.Unlock()
socket, err = server.Connect(info)
if err == nil {
Expand Down Expand Up @@ -457,12 +476,16 @@ func (server *mongoServer) poolShrinker() {
end++
reclaimMap[s] = struct{}{}
}

// elements before index "end" are expired

tbr := server.unusedSockets[:end]
if end > 0 {
next := make([]*mongoSocket, unused-end)
copy(next, server.unusedSockets[end:])
server.unusedSockets = next
remainSockets := []*mongoSocket{}
// remove unused sockets from liveSockets
for _, s := range server.liveSockets {
if _, ok := reclaimMap[s]; !ok {
remainSockets = append(remainSockets, s)
Expand Down