forked from TarsCloud/TarsGo
-
Notifications
You must be signed in to change notification settings - Fork 4
/
servant.go
130 lines (118 loc) · 2.92 KB
/
servant.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package tars
import (
"context"
"strings"
"sync"
"sync/atomic"
"time"
"tars/model"
"tars/protocol/res/basef"
"tars/protocol/res/requestf"
"tars/util/tools"
)
var (
	// maxInt32 is the largest value an int32 can hold; the request-ID
	// counter is wrapped when it reaches this value.
	maxInt32 = int32(1<<31 - 1)
	// msgID is the package-wide request-ID counter. It is shared by all
	// ServantProxy instances and must only be accessed through sync/atomic.
	msgID int32
)
// ServantProxy is the client-side handle used to call one remote servant.
type ServantProxy struct {
	// name is the servant name sent in every request: the object name with
	// any "@..." suffix removed (see Init).
	name string
	// comm is the Communicator this proxy was created from.
	comm *Communicator
	// obj is the ObjectProxy that actually carries out the invocations.
	obj *ObjectProxy
	// timeout is the per-call timeout in milliseconds.
	timeout int
}
// Init prepares the proxy for calling objName through comm.
//
// The servant name is the part of objName before the first "@"; when there is
// no "@" (or it is the very first character) the whole objName is used. The
// call timeout defaults to comm.Client.AsyncInvokeTimeout, and the underlying
// ObjectProxy is obtained from a freshly initialised ObjectProxyFactory.
func (s *ServantProxy) Init(comm *Communicator, objName string) {
	s.name = objName
	if at := strings.Index(objName, "@"); at > 0 {
		s.name = objName[:at]
	}
	s.comm = comm

	factory := new(ObjectProxyFactory)
	factory.Init(comm)

	s.timeout = s.comm.Client.AsyncInvokeTimeout
	s.obj = factory.GetObjectProxy(objName)
}
// TarsSetTimeout overrides the timeout, in milliseconds, applied to
// subsequent calls made through this proxy.
func (s *ServantProxy) TarsSetTimeout(t int) {
	s.timeout = t
}
// TarsSetProtocol forwards proto to the underlying ObjectProxy, which applies
// it via its own TarsSetProtocol method.
func (s *ServantProxy) TarsSetProtocol(proto model.Protocol) {
	s.obj.TarsSetProtocol(proto)
}
// Tars_invoke sends one request to the remote servant and, on success, copies
// the reply into *Resp.
//
// Parameters:
//   - ctx is passed unchanged to the client filter or to ObjectProxy.Invoke.
//   - ctype is accepted for interface compatibility but is not read here;
//     CPacketType is always sent as 0.
//   - sFuncName is the remote function name and buf its encoded arguments.
//   - status and reqContext are placed in the request's Status and Context
//     fields as-is.
//   - Resp receives a copy of the response packet when the call succeeds.
//
// The call is bounded by the proxy timeout (milliseconds, see TarsSetTimeout).
// Any error from the filter/Invoke is logged, reported through ReportStat and
// returned; *Resp is left untouched in that case.
func (s *ServantProxy) Tars_invoke(ctx context.Context, ctype byte,
	sFuncName string,
	buf []byte,
	status map[string]string,
	reqContext map[string]string,
	Resp *requestf.ResponsePacket) error {
	defer checkPanic()

	// Allocate the request ID with a single compare-and-swap loop so that the
	// wrap-around and the increment are one atomic step. Doing them as two
	// separate atomic operations lets concurrent callers push the counter past
	// maxInt32 into negative values. IDs produced here are always in
	// [1, maxInt32]; a counter that is already out of range restarts at 1.
	var reqID int32
	for {
		cur := atomic.LoadInt32(&msgID)
		reqID = cur + 1
		if cur < 0 || cur >= maxInt32 {
			reqID = 1
		}
		if atomic.CompareAndSwapInt32(&msgID, cur, reqID) {
			break
		}
	}

	req := requestf.RequestPacket{
		IVersion:     1,
		CPacketType:  0,
		IRequestId:   reqID,
		SServantName: s.name,
		SFuncName:    sFuncName,
		SBuffer:      tools.ByteToInt8(buf),
		// NOTE(review): the packet carries the package default, not s.timeout;
		// s.timeout only bounds the local wait below. Confirm this is intended.
		ITimeout: ReqDefaultTimeout,
		Context:  reqContext,
		Status:   status,
	}
	msg := &Message{Req: &req, Ser: s, Obj: s.obj}
	msg.Init()

	// Route through the registered client filter when there is one; the filter
	// receives s.obj.Invoke so it can perform the actual call itself.
	timeout := time.Duration(s.timeout) * time.Millisecond
	var err error
	if allFilters.cf != nil {
		err = allFilters.cf(ctx, msg, s.obj.Invoke, timeout)
	} else {
		err = s.obj.Invoke(ctx, msg, timeout)
	}

	if err != nil {
		TLOG.Errorf("Invoke Obj:%s,fun:%s,error:%s", s.name, sFuncName, err.Error())
		// The middle counter is used only when a response exists and the
		// message status is TARSINVOKETIMEOUT; every other failure is counted
		// in the last one. (Presumably success/timeout/exception counts —
		// confirm against ReportStat.)
		if msg.Resp != nil && msg.Status == basef.TARSINVOKETIMEOUT {
			ReportStat(msg, 0, 1, 0)
		} else {
			ReportStat(msg, 0, 0, 1)
		}
		return err
	}

	msg.End()
	*Resp = *msg.Resp
	ReportStat(msg, 1, 0, 0)
	return nil
}
// ServantProxyFactory creates ServantProxy instances and caches them by
// object name.
type ServantProxyFactory struct {
	// objs maps an object name to the proxy created for it.
	objs map[string]*ServantProxy
	// comm is the Communicator handed to every proxy this factory creates.
	comm *Communicator
	// fm guards objs.
	fm *sync.Mutex
}
// Init readies the factory for use: it records comm, allocates an empty proxy
// cache and creates the mutex that guards it. It must be called before
// GetServantProxy.
func (o *ServantProxyFactory) Init(comm *Communicator) {
	o.comm = comm
	o.objs = map[string]*ServantProxy{}
	o.fm = &sync.Mutex{}
}
// GetServantProxy returns the ServantProxy registered for objName, creating
// and registering one on first use.
//
// Every caller asking for the same objName receives the same instance, even
// when several goroutines race on the first request: the cache is checked
// again after construction, and a proxy that lost the race is discarded in
// favour of the one already registered.
func (o *ServantProxyFactory) GetServantProxy(objName string) *ServantProxy {
	o.fm.Lock()
	if cached, ok := o.objs[objName]; ok {
		o.fm.Unlock()
		return cached
	}
	o.fm.Unlock()

	// Build the proxy without holding the lock, as the original code did.
	// NOTE(review): Init is not visible here; it may block or call back into
	// this factory, so confirm before widening the critical section.
	created := new(ServantProxy)
	created.Init(o.comm, objName)

	o.fm.Lock()
	defer o.fm.Unlock()
	// Another goroutine may have registered a proxy while the lock was
	// released; keep that one so the cache entry is never overwritten.
	if cached, ok := o.objs[objName]; ok {
		return cached
	}
	o.objs[objName] = created
	return created
}