Skip to content

Commit 2864c93

Browse files
author
bruce
committedSep 29, 2018
init
1 parent 3618169 commit 2864c93

24 files changed

+2899
-0
lines changed
 

‎config/config.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package config
2+
3+
import (
4+
"github.com/bailu1901/lockstepserver/util"
5+
)
6+
7+
var (
8+
Cfg = Config{}
9+
)
10+
11+
type Config struct {
12+
OutAddress string
13+
InAddress string
14+
EtcdEndPionts string `xml:"etcd_endpoints"`
15+
EtcdKey string `xml:"etcd_key"`
16+
EtcdTTL int64 `xml:"etcd_ttl"`
17+
MaxRoom int `xml:"max_room"`
18+
}
19+
20+
func LoadConfig(file string) error {
21+
if err := util.LoadConfig(file, &Cfg); nil != err {
22+
return err
23+
}
24+
return nil
25+
}

‎kcp_server/kcp_test.go

+270
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
package kcp_server
2+
3+
import (
4+
"net"
5+
"sync"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
"github.com/bailu1901/lockstepserver/network"
11+
"github.com/xtaci/kcp-go"
12+
)
13+
14+
const (
15+
latency = time.Millisecond * 10
16+
)
17+
18+
var (
19+
numConn uint32 = 0
20+
numMsg uint32 = 0
21+
numDiscon uint32 = 0
22+
)
23+
24+
type testCallback struct {
25+
}
26+
27+
func (*testCallback) OnMessage(conn *network.Conn, msg network.Packet) bool {
28+
29+
atomic.AddUint32(&numMsg, 1)
30+
31+
//fmt.Println("OnMessage", conn.GetExtraData(), string(msg.(*network.DefaultPacket).GetBody()))
32+
conn.AsyncWritePacket(network.NewDefaultPacket([]byte("pong")), time.Second*1)
33+
return true
34+
}
35+
36+
func (*testCallback) OnConnect(conn *network.Conn) bool {
37+
id := atomic.AddUint32(&numConn, 1)
38+
conn.PutExtraData(id)
39+
//fmt.Println("OnConnect", conn.GetExtraData())
40+
return true
41+
}
42+
43+
func (*testCallback) OnClose(conn *network.Conn) {
44+
atomic.AddUint32(&numDiscon, 1)
45+
46+
//fmt.Println("OnDisconnect", conn.GetExtraData())
47+
}
48+
49+
func Test_KCPServer(t *testing.T) {
50+
51+
l, err := kcp.Listen(":10086")
52+
if nil != err {
53+
panic(err)
54+
}
55+
56+
config := &network.Config{
57+
PacketReceiveChanLimit: 1024,
58+
PacketSendChanLimit: 1024,
59+
ConnReadTimeout: latency,
60+
ConnWriteTimeout: latency,
61+
}
62+
63+
server := network.NewServer(config, &testCallback{}, &network.DefaultProtocol{})
64+
65+
go server.Start(l, func(conn net.Conn, i *network.Server) *network.Conn {
66+
kcpConn := conn.(*kcp.UDPSession)
67+
kcpConn.SetNoDelay(1, 10, 2, 1)
68+
kcpConn.SetStreamMode(true)
69+
kcpConn.SetWindowSize(4096, 4096)
70+
kcpConn.SetReadBuffer(4 * 1024 * 1024)
71+
kcpConn.SetWriteBuffer(4 * 1024 * 1024)
72+
kcpConn.SetACKNoDelay(true)
73+
74+
return network.NewConn(conn, server)
75+
})
76+
defer server.Stop()
77+
78+
time.Sleep(time.Second)
79+
80+
wg := sync.WaitGroup{}
81+
const max_con = 6000
82+
for i := 0; i < max_con; i++ {
83+
wg.Add(1)
84+
time.Sleep(time.Nanosecond)
85+
go func() {
86+
defer wg.Done()
87+
88+
c, e := kcp.Dial("127.0.0.1:10086")
89+
if nil != e {
90+
t.FailNow()
91+
}
92+
defer c.Close()
93+
94+
c.Write(network.NewDefaultPacket([]byte("ping")).Serialize())
95+
b := make([]byte, 1024)
96+
c.SetReadDeadline(time.Now().Add(latency))
97+
if _, e := c.Read(b); nil != e {
98+
t.Fatalf("error:%s", e.Error())
99+
}
100+
101+
//time.Sleep(time.Second)
102+
}()
103+
}
104+
105+
wg.Wait()
106+
time.Sleep(time.Second * 2)
107+
108+
n := atomic.LoadUint32(&numConn)
109+
if n != max_con {
110+
t.Errorf("numConn[%d] should be [%d]", n, max_con)
111+
}
112+
113+
n = atomic.LoadUint32(&numMsg)
114+
if n != max_con {
115+
t.Errorf("numMsg[%d] should be [%d]", n, max_con)
116+
}
117+
118+
n = atomic.LoadUint32(&numDiscon)
119+
if n != max_con {
120+
t.Errorf("numDiscon[%d] should be [%d]", n, max_con)
121+
}
122+
}
123+
124+
func Benchmark_KCPServer(b *testing.B) {
125+
126+
l, err := kcp.Listen(":10086")
127+
if nil != err {
128+
panic(err)
129+
}
130+
131+
config := &network.Config{
132+
PacketReceiveChanLimit: 1024,
133+
PacketSendChanLimit: 1024,
134+
}
135+
136+
server := network.NewServer(config, &testCallback{}, &network.DefaultProtocol{})
137+
138+
go server.Start(l, func(conn net.Conn, i *network.Server) *network.Conn {
139+
kcpConn := conn.(*kcp.UDPSession)
140+
kcpConn.SetNoDelay(1, 10, 2, 1)
141+
kcpConn.SetStreamMode(true)
142+
kcpConn.SetWindowSize(4096, 4096)
143+
kcpConn.SetReadBuffer(4 * 1024 * 1024)
144+
kcpConn.SetWriteBuffer(4 * 1024 * 1024)
145+
kcpConn.SetACKNoDelay(true)
146+
147+
return network.NewConn(conn, server)
148+
})
149+
150+
time.Sleep(time.Millisecond * 100)
151+
152+
wg := sync.WaitGroup{}
153+
var max_con uint32 = 0
154+
c, e := kcp.Dial("127.0.0.1:10086")
155+
if nil != e {
156+
b.FailNow()
157+
}
158+
159+
go func() {
160+
for {
161+
buf := make([]byte, 1024)
162+
c.SetReadDeadline(time.Now().Add(time.Second * 2))
163+
_, er := c.Read(buf)
164+
if nil != er {
165+
//b.FailNow()
166+
return
167+
}
168+
wg.Done()
169+
}
170+
171+
}()
172+
173+
for i := 0; i < b.N; i++ {
174+
max_con++
175+
176+
wg.Add(1)
177+
go func() {
178+
179+
c.Write(network.NewDefaultPacket([]byte("ping")).Serialize())
180+
181+
//time.Sleep(time.Second)
182+
}()
183+
}
184+
185+
wg.Wait()
186+
//time.Sleep(time.Second * 2)
187+
server.Stop()
188+
189+
n := atomic.LoadUint32(&numMsg)
190+
b.Logf("numMsg[%d]", n)
191+
if n != numMsg {
192+
b.Errorf("numMsg[%d] should be [%d]", n, max_con)
193+
}
194+
/*
195+
n = atomic.LoadUint32(&numConn)
196+
b.Logf("numConn[%d]", n)
197+
if n != max_con {
198+
b.Errorf("numConn[%d] should be [%d]", n, max_con)
199+
}
200+
201+
202+
203+
n = atomic.LoadUint32(&numDiscon)
204+
b.Logf("numDiscon[%d]", n)
205+
if n != numDiscon {
206+
b.Errorf("numDiscon[%d] should be [%d]", n, max_con)
207+
}
208+
*/
209+
}
210+
211+
func Test_TCPServer(t *testing.T) {
212+
213+
l, err := net.Listen("tcp", ":10086")
214+
if nil != err {
215+
panic(err)
216+
}
217+
218+
config := &network.Config{
219+
PacketReceiveChanLimit: 1024,
220+
PacketSendChanLimit: 1024,
221+
ConnReadTimeout: time.Millisecond * 50,
222+
ConnWriteTimeout: time.Millisecond * 50,
223+
}
224+
225+
server := network.NewServer(config, &testCallback{}, &network.DefaultProtocol{})
226+
227+
go server.Start(l, func(conn net.Conn, i *network.Server) *network.Conn {
228+
return network.NewConn(conn, server)
229+
})
230+
231+
time.Sleep(time.Second)
232+
233+
wg := sync.WaitGroup{}
234+
const max_con = 2000
235+
for i := 0; i < max_con; i++ {
236+
wg.Add(1)
237+
go func() {
238+
defer wg.Done()
239+
c, e := net.Dial("tcp", "127.0.0.1:10086")
240+
if nil != e {
241+
t.FailNow()
242+
}
243+
defer c.Close()
244+
c.Write(network.NewDefaultPacket([]byte("ping")).Serialize())
245+
b := make([]byte, 1024)
246+
c.SetReadDeadline(time.Now().Add(time.Second * 2))
247+
c.Read(b)
248+
//time.Sleep(time.Second)
249+
}()
250+
}
251+
252+
wg.Wait()
253+
//time.Sleep(max_sleep)
254+
server.Stop()
255+
256+
n := atomic.LoadUint32(&numConn)
257+
if n != max_con {
258+
t.Errorf("numConn[%d] should be [%d]", n, max_con)
259+
}
260+
261+
n = atomic.LoadUint32(&numMsg)
262+
if n != max_con {
263+
t.Errorf("numMsg[%d] should be [%d]", n, max_con)
264+
}
265+
266+
n = atomic.LoadUint32(&numDiscon)
267+
if n != max_con {
268+
t.Errorf("numDiscon[%d] should be [%d]", n, max_con)
269+
}
270+
}

‎kcp_server/server.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package kcp_server
2+
3+
import (
4+
"net"
5+
"time"
6+
7+
"github.com/bailu1901/lockstepserver/network"
8+
"github.com/xtaci/kcp-go"
9+
)
10+
11+
func ListenAndServe(addr string, callback network.ConnCallback, protocol network.Protocol) (*network.Server, error) {
12+
dupConfig := &network.Config{
13+
PacketReceiveChanLimit: 1024,
14+
PacketSendChanLimit: 1024,
15+
ConnReadTimeout: time.Second * 5,
16+
ConnWriteTimeout: time.Second * 5,
17+
}
18+
19+
l, err := kcp.Listen(addr)
20+
if nil != err {
21+
return nil, err
22+
}
23+
24+
server := network.NewServer(dupConfig, callback, protocol)
25+
go server.Start(l, func(conn net.Conn, i *network.Server) *network.Conn {
26+
27+
//普通模式
28+
//setKCPConfig(32, 32, 0, 40, 0, 0, 100, 1400)
29+
30+
//极速模式
31+
//setKCPConfig(32, 32, 1, 10, 2, 1, 30, 1400)
32+
33+
//普通模式:ikcp_nodelay(kcp, 0, 40, 0, 0); 极速模式: ikcp_nodelay(kcp, 1, 10, 2, 1);
34+
35+
kcpConn := conn.(*kcp.UDPSession)
36+
kcpConn.SetNoDelay(1, 10, 2, 1)
37+
kcpConn.SetStreamMode(true)
38+
kcpConn.SetWindowSize(4096, 4096)
39+
kcpConn.SetReadBuffer(4 * 1024 * 1024)
40+
kcpConn.SetWriteBuffer(4 * 1024 * 1024)
41+
kcpConn.SetACKNoDelay(true)
42+
43+
return network.NewConn(conn, server)
44+
})
45+
46+
return server, nil
47+
}

‎main.go

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package main
2+
3+
//-----------------------------------
4+
//File:main.go
5+
//Date:2018年8月23日
6+
//Desc:帧同步战斗服务器
7+
//Auth:Bruce
8+
//-----------------------------------
9+
10+
import (
11+
"flag"
12+
"fmt"
13+
"net/http"
14+
_ "net/http/pprof"
15+
"os"
16+
"os/signal"
17+
"strings"
18+
"syscall"
19+
"time"
20+
21+
"github.com/bailu1901/lockstepserver/kcp_server"
22+
"github.com/bailu1901/lockstepserver/protocol"
23+
"github.com/bailu1901/lockstepserver/room"
24+
_ "github.com/bailu1901/lockstepserver/web"
25+
26+
"github.com/bailu1901/lockstepserver/config"
27+
28+
"github.com/bailu1901/lockstepserver/util"
29+
30+
l4g "github.com/alecthomas/log4go"
31+
)
32+
33+
var (
34+
nodeId = flag.Uint64("id", 0, "id")
35+
configFile = flag.String("config", "config.xml", "config file")
36+
gWeb = flag.String("web", "", "web listen address")
37+
outAddress = flag.String("out", ":10086", "out listen address(':10086' means use $localip:10086)")
38+
)
39+
40+
//LoadConfig 加载配置
41+
func LoadConfig() bool {
42+
43+
if err := config.LoadConfig(*configFile); err != nil {
44+
panic(fmt.Sprintf("[main] load config %v fail: %v", *configFile, err))
45+
}
46+
47+
config.Cfg.OutAddress = *outAddress
48+
temp := strings.Split(config.Cfg.OutAddress, ":")
49+
if 0 == len(temp[0]) {
50+
config.Cfg.OutAddress = util.GetOutboundIP().String() + config.Cfg.OutAddress
51+
}
52+
53+
return true
54+
}
55+
56+
//Init 初始化
57+
func Init() bool {
58+
if len(*gWeb) > 0 {
59+
60+
go func() {
61+
e := http.ListenAndServe(fmt.Sprintf(":%s", *gWeb), nil)
62+
if nil != e {
63+
panic(e)
64+
}
65+
}()
66+
l4g.Info("[main] http.ListenAndServe port=[%s]", *gWeb)
67+
}
68+
69+
return true
70+
}
71+
72+
//Run 运行
73+
func Run() {
74+
75+
defer func() {
76+
//clear
77+
time.Sleep(time.Millisecond * 100)
78+
l4g.Warn("[main] pvp %d quit", *nodeId)
79+
l4g.Global.Close()
80+
}()
81+
82+
defer room.Stop()
83+
84+
//udp server
85+
networkServer, err := kcp_server.ListenAndServe(config.Cfg.OutAddress, &room.Router{}, &protocol.MsgProtocol{})
86+
if nil != err {
87+
panic(err)
88+
}
89+
l4g.Info("[main] kcp.Listen addr=[%s]", config.Cfg.OutAddress)
90+
defer networkServer.Stop()
91+
92+
l4g.Info("[main] cluster start! etcd=[%s] key=[%s]", config.Cfg.EtcdEndPionts, config.Cfg.EtcdKey)
93+
94+
//主循环定时器
95+
ticker := time.NewTimer(time.Second)
96+
defer ticker.Stop()
97+
98+
sigs := make(chan os.Signal, 1)
99+
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, os.Interrupt)
100+
101+
l4g.Warn("[main] %d running...", *nodeId)
102+
//主循环
103+
QUIT:
104+
for {
105+
select {
106+
case sig := <-sigs:
107+
l4g.Info("Signal: %s", sig.String())
108+
if sig == syscall.SIGHUP {
109+
110+
} else {
111+
break QUIT
112+
}
113+
case <-ticker.C:
114+
//break QUIT
115+
}
116+
117+
}
118+
119+
l4g.Info("[main] pvp %d quiting...", *nodeId)
120+
}
121+
122+
func main() {
123+
124+
showIP := false
125+
flag.BoolVar(&showIP, "ip", false, "show ip info")
126+
flag.Parse()
127+
if showIP {
128+
fmt.Println("GetOutboundIP", util.GetOutboundIP())
129+
fmt.Println("GetLocalIP", util.GetLocalIP())
130+
fmt.Println("GetExternalIP", util.GetExternalIP())
131+
os.Exit(0)
132+
}
133+
134+
if LoadConfig() && Init() {
135+
Run()
136+
} else {
137+
fmt.Printf("[main] launch fail")
138+
}
139+
140+
}

‎network/conn.go

+223
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
package network
2+
3+
import (
4+
"errors"
5+
"net"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
)
10+
11+
// Error type
12+
var (
13+
ErrConnClosing = errors.New("use of closed network connection")
14+
ErrWriteBlocking = errors.New("write packet was blocking")
15+
ErrReadBlocking = errors.New("read packet was blocking")
16+
)
17+
18+
// Conn exposes a set of callbacks for the various events that occur on a connection
19+
type Conn struct {
20+
srv *Server
21+
conn net.Conn // the raw connection
22+
extraData interface{} // to save extra data
23+
closeOnce sync.Once // close the conn, once, per instance
24+
closeFlag int32 // close flag
25+
closeChan chan struct{} // close chanel
26+
packetSendChan chan Packet // packet send chanel
27+
packetReceiveChan chan Packet // packeet receive chanel
28+
callback ConnCallback // callback
29+
}
30+
31+
// ConnCallback is an interface of methods that are used as callbacks on a connection
32+
type ConnCallback interface {
33+
// OnConnect is called when the connection was accepted,
34+
// If the return value of false is closed
35+
OnConnect(*Conn) bool
36+
37+
// OnMessage is called when the connection receives a packet,
38+
// If the return value of false is closed
39+
OnMessage(*Conn, Packet) bool
40+
41+
// OnClose is called when the connection closed
42+
OnClose(*Conn)
43+
}
44+
45+
// newConn returns a wrapper of raw conn
46+
func NewConn(conn net.Conn, srv *Server) *Conn {
47+
return &Conn{
48+
srv: srv,
49+
callback: srv.callback,
50+
conn: conn,
51+
closeChan: make(chan struct{}),
52+
packetSendChan: make(chan Packet, srv.config.PacketSendChanLimit),
53+
packetReceiveChan: make(chan Packet, srv.config.PacketReceiveChanLimit),
54+
}
55+
}
56+
57+
// GetExtraData gets the extra data from the Conn
58+
func (c *Conn) GetExtraData() interface{} {
59+
return c.extraData
60+
}
61+
62+
// PutExtraData puts the extra data with the Conn
63+
func (c *Conn) PutExtraData(data interface{}) {
64+
c.extraData = data
65+
}
66+
67+
// GetRawConn returns the raw net.TCPConn from the Conn
68+
func (c *Conn) GetRawConn() net.Conn {
69+
return c.conn
70+
}
71+
72+
// Close closes the connection
73+
func (c *Conn) Close() {
74+
c.closeOnce.Do(func() {
75+
atomic.StoreInt32(&c.closeFlag, 1)
76+
close(c.closeChan)
77+
close(c.packetSendChan)
78+
close(c.packetReceiveChan)
79+
c.conn.Close()
80+
c.callback.OnClose(c)
81+
})
82+
}
83+
84+
// IsClosed indicates whether or not the connection is closed
85+
func (c *Conn) IsClosed() bool {
86+
return atomic.LoadInt32(&c.closeFlag) == 1
87+
}
88+
89+
func (c *Conn) SetCallback(callback ConnCallback) {
90+
c.callback = callback
91+
}
92+
93+
// AsyncWritePacket async writes a packet, this method will never block
94+
func (c *Conn) AsyncWritePacket(p Packet, timeout time.Duration) (err error) {
95+
if c.IsClosed() {
96+
return ErrConnClosing
97+
}
98+
99+
defer func() {
100+
if e := recover(); e != nil {
101+
err = ErrConnClosing
102+
}
103+
}()
104+
105+
if timeout == 0 {
106+
select {
107+
case c.packetSendChan <- p:
108+
return nil
109+
110+
default:
111+
return ErrWriteBlocking
112+
}
113+
114+
} else {
115+
select {
116+
case c.packetSendChan <- p:
117+
return nil
118+
119+
case <-c.closeChan:
120+
return ErrConnClosing
121+
122+
case <-time.After(timeout):
123+
return ErrWriteBlocking
124+
}
125+
}
126+
}
127+
128+
// Do it
129+
func (c *Conn) Do() {
130+
if !c.callback.OnConnect(c) {
131+
return
132+
}
133+
134+
asyncDo(c.handleLoop, c.srv.waitGroup)
135+
asyncDo(c.readLoop, c.srv.waitGroup)
136+
asyncDo(c.writeLoop, c.srv.waitGroup)
137+
}
138+
139+
func (c *Conn) readLoop() {
140+
defer func() {
141+
recover()
142+
c.Close()
143+
}()
144+
145+
for {
146+
select {
147+
case <-c.srv.exitChan:
148+
return
149+
150+
case <-c.closeChan:
151+
return
152+
153+
default:
154+
}
155+
156+
c.conn.SetReadDeadline(time.Now().Add(c.srv.config.ConnReadTimeout))
157+
p, err := c.srv.protocol.ReadPacket(c.conn)
158+
if err != nil {
159+
return
160+
}
161+
162+
c.packetReceiveChan <- p
163+
}
164+
}
165+
166+
func (c *Conn) writeLoop() {
167+
defer func() {
168+
recover()
169+
c.Close()
170+
}()
171+
172+
for {
173+
select {
174+
case <-c.srv.exitChan:
175+
return
176+
177+
case <-c.closeChan:
178+
return
179+
180+
case p := <-c.packetSendChan:
181+
if c.IsClosed() {
182+
return
183+
}
184+
c.conn.SetWriteDeadline(time.Now().Add(c.srv.config.ConnWriteTimeout))
185+
if _, err := c.conn.Write(p.Serialize()); err != nil {
186+
return
187+
}
188+
}
189+
}
190+
}
191+
192+
func (c *Conn) handleLoop() {
193+
defer func() {
194+
recover()
195+
c.Close()
196+
}()
197+
198+
for {
199+
select {
200+
case <-c.srv.exitChan:
201+
return
202+
203+
case <-c.closeChan:
204+
return
205+
206+
case p := <-c.packetReceiveChan:
207+
if c.IsClosed() {
208+
return
209+
}
210+
if !c.callback.OnMessage(c, p) {
211+
return
212+
}
213+
}
214+
}
215+
}
216+
217+
func asyncDo(fn func(), wg *sync.WaitGroup) {
218+
wg.Add(1)
219+
go func() {
220+
fn()
221+
wg.Done()
222+
}()
223+
}

‎network/protocol.go

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package network
2+
3+
import (
4+
"encoding/binary"
5+
"errors"
6+
"io"
7+
)
8+
9+
type Packet interface {
10+
Serialize() []byte
11+
}
12+
13+
type Protocol interface {
14+
ReadPacket(conn io.Reader) (Packet, error)
15+
}
16+
17+
type DefaultPacket struct {
18+
buff []byte
19+
}
20+
21+
func (this *DefaultPacket) Serialize() []byte {
22+
return this.buff
23+
}
24+
25+
func (this *DefaultPacket) GetBody() []byte {
26+
return this.buff[4:]
27+
}
28+
29+
func NewDefaultPacket(buff []byte) *DefaultPacket {
30+
p := &DefaultPacket{}
31+
32+
p.buff = make([]byte, 4+len(buff))
33+
binary.BigEndian.PutUint32(p.buff[0:4], uint32(len(buff)))
34+
copy(p.buff[4:], buff)
35+
36+
return p
37+
}
38+
39+
type DefaultProtocol struct {
40+
}
41+
42+
func (this *DefaultProtocol) ReadPacket(r io.Reader) (Packet, error) {
43+
var (
44+
lengthBytes []byte = make([]byte, 4)
45+
length uint32
46+
)
47+
48+
// read length
49+
if _, err := io.ReadFull(r, lengthBytes); err != nil {
50+
return nil, err
51+
}
52+
if length = binary.BigEndian.Uint32(lengthBytes); length > 1024 {
53+
return nil, errors.New("the size of packet is larger than the limit")
54+
}
55+
56+
buff := make([]byte, length)
57+
58+
// read body ( buff = lengthBytes + body )
59+
if _, err := io.ReadFull(r, buff); err != nil {
60+
return nil, err
61+
}
62+
63+
return NewDefaultPacket(buff), nil
64+
}

‎network/server.go

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package network
2+
3+
import (
4+
"net"
5+
"sync"
6+
"time"
7+
)
8+
9+
type Config struct {
10+
PacketSendChanLimit uint32 // the limit of packet send channel
11+
PacketReceiveChanLimit uint32 // the limit of packet receive channel
12+
ConnReadTimeout time.Duration // read timeout
13+
ConnWriteTimeout time.Duration // write timeout
14+
}
15+
16+
type Server struct {
17+
config *Config // server configuration
18+
callback ConnCallback // message callbacks in connection
19+
protocol Protocol // customize packet protocol
20+
exitChan chan struct{} // notify all goroutines to shutdown
21+
waitGroup *sync.WaitGroup // wait for all goroutines
22+
closeOnce sync.Once
23+
listener net.Listener
24+
}
25+
26+
// NewServer creates a server
27+
func NewServer(config *Config, callback ConnCallback, protocol Protocol) *Server {
28+
return &Server{
29+
config: config,
30+
callback: callback,
31+
protocol: protocol,
32+
exitChan: make(chan struct{}),
33+
waitGroup: &sync.WaitGroup{},
34+
}
35+
}
36+
37+
type ConnectionCreator func(net.Conn, *Server) *Conn
38+
39+
// Start starts service
40+
func (s *Server) Start(listener net.Listener, create ConnectionCreator) {
41+
s.listener = listener
42+
s.waitGroup.Add(1)
43+
defer func() {
44+
s.waitGroup.Done()
45+
}()
46+
47+
for {
48+
select {
49+
case <-s.exitChan:
50+
return
51+
52+
default:
53+
}
54+
55+
conn, err := listener.Accept()
56+
if err != nil {
57+
continue
58+
}
59+
60+
s.waitGroup.Add(1)
61+
go func() {
62+
create(conn, s).Do()
63+
s.waitGroup.Done()
64+
}()
65+
}
66+
}
67+
68+
// Stop stops service
69+
func (s *Server) Stop() {
70+
s.closeOnce.Do(func() {
71+
close(s.exitChan)
72+
s.listener.Close()
73+
})
74+
75+
s.waitGroup.Wait()
76+
}

‎proto/build_proto.sh

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#!/bin/bash
2+
DIR="$( cd "$( dirname "$0" )" && pwd )"
3+
echo pwd=$DIR
4+
5+
function xrsh_get_osname()
6+
{
7+
uname -s
8+
}
9+
10+
xrsh_get_osname
11+
12+
get_char()
13+
{
14+
SAVEDSTTY=`stty -g`
15+
stty -echo
16+
stty raw
17+
dd if=/dev/tty bs=1 count=1 2> /dev/null
18+
stty -raw
19+
stty echo
20+
stty $SAVEDSTTY
21+
}
22+
23+
if [ $(uname -s) = 'Linux' ]; then
24+
PROTOC=$DIR/../../../deps/protobuf/bin/protoc
25+
else
26+
PROTOC=$DIR/../../../tools/protoc_win/protoc.exe
27+
fi

‎proto/message.pb.go

+421
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎proto/message.proto

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
syntax = "proto2";
2+
3+
package pb;
4+
5+
6+
enum ID {
7+
option allow_alias = true;
8+
9+
MSG_BEGIN = 0;
10+
11+
C2S_Connect = 1;
12+
S2C_Connect = 1;
13+
14+
C2S_Heartbeat = 2;
15+
S2C_Heartbeat = 2;
16+
17+
18+
C2S_JoinRoom = 10;
19+
S2C_JoinRoom = 10;
20+
C2S_Progress = 20;
21+
S2C_Progress = 20;
22+
C2S_Ready = 30;
23+
S2C_Ready = 30;
24+
S2C_Frame = 40;
25+
C2S_InputSkill = 50;
26+
S2C_InputSkill = 50;
27+
C2S_Result = 100;
28+
S2C_Result = 100;
29+
30+
MSG_END = 255;
31+
}
32+
33+
message C2S_ConnectMsg {
34+
optional string token = 1;
35+
}
36+
37+
message S2C_ConnectMsg {
38+
optional int32 errorCode = 1;
39+
}
40+
41+
message C2S_JoinRoomMsg {
42+
optional int32 room = 1;
43+
}
44+
45+
message S2C_JoinRoomMsg {
46+
optional uint64 id = 1;
47+
repeated uint64 others = 2;
48+
}
49+
50+
message C2S_ProgressMsg {
51+
optional int32 pro = 1;
52+
}
53+
54+
55+
message S2C_ProgressMsg {
56+
optional uint64 id = 1;
57+
optional int32 pro = 2;
58+
}
59+
60+
61+
message S2C_ReadyMsg {
62+
optional uint64 id = 1;
63+
}
64+
65+
66+
message C2S_InputSkillMsg {
67+
optional int32 sid = 1;
68+
optional int32 x = 2;
69+
optional int32 y = 3;
70+
}
71+
72+
message InputData {
73+
optional uint64 id = 1;
74+
optional int32 sid = 2;
75+
optional int32 x = 3;
76+
optional int32 y = 4;
77+
}
78+
79+
message FrameData {
80+
optional uint32 frameID = 1;
81+
repeated InputData input = 2;
82+
}
83+
84+
85+
message S2C_FrameMsg {
86+
repeated FrameData frames = 1;
87+
}
88+
89+

‎protocol/protocol.go

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package protocol
2+
3+
import (
4+
"encoding/binary"
5+
"errors"
6+
"io"
7+
8+
l4g "github.com/alecthomas/log4go"
9+
"github.com/bailu1901/lockstepserver/network"
10+
"github.com/golang/protobuf/proto"
11+
)
12+
13+
const (
14+
DataLen = 2
15+
MessageIDLen = 1
16+
17+
MinPacketLen = DataLen + MessageIDLen
18+
MaxPacketLen = (2 << 8) * DataLen
19+
MaxMessageID = (2 << 8) * MessageIDLen
20+
)
21+
22+
/*
23+
24+
s->c
25+
26+
|--totalDataLen(uint16)--|--msgIDLen(uint8)--|--------------data--------------|
27+
|-------------2----------|---------1---------|---------(totalDataLen-2-1)-----|
28+
29+
*/
30+
31+
//Packet 服务端发往客户端的消息
32+
type Packet struct {
33+
id uint8
34+
data []byte
35+
}
36+
37+
func (p *Packet) GetMessageID() uint8 {
38+
return p.id
39+
}
40+
41+
func (p *Packet) GetData() []byte {
42+
return p.data
43+
}
44+
45+
func (p *Packet) Serialize() []byte {
46+
buff := make([]byte, MinPacketLen, MinPacketLen)
47+
48+
dataLen := len(p.data)
49+
binary.BigEndian.PutUint16(buff, uint16(dataLen))
50+
51+
buff[DataLen] = p.id
52+
return append(buff, p.data...)
53+
}
54+
55+
func (p *Packet) UnmarshalPB(msg proto.Message) error {
56+
return proto.Unmarshal(p.data, msg)
57+
}
58+
59+
func NewPacket(id uint8, msg interface{}) *Packet {
60+
61+
p := &Packet{
62+
id: id,
63+
}
64+
65+
switch v := msg.(type) {
66+
case []byte:
67+
p.data = v
68+
case proto.Message:
69+
if mdata, err := proto.Marshal(v); err == nil {
70+
p.data = mdata
71+
} else {
72+
l4g.Error("[NewPacket] proto marshal msg: %d error: %v",
73+
id, err)
74+
return nil
75+
}
76+
case nil:
77+
default:
78+
l4g.Error("[NewPacket] error msg type msg: %d", id)
79+
return nil
80+
}
81+
82+
return p
83+
}
84+
85+
type MsgProtocol struct {
86+
}
87+
88+
func (p *MsgProtocol) ReadPacket(r io.Reader) (network.Packet, error) /*Packet*/ {
89+
90+
buff := make([]byte, MinPacketLen, MinPacketLen)
91+
92+
//data length
93+
if _, err := io.ReadFull(r, buff); err != nil {
94+
return nil, err
95+
}
96+
dataLen := binary.BigEndian.Uint16(buff)
97+
98+
if dataLen > MaxPacketLen {
99+
return nil, errors.New("data max")
100+
}
101+
102+
//id
103+
msg := &Packet{
104+
id: buff[DataLen],
105+
}
106+
107+
//data
108+
if dataLen > 0 {
109+
msg.data = make([]byte, dataLen, dataLen)
110+
if _, err := io.ReadFull(r, msg.data); err != nil {
111+
return nil, err
112+
}
113+
}
114+
115+
return msg, nil
116+
}

‎protocol/protocol_test.go

+144
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package protocol
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"encoding/json"
7+
"strings"
8+
"testing"
9+
10+
"github.com/bailu1901/lockstepserver/proto"
11+
"github.com/golang/protobuf/proto"
12+
)
13+
14+
func Test_SCPacket(t *testing.T) {
15+
16+
var sID int32 = 19234333
17+
msg := &pb.C2S_InputSkillMsg{
18+
Sid: proto.Int32(sID),
19+
X: proto.Int32(10),
20+
Y: proto.Int32(20),
21+
}
22+
raw, _ := proto.Marshal(msg)
23+
p := NewPacket(uint8(pb.ID_C2S_InputSkill), msg)
24+
if nil == p {
25+
t.Fail()
26+
}
27+
28+
buff := p.Serialize()
29+
30+
dataLen := binary.BigEndian.Uint16(buff[0:])
31+
if dataLen != uint16(len(raw)) {
32+
t.Error("dataLen != uint16(len(raw))")
33+
}
34+
35+
if MinPacketLen+dataLen != MinPacketLen+uint16(len(raw)) {
36+
t.Error("MinPacketLen+dataLen != MinPacketLen+uint16(len(raw))")
37+
}
38+
39+
id := buff[DataLen]
40+
if p.id != id {
41+
t.Error("uint8(ID_C2S_Connect) != id")
42+
}
43+
44+
msg1 := &pb.C2S_InputSkillMsg{}
45+
if err := proto.Unmarshal(buff[MinPacketLen:], msg1); nil != err {
46+
t.Error(err)
47+
}
48+
49+
if msg.GetSid() != msg1.GetSid() || msg.GetX() != msg1.GetX() || msg.GetY() != msg1.GetY() {
50+
t.Error("msg.Sid != data1.Sid || msg.X != data1.X || msg.Y != data1.Y")
51+
}
52+
}
53+
54+
func Benchmark_SCPacket(b *testing.B) {
55+
56+
var sID int32 = 19234333
57+
msg := &pb.C2S_InputSkillMsg{
58+
Sid: proto.Int32(sID),
59+
X: proto.Int32(10),
60+
Y: proto.Int32(20),
61+
}
62+
63+
for i := 0; i < b.N; i++ {
64+
NewPacket(uint8(pb.ID_C2S_InputSkill), msg)
65+
}
66+
67+
}
68+
69+
func Test_Packet(t *testing.T) {
70+
var sID int32 = 19234333
71+
msg := &pb.C2S_InputSkillMsg{
72+
Sid: proto.Int32(sID),
73+
X: proto.Int32(10),
74+
Y: proto.Int32(20000),
75+
}
76+
77+
temp, _ := proto.Marshal(msg)
78+
79+
p := &Packet{
80+
id: uint8(pb.ID_C2S_InputSkill),
81+
data: temp,
82+
}
83+
84+
b := p.Serialize()
85+
86+
r := strings.NewReader(string(b))
87+
88+
proto := &MsgProtocol{}
89+
90+
ret, err := proto.ReadPacket(r)
91+
if nil != err {
92+
t.Error(err)
93+
}
94+
95+
packet, _ := ret.(*Packet)
96+
if packet.GetMessageID() != p.id {
97+
t.Error("packet.GetMessageID() != uint8(ID_C2S_InputSkill)")
98+
}
99+
100+
if len(packet.data) != len(p.data) {
101+
t.Error("len(packet.data)!=len(p.data)")
102+
}
103+
104+
msg1 := &pb.C2S_InputSkillMsg{}
105+
err = packet.UnmarshalPB(msg1)
106+
if nil != err {
107+
t.Error(err)
108+
}
109+
if msg.GetSid() != msg1.GetSid() || msg.GetX() != msg1.GetX() || msg.GetY() != msg1.GetY() {
110+
t.Error("msg.Sid != data1.Sid || msg.X != data1.X || msg.Y != data1.Y")
111+
}
112+
}
113+
114+
func Benchmark_Packet(b *testing.B) {
115+
var sID int32 = 19234333
116+
msg := &pb.C2S_InputSkillMsg{
117+
Sid: proto.Int32(sID),
118+
X: proto.Int32(10),
119+
Y: proto.Int32(20000),
120+
}
121+
122+
temp, _ := json.Marshal(msg)
123+
124+
p := &Packet{
125+
id: uint8(pb.ID_C2S_InputSkill),
126+
data: temp,
127+
}
128+
129+
buf := p.Serialize()
130+
131+
//strings.NewReader(string(b))
132+
133+
proto := &MsgProtocol{}
134+
135+
r := bytes.NewBuffer(nil)
136+
137+
for i := 0; i < b.N; i++ {
138+
r.Write(buf)
139+
if _, err := proto.ReadPacket(r); nil != err {
140+
b.Error(err)
141+
}
142+
}
143+
144+
}

‎pvp.xml

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<config>
3+
<!-- etcdv3得节点id,用,分割 -->
4+
<etcd_endpoints>10.18.97.201:2379,10.18.97.201:3379</etcd_endpoints>
5+
<!-->etcdv3键值</-->
6+
<etcd_key>/game/x/pvp</etcd_key>
7+
<!-->etcdv3键值自动失效时间(秒)</-->
8+
<etcd_ttl>5</etcd_ttl>
9+
<!-->最大房间数</-->
10+
<max_room>1</max_room>
11+
</config>

‎room/game/game.go

+413
Large diffs are not rendered by default.

‎room/game/lockstep.go

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package game
2+
3+
type command struct {
4+
id uint64
5+
sid int32
6+
sx int32
7+
sy int32
8+
}
9+
10+
type frameData struct {
11+
idx uint32
12+
cmds []*command
13+
}
14+
15+
func newFrameData(index uint32) *frameData {
16+
f := &frameData{
17+
idx: index,
18+
cmds: make([]*command, 0),
19+
}
20+
21+
return f
22+
}
23+
24+
type lockstep struct {
25+
frames map[uint32]*frameData
26+
frameCount uint32
27+
}
28+
29+
func newLockstep() *lockstep {
30+
l := &lockstep{
31+
frames: make(map[uint32]*frameData),
32+
}
33+
34+
return l
35+
}
36+
37+
func (l *lockstep) reset() {
38+
l.frames = make(map[uint32]*frameData)
39+
l.frameCount = 0
40+
}
41+
42+
func (l *lockstep) getFrameCount() uint32 {
43+
return l.frameCount
44+
}
45+
46+
func (l *lockstep) pushCmd(cmd *command) {
47+
f, ok := l.frames[l.frameCount]
48+
if !ok {
49+
f = newFrameData(l.frameCount)
50+
l.frames[l.frameCount] = f
51+
}
52+
53+
f.cmds = append(f.cmds, cmd)
54+
55+
}
56+
57+
func (l *lockstep) tick() uint32 {
58+
l.frameCount++
59+
return l.frameCount
60+
}
61+
62+
func (l *lockstep) getRangeFrames(from, to uint32) []*frameData {
63+
ret := make([]*frameData, 0, 3)
64+
65+
for ; from <= to && from <= l.frameCount; from++ {
66+
f, ok := l.frames[from]
67+
if !ok {
68+
continue
69+
}
70+
ret = append(ret, f)
71+
}
72+
73+
return ret
74+
}
75+
76+
func (l *lockstep) getFrame(idx uint32) *frameData {
77+
78+
return l.frames[idx]
79+
}

‎room/game/player.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package game
2+
3+
import (
4+
"github.com/bailu1901/lockstepserver/network"
5+
)
6+
7+
type Player struct {
8+
id uint64
9+
Client *network.Conn
10+
Ready bool
11+
Online bool
12+
}
13+
14+
func NewPlayer(id uint64) *Player {
15+
p := &Player{
16+
id: id,
17+
}
18+
19+
return p
20+
}
21+
22+
func (p *Player) SendMessage(msg network.Packet) {
23+
24+
if !p.Online {
25+
return
26+
}
27+
28+
if nil != p.Client {
29+
p.Client.AsyncWritePacket(msg, 0)
30+
}
31+
}
32+
33+
func (p *Player) Cleanup() {
34+
35+
if nil != p.Client {
36+
p.Client.Close()
37+
}
38+
p.Client = nil
39+
p.Ready = false
40+
p.Online = false
41+
42+
}

‎room/manager.go

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package room
2+
3+
import (
4+
"sync"
5+
)
6+
7+
var (
8+
room map[uint64]*Room
9+
wg sync.WaitGroup
10+
rw sync.RWMutex
11+
)
12+
13+
func init() {
14+
room = make(map[uint64]*Room)
15+
}
16+
17+
func CreateRoom(id uint64, typeID int32, playerID []uint64, logicServer string) (*Room, bool) {
18+
rw.Lock()
19+
defer rw.Unlock()
20+
21+
r, ok := room[id]
22+
if ok {
23+
return nil, false
24+
}
25+
26+
r = NewRoom(id, typeID, playerID, logicServer)
27+
room[id] = r
28+
29+
go func() {
30+
wg.Add(1)
31+
defer func() {
32+
rw.Lock()
33+
delete(room, id)
34+
rw.Unlock()
35+
36+
wg.Done()
37+
}()
38+
r.Run()
39+
40+
}()
41+
42+
return r, true
43+
}
44+
45+
func GetRoom(id uint64) *Room {
46+
47+
rw.RLock()
48+
defer rw.RUnlock()
49+
50+
r, _ := room[id]
51+
return r
52+
}
53+
func Stop() {
54+
55+
rw.Lock()
56+
for _, v := range room {
57+
v.Stop()
58+
}
59+
room = make(map[uint64]*Room)
60+
rw.Unlock()
61+
62+
wg.Wait()
63+
}

‎room/room.go

+238
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package room
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/bailu1901/lockstepserver/network"
8+
9+
l4g "github.com/alecthomas/log4go"
10+
"github.com/bailu1901/lockstepserver/protocol"
11+
"github.com/bailu1901/lockstepserver/room/game"
12+
)
13+
14+
const (
15+
Frequency = 30 //每分钟心跳频率
16+
TickTimer = time.Second / Frequency //心跳Timer
17+
TimeoutTime = time.Minute * 30 //超时时间
18+
)
19+
20+
type packet struct {
21+
id uint64
22+
msg network.Packet
23+
}
24+
25+
// Room 战斗房间
26+
type Room struct {
27+
wg sync.WaitGroup
28+
29+
roomID uint64
30+
typeID int32
31+
timeStamp int64
32+
secretKey string
33+
logicServer string
34+
35+
exitChan chan struct{}
36+
msgQ chan *packet
37+
inChan chan *network.Conn
38+
outChan chan *network.Conn
39+
40+
game *game.Game
41+
}
42+
43+
// NewRoom 构造
44+
func NewRoom(id uint64, typeID int32, players []uint64, logicServer string) *Room {
45+
r := &Room{
46+
roomID: id,
47+
typeID: typeID,
48+
exitChan: make(chan struct{}),
49+
msgQ: make(chan *packet, 2048),
50+
outChan: make(chan *network.Conn, 8),
51+
inChan: make(chan *network.Conn, 8),
52+
timeStamp: time.Now().Unix(),
53+
logicServer: logicServer,
54+
secretKey: "test_room",
55+
}
56+
57+
r.game = game.NewGame(id, players, r)
58+
59+
return r
60+
}
61+
62+
// ID room ID
63+
func (r *Room) ID() uint64 {
64+
return r.roomID
65+
}
66+
67+
// SecretKey secret key
68+
func (r *Room) SecretKey() string {
69+
return r.secretKey
70+
}
71+
72+
// TimeStamp time stamp
73+
func (r *Room) TimeStamp() int64 {
74+
return r.timeStamp
75+
}
76+
77+
func (r *Room) OnJoinGame(id, pid uint64) {
78+
l4g.Warn("[room(%d)] onJoinGame %d", id, pid)
79+
}
80+
func (r *Room) OnGameStart(id uint64) {
81+
l4g.Warn("[room(%d)] onGameStart", id)
82+
}
83+
84+
func (r *Room) OnLeaveGame(id, pid uint64) {
85+
l4g.Warn("[room(%d)] onLeaveGame %d", id, pid)
86+
}
87+
func (r *Room) OnGameOver(id uint64) {
88+
l4g.Warn("[room(%d)] onGameOver", id)
89+
90+
r.wg.Add(1)
91+
92+
go func() error {
93+
defer r.wg.Done()
94+
/*
95+
conn, err := grpc.Dial(r.logicServer, grpc.WithInsecure())
96+
if nil != err {
97+
l4g.Error("[reporter] ReportResult error:[%s]", err.Error())
98+
return err
99+
}
100+
defer conn.Close()
101+
102+
client := pb.NewReportPVPResultServiceClient(conn)
103+
104+
req := &pb.ReportPVPResultRequest{
105+
BattleID: id,
106+
Winner: r.game.Result(),
107+
Data: []byte("12313123"),
108+
TotalTime: time.Now().Unix() - r.timeStamp,
109+
}
110+
111+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
112+
defer cancel()
113+
114+
if _, err := client.ReportResult(ctx, req); nil != err {
115+
// TODO
116+
l4g.Error("[reporter] ReportResult error:[%s] ret:[%+v]", err.Error(), *req)
117+
return err
118+
}
119+
*/
120+
return nil
121+
}()
122+
123+
}
124+
125+
// OnConnect network.Conn callback
126+
func (r *Room) OnConnect(conn *network.Conn) bool {
127+
128+
conn.SetCallback(r) //SetCallback只能在OnConnect里调
129+
r.inChan <- conn
130+
l4g.Warn("[room(%d)] OnConnect %d", r.roomID, conn.GetExtraData().(uint64))
131+
return true
132+
}
133+
134+
// OnMessage network.Conn callback
135+
func (r *Room) OnMessage(conn *network.Conn, msg network.Packet) bool {
136+
137+
id, ok := conn.GetExtraData().(uint64)
138+
if !ok {
139+
l4g.Error("[room] OnMessage error conn don't have id")
140+
return false
141+
}
142+
143+
p := &packet{
144+
id: id,
145+
msg: msg,
146+
}
147+
r.msgQ <- p
148+
149+
return true
150+
}
151+
152+
// OnClose network.Conn callback
153+
func (r *Room) OnClose(conn *network.Conn) {
154+
r.outChan <- conn
155+
if id, ok := conn.GetExtraData().(uint64); ok {
156+
l4g.Warn("[room(%d)] OnClose %d", r.roomID, id)
157+
} else {
158+
l4g.Warn("[room(%d)] OnClose no id", r.roomID)
159+
}
160+
161+
}
162+
163+
// Run 主循环
164+
func (r *Room) Run() {
165+
r.wg.Add(1)
166+
defer r.wg.Done()
167+
defer func() {
168+
/*
169+
err := recover()
170+
if nil != err {
171+
l4g.Error("[room(%d)] Run error:%+v", r.roomID, err)
172+
}*/
173+
r.game.Cleanup()
174+
l4g.Warn("[room(%d)] quit! total time=[%d]", r.roomID, time.Now().Unix()-r.timeStamp)
175+
}()
176+
177+
//心跳
178+
tickerTick := time.NewTicker(TickTimer)
179+
defer tickerTick.Stop()
180+
181+
//超时timer
182+
timeoutTimer := time.NewTimer(TimeoutTime)
183+
184+
l4g.Info("[room(%d)] running...", r.roomID)
185+
186+
LOOP:
187+
for {
188+
select {
189+
case <-r.exitChan:
190+
l4g.Error("[room(%d)] force exit", r.roomID)
191+
return
192+
case <-timeoutTimer.C:
193+
l4g.Error("[room(%d)] time out", r.roomID)
194+
break LOOP
195+
case msg := <-r.msgQ:
196+
r.game.ProcessMsg(msg.id, msg.msg.(*protocol.Packet))
197+
case <-tickerTick.C:
198+
if !r.game.Tick(time.Now().Unix()) {
199+
l4g.Info("[room(%d)] tick over", r.roomID)
200+
break LOOP
201+
}
202+
case c := <-r.inChan:
203+
id, ok := c.GetExtraData().(uint64)
204+
if ok {
205+
if r.game.JoinGame(id, c) {
206+
l4g.Info("[room(%d)] player[%d] join room ok", r.roomID, id)
207+
} else {
208+
l4g.Error("[room(%d)] player[%d] join room failed", r.roomID, id)
209+
c.Close()
210+
}
211+
} else {
212+
c.Close()
213+
l4g.Error("[room(%d)] inChan don't have id", r.roomID)
214+
}
215+
216+
case c := <-r.outChan:
217+
if id, ok := c.GetExtraData().(uint64); ok {
218+
r.game.LeaveGame(id)
219+
} else {
220+
c.Close()
221+
l4g.Error("[room(%d)] outChan don't have id", r.roomID)
222+
}
223+
}
224+
}
225+
226+
// TODO
227+
228+
for i := 3; i > 0; i-- {
229+
<-time.After(time.Second)
230+
l4g.Info("[room(%d)] quiting %d...", r.roomID, i)
231+
}
232+
}
233+
234+
// Stop 强制关闭
235+
func (r *Room) Stop() {
236+
close(r.exitChan)
237+
r.wg.Wait()
238+
}

‎room/router.go

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package room
2+
3+
import (
4+
"strconv"
5+
"strings"
6+
"sync/atomic"
7+
"time"
8+
9+
"github.com/bailu1901/lockstepserver/network"
10+
"github.com/bailu1901/lockstepserver/proto"
11+
"github.com/bailu1901/lockstepserver/protocol"
12+
13+
"fmt"
14+
15+
l4g "github.com/alecthomas/log4go"
16+
)
17+
18+
type Router struct {
19+
sessiondID uint64
20+
lost uint64
21+
}
22+
23+
func (m *Router) OnConnect(conn *network.Conn) bool {
24+
25+
id := atomic.AddUint64(&m.sessiondID, 1)
26+
l4g.Debug("OnConnect [%s] %d", conn.GetRawConn().RemoteAddr().String(), id)
27+
return true
28+
}
29+
30+
func (m *Router) OnMessage(conn *network.Conn, p network.Packet) bool {
31+
32+
msg := p.(*protocol.Packet)
33+
34+
l4g.Info("OnMessage [%s] msg=[%d] len=[%d]", conn.GetRawConn().RemoteAddr().String(), msg.GetMessageID(), len(msg.GetData()))
35+
36+
switch pb.ID(msg.GetMessageID()) {
37+
case pb.ID_C2S_Connect:
38+
conn.AsyncWritePacket(protocol.NewPacket(uint8(pb.ID_S2C_Connect), nil), time.Millisecond)
39+
40+
// TODO
41+
rec := &pb.C2S_ConnectMsg{}
42+
if nil != msg.UnmarshalPB(rec) {
43+
return false
44+
}
45+
46+
token := rec.GetToken()
47+
l4g.Trace("ID_C2S_Connect token=[%s]", token)
48+
49+
a := strings.Split(token, ",")
50+
if 2 != len(a) {
51+
return false
52+
}
53+
54+
rId, _ := strconv.ParseUint(a[0], 10, 64)
55+
id, _ := strconv.ParseUint(a[1], 10, 64)
56+
57+
r := GetRoom(rId)
58+
if nil == r {
59+
return false
60+
}
61+
62+
//id := atomic.AddUint64(&m.sessiondID, 1)
63+
64+
conn.PutExtraData(id)
65+
conn.AsyncWritePacket(protocol.NewPacket(uint8(pb.ID_S2C_Connect), nil), time.Millisecond)
66+
return r.OnConnect(conn)
67+
68+
case pb.ID_C2S_Heartbeat:
69+
conn.AsyncWritePacket(protocol.NewPacket(uint8(pb.ID_S2C_Heartbeat), nil), time.Millisecond)
70+
return true
71+
case pb.ID_MSG_END: //test
72+
conn.AsyncWritePacket(protocol.NewPacket(uint8(pb.ID_MSG_END), msg.GetData()), time.Millisecond)
73+
return true
74+
default:
75+
return false
76+
}
77+
78+
return false
79+
80+
}
81+
82+
func (m *Router) OnClose(conn *network.Conn) {
83+
id := atomic.AddUint64(&m.lost, 1)
84+
fmt.Println("OnClose:", id)
85+
86+
//atomic.AddUint64(&m.lost, 1)
87+
}

‎util/color_logger.go

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package util
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"os"
7+
8+
l4g "github.com/alecthomas/log4go"
9+
)
10+
11+
var stdout io.Writer = os.Stdout
12+
13+
/*
14+
前景色 背景色 颜色
15+
---------------------------------------
16+
30 40 黑色
17+
31 41 红色
18+
32 42 绿色
19+
33 43 黃色
20+
34 44 蓝色
21+
35 45 紫红色
22+
36 46 青蓝色
23+
37 47 白色
24+
*/
25+
var (
26+
levelColor = [...]int{30, 30, 32, 37, 37, 33, 31, 34}
27+
levelStrings = [...]string{"FNST", "FINE", "DEBG", "TRAC", "INFO", "WARN", "EROR", "CRIT"}
28+
)
29+
30+
/*
31+
fmt.Printf("%c[%dm(1 2 3 4)%c[0m ", 0x1B, 30, 0x1B)
32+
33+
for b := 40; b <= 47; b++ { // 背景色彩 = 40-47
34+
for f := 30; f <= 37; f++ { // 前景色彩 = 30-37
35+
for d := range []int{0, 1, 4, 5, 7, 8} { // 显示方式 = 0,1,4,5,7,8
36+
fmt.Fprintf(os.Stderr, " %c[%d;%d;%dm%s(1 2 3 4 )%c[0m ", 0x1B, d, b, f, "", 0x1B)
37+
}
38+
fmt.Println("")
39+
}
40+
fmt.Println("")
41+
}
42+
*/
43+
44+
const (
45+
colorSymbol = 0x1B
46+
)
47+
48+
// This is the standard writer that prints to standard output.
49+
type ConsoleLogWriter chan *l4g.LogRecord
50+
51+
// This creates a new ConsoleLogWriter
52+
func NewColorConsoleLogWriter() ConsoleLogWriter {
53+
records := make(ConsoleLogWriter, l4g.LogBufferLength)
54+
go records.run(stdout)
55+
return records
56+
}
57+
58+
func (w ConsoleLogWriter) run(out io.Writer) {
59+
var timestr string
60+
var timestrAt int64
61+
62+
for rec := range w {
63+
if at := rec.Created.UnixNano() / 1e9; at != timestrAt {
64+
timestr, timestrAt = rec.Created.Format("01/02/06 15:04:05"), at
65+
}
66+
fmt.Fprintf(out, "%c[%dm[%s] [%s] (%s) %s\n%c[0m",
67+
colorSymbol,
68+
levelColor[rec.Level],
69+
timestr,
70+
levelStrings[rec.Level],
71+
rec.Source,
72+
rec.Message,
73+
colorSymbol)
74+
}
75+
}
76+
77+
// This is the ConsoleLogWriter's output method. This will block if the output
78+
// buffer is full.
79+
func (w ConsoleLogWriter) LogWrite(rec *l4g.LogRecord) {
80+
w <- rec
81+
}
82+
83+
// Close stops the logger from sending messages to standard output. Attempts to
84+
// send log messages to this logger after a Close have undefined behavior.
85+
func (w ConsoleLogWriter) Close() {
86+
close(w)
87+
}

‎util/config.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package util
2+
3+
import (
4+
"encoding/xml"
5+
"io/ioutil"
6+
)
7+
8+
func LoadConfig(filename string, v interface{}) error {
9+
if contents, err := ioutil.ReadFile(filename); err != nil {
10+
return err
11+
} else {
12+
if err = xml.Unmarshal(contents, v); err != nil {
13+
return err
14+
}
15+
return nil
16+
}
17+
}
18+
19+
func SaveConfig(filename string, v interface{}) error {
20+
if contents, err := xml.MarshalIndent(v, " ", " "); err != nil {
21+
return err
22+
} else {
23+
if err = ioutil.WriteFile(filename, contents, 0644); err != nil {
24+
return err
25+
}
26+
return nil
27+
}
28+
}

‎util/ip.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package util
2+
3+
import (
4+
"io/ioutil"
5+
"net"
6+
"net/http"
7+
"strings"
8+
)
9+
10+
//GetOutboundIP 获得本机内网IP,这个函数只能在启动的时候掉,运行中掉有风险
11+
func GetOutboundIP() net.IP {
12+
conn, err := net.Dial("udp", "8.8.8.8:80")
13+
if err != nil {
14+
panic(err)
15+
}
16+
defer conn.Close()
17+
18+
localAddr := conn.LocalAddr().(*net.UDPAddr)
19+
20+
return localAddr.IP
21+
}
22+
23+
//GetExternalIP 获取公网IP
24+
func GetExternalIP() string {
25+
resp, err := http.Get("http://myexternalip.com/raw")
26+
if err != nil {
27+
return ""
28+
}
29+
defer resp.Body.Close()
30+
content, _ := ioutil.ReadAll(resp.Body)
31+
return strings.TrimSpace(string(content))
32+
}
33+
34+
//GetLocalIP 获得内网IP
35+
func GetLocalIP() string {
36+
addrs, err := net.InterfaceAddrs()
37+
38+
if err != nil {
39+
return ""
40+
}
41+
42+
for _, address := range addrs {
43+
44+
// 检查ip地址判断是否回环地址
45+
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
46+
if ipnet.IP.To4() != nil {
47+
return ipnet.IP.String()
48+
}
49+
50+
}
51+
}
52+
return ""
53+
}

‎web/html.go

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package web
2+
3+
const htmlStr = `<html>
4+
<head>
5+
<title>command</title>
6+
</head>
7+
<body>
8+
<style type="text/css">
9+
#all {
10+
color: #000000;
11+
background: #ececec;
12+
width: 300px;
13+
height: 200px;
14+
}
15+
16+
div {
17+
line-height: 40px;
18+
text-align: right;
19+
}
20+
21+
input {
22+
width: 400px;
23+
height: 20px;
24+
}
25+
26+
select {
27+
padding: 5px 82px;
28+
}
29+
</style>
30+
31+
32+
33+
<label for="API">Room:</label>
34+
<p>
35+
<!-- <input type="text" name="API" id="api" value="help" rows="100"> -->
36+
<textarea id="room" name="API" rows="2" cols="30">1</textarea>
37+
38+
39+
</p>
40+
41+
<label for="API">Member:</label>
42+
<p>
43+
<textarea id="member" name="API" rows="2" cols="30">1,2</textarea>
44+
</p>
45+
46+
<p>
47+
<button id="btn" style="height:50px;width:50px">click</button>
48+
</p>
49+
50+
<label for="Return">Return:</label>
51+
<p>
52+
<textarea id="return" name="summary" rows="50" cols="120"></textarea>
53+
</p>
54+
55+
</body>
56+
<script src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>
57+
58+
<script type="text/javascript">
59+
60+
61+
$("#btn").on("click", function () {
62+
var param = $("#room").val()
63+
var param1 = $("#member").val()
64+
65+
66+
var url1 = window.location.href +'/create?room='+param+'&member='+param1
67+
68+
document.getElementById("return").value = "waiting result..."
69+
70+
console.log(url1)
71+
$.ajax({
72+
url: url1,
73+
type: "GET",
74+
contentType: "application/json; charset=utf-8",
75+
//dataType: "json",
76+
77+
//data: param,
78+
79+
success: function (res) {
80+
document.getElementById("return").value = res;
81+
},
82+
83+
error: function(xhr,textStatus){
84+
document.getElementById("return").value = "error, state = " + textStatus
85+
}
86+
})
87+
88+
})
89+
90+
</script>
91+
</html>`

‎web/web.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package web
2+
3+
import (
4+
"fmt"
5+
"html/template"
6+
"net/http"
7+
"strconv"
8+
"strings"
9+
10+
"github.com/bailu1901/lockstepserver/room"
11+
)
12+
13+
func init() {
14+
http.HandleFunc("/create", HTTPHandleFuncCreate)
15+
http.HandleFunc("/", HTTPHandleFunc)
16+
}
17+
func HTTPHandleFunc(w http.ResponseWriter, r *http.Request) {
18+
19+
query := r.URL.Query()
20+
if 0 == len(query) {
21+
t, err := template.New("test").Parse(htmlStr)
22+
if err != nil {
23+
w.Write([]byte("error"))
24+
} else {
25+
t.Execute(w, nil)
26+
}
27+
return
28+
}
29+
}
30+
31+
func HTTPHandleFuncCreate(w http.ResponseWriter, r *http.Request) {
32+
33+
ret := "error"
34+
35+
defer func() {
36+
w.Write([]byte(ret))
37+
}()
38+
39+
query := r.URL.Query()
40+
41+
roomStr := query.Get("room")
42+
roomID, _ := strconv.ParseUint(roomStr, 10, 64)
43+
44+
ps := make([]uint64, 0, 10)
45+
46+
members := query.Get("member")
47+
if len(members) > 0 {
48+
49+
a := strings.Split(members, ",")
50+
51+
for _, v := range a {
52+
id, _ := strconv.ParseUint(v, 10, 64)
53+
ps = append(ps, id)
54+
}
55+
56+
}
57+
58+
room, ok := room.CreateRoom(roomID, 0, ps, "test")
59+
if ok {
60+
ret = fmt.Sprintf("room.ID=[%d] room.Secret=[%s] room.Time=[%d]", room.ID(), room.SecretKey(), room.TimeStamp())
61+
} else {
62+
ret = "failed!"
63+
}
64+
65+
}

0 commit comments

Comments
 (0)
Please sign in to comment.