Skip to content

Commit 343acc7

Browse files
committed
improve std server in windows
1 parent 7faaba8 commit 343acc7

File tree

1 file changed

+38
-39
lines changed

1 file changed

+38
-39
lines changed

server_std.go

+38-39
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:build windows
12
// +build windows
23

34
package gev
@@ -28,7 +29,7 @@ type Handler interface {
2829
type Server struct {
2930
listener net.Listener
3031
callback Handler
31-
connections []*Connection
32+
connections stdsync.Map
3233

3334
timingWheel *timingwheel.TimingWheel
3435
opts *Options
@@ -71,33 +72,39 @@ func (s *Server) Start() {
7172
sw := sync.WaitGroupWrapper{}
7273
s.timingWheel.Start()
7374

74-
sw.AddAndRun(func() {
75-
for {
76-
select {
77-
case <-s.dying:
78-
return
75+
if s.opts.NumLoops <= 0 {
76+
s.opts.NumLoops = 1
77+
}
78+
for i := 0; i < s.opts.NumLoops; i++ {
79+
sw.AddAndRun(func() {
80+
for {
81+
select {
82+
case <-s.dying:
83+
return
7984

80-
default:
81-
conn, err := s.listener.Accept()
82-
if err != nil {
83-
log.Errorf("accept error: %v", err)
84-
continue
85+
default:
86+
conn, err := s.listener.Accept()
87+
if err != nil {
88+
log.Errorf("accept error: %v", err)
89+
continue
90+
}
91+
92+
connection := NewConnection(conn, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback)
93+
s.connections.Store(connection, struct{}{})
94+
95+
sw.AddAndRun(func() {
96+
connection.readLoop()
97+
})
98+
sw.AddAndRun(func() {
99+
connection.writeLoop()
100+
})
101+
sw.AddAndRun(func() {
102+
s.callback.OnConnect(connection)
103+
})
85104
}
86-
87-
connection := NewConnection(conn, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback)
88-
s.connections = append(s.connections, connection)
89-
90-
s.callback.OnConnect(connection)
91-
92-
sw.AddAndRun(func() {
93-
connection.readLoop()
94-
})
95-
sw.AddAndRun(func() {
96-
connection.writeLoop()
97-
})
98105
}
99-
}
100-
})
106+
})
107+
}
101108

102109
s.running.Set(true)
103110

@@ -116,9 +123,13 @@ func (s *Server) Stop() {
116123
log.Error(err)
117124
}
118125

119-
for _, c := range s.connections {
126+
s.connections.Range(func(key, value interface{}) bool {
127+
c := key.(*Connection)
120128
c.Close()
121-
}
129+
130+
return true
131+
})
132+
122133
}
123134
}
124135

@@ -253,8 +264,6 @@ func (c *Connection) Send(data interface{}, opts ...ConnectionOption) error {
253264
// Close 关闭连接
254265
func (c *Connection) Close() error {
255266
if c.connected.Get() {
256-
log.Info("Close ", c.PeerAddr())
257-
258267
close(c.dying)
259268
c.connected.Set(false)
260269
c.callBack.OnClose(c)
@@ -272,9 +281,6 @@ func (c *Connection) Close() error {
272281

273282
// ShutdownWrite 关闭可写端,等待读取完接收缓冲区所有数据
274283
func (c *Connection) ShutdownWrite() error {
275-
log.Info("ShutdownWrite ", c.PeerAddr())
276-
277-
//return nil
278284
return c.Close()
279285
}
280286

@@ -302,7 +308,6 @@ func (c *Connection) readLoop() {
302308
return
303309

304310
default:
305-
//c.conn.SetReadDeadline(time.Now().Add(time.Second))
306311
n, err := c.conn.Read(buf)
307312
if err != nil {
308313
if err != io.EOF {
@@ -345,8 +350,6 @@ func (c *Connection) writeLoop() {
345350
continue
346351
}
347352

348-
c.conn.SetWriteDeadline(time.Now().Add(time.Second))
349-
350353
first, end := c.outBuffer.PeekAll()
351354
n, err := c.conn.Write(first)
352355
if err != nil {
@@ -429,14 +432,10 @@ func (c *Connection) closeTimeoutConn() func() {
429432
return func() {
430433
now := time.Now()
431434
intervals := now.Sub(time.Unix(c.activeTime.Get(), 0))
432-
log.Info("closeTimeoutConn ", intervals)
433435

434436
if intervals >= c.idleTime {
435-
log.Info("closeTimeoutConn ", c.conn.RemoteAddr())
436437
_ = c.Close()
437438
} else {
438-
log.Info("timingWheel.AfterFunc ", c.idleTime-intervals)
439-
440439
timer := c.timingWheel.AfterFunc(c.idleTime-intervals, c.closeTimeoutConn())
441440
c.timer.Store(timer)
442441
}

0 commit comments

Comments
 (0)