Skip to content

Commit 4048019

Browse files
authored
merge from fix fed kafkasend to latest (finogeeks#49)
Co-authored-by: chenzhangrong <>
1 parent 4164246 commit 4048019

File tree

111 files changed

+3023
-1261
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

111 files changed

+3023
-1261
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@
1616

1717
bin
1818
.vscode
19-
./log
19+
log
2020
.DS_Store
2121
.idea

adapter/adapter.go

+25
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ type CommonCfg struct {
5959
Domain DomainCfg
6060
DistLock DistLockCfg
6161
Debug DebugCfg
62+
Cache CacheCfg
63+
}
64+
65+
type CacheCfg struct {
66+
TokenExpire int64
67+
UtlExpire int64
68+
LatestToken int
6269
}
6370

6471
var AdapterCfg CommonCfg
@@ -170,3 +177,21 @@ func Random(min, max int) int {
170177
rand.Seed(time.Now().UnixNano())
171178
return rand.Intn(max-min) + min
172179
}
180+
181+
func SetCacheCfg(tokenExpire, utlExpire int64, latestToken int) {
182+
AdapterCfg.Cache.TokenExpire = tokenExpire
183+
AdapterCfg.Cache.UtlExpire = utlExpire
184+
AdapterCfg.Cache.LatestToken = latestToken
185+
}
186+
187+
func GetTokenExpire() int64 {
188+
return AdapterCfg.Cache.TokenExpire
189+
}
190+
191+
func GetUtlExpire() int64 {
192+
return AdapterCfg.Cache.UtlExpire
193+
}
194+
195+
func GetLatestToken() int {
196+
return AdapterCfg.Cache.LatestToken
197+
}

cache/base.go

+5
Original file line numberDiff line numberDiff line change
@@ -190,3 +190,8 @@ func (rc *RedisCache) HMSet(key string, val interface{}) (err error) {
190190
func (rc *RedisCache) HMGet(key string, fields []string) ([]interface{}, error) {
191191
return redis.Values(rc.SafeDo("HMGET", redis.Args{}.Add(key).AddFlat(fields)...))
192192
}
193+
194+
func (c *RedisCache) Expire(key string, expire int64) error {
195+
_, err := Bool(c.SafeDo("EXPIRE", key, expire))
196+
return err
197+
}

cache/redis_cache.go

+37-6
Original file line numberDiff line numberDiff line change
@@ -765,14 +765,14 @@ func (rc *RedisCache) GetRoomUnreadCount(userID, roomID string) (int64, int64, e
765765
func (rc *RedisCache) GetPresences(userID string) (*authtypes.Presences, bool) {
766766
key := fmt.Sprintf("%s:%s", "presences", userID)
767767

768-
reply, err := redis.Values(rc.SafeDo("hmget", key, "user_id", "status", "status_msg", "ext_status_msg"))
768+
reply, err := redis.Values(rc.SafeDo("hmget", key, "user_id", "status", "status_msg", "ext_status_msg","server_status"))
769769
if err != nil {
770770
log.Errorw("cache missed for presences", log.KeysAndValues{"userID", userID, "error", err})
771771
return nil, false
772772
} else {
773773
var presences authtypes.Presences
774774

775-
reply, err = redis.Scan(reply, &presences.UserID, &presences.Status, &presences.StatusMsg, &presences.ExtStatusMsg)
775+
reply, err = redis.Scan(reply, &presences.UserID, &presences.Status, &presences.StatusMsg, &presences.ExtStatusMsg, &presences.ServerStatus)
776776
if err != nil {
777777
log.Errorw("Scan error for presences", log.KeysAndValues{"userID", userID, "error", err})
778778
return nil, false
@@ -794,6 +794,17 @@ func (rc *RedisCache) SetPresences(userID, status, statusMsg, extStatusMsg strin
794794
return conn.Flush()
795795
}
796796

797+
func (rc *RedisCache) SetPresencesServerStatus(userID, serverStatus string) error {
798+
conn := rc.pool().Get()
799+
defer conn.Close()
800+
key := fmt.Sprintf("%s:%s", "presences", userID)
801+
err := conn.Send("hmset", key, "server_status", serverStatus)
802+
if err != nil {
803+
return err
804+
}
805+
return conn.Flush()
806+
}
807+
797808
func (rc *RedisCache) SetAccountData(userID, roomID, acctType, content string) error {
798809
conn := rc.pool().Get()
799810
defer conn.Close()
@@ -998,24 +1009,24 @@ func (rc *RedisCache) GetDomains() ([]string, error) {
9981009
}
9991010

10001011
func (rc *RedisCache) GetUserInfoByUserID(userID string) (result *authtypes.UserInfo) {
1001-
reply, err := redis.Values(rc.SafeDo("hmget", fmt.Sprintf("%s:%s", "user_info", userID), "user_id", "user_name", "job_number", "mobile", "landline", "email"))
1012+
reply, err := redis.Values(rc.SafeDo("hmget", fmt.Sprintf("%s:%s", "user_info", userID), "user_id", "user_name", "job_number", "mobile", "landline", "email","state"))
10021013
if err != nil {
10031014
log.Errorw("cache missed for user_info", log.KeysAndValues{"userID", userID, "error", err})
10041015
} else {
10051016
result = &authtypes.UserInfo{}
1006-
reply, err = redis.Scan(reply, &result.UserID, &result.UserName, &result.JobNumber, &result.Mobile, &result.Landline, &result.Email)
1017+
reply, err = redis.Scan(reply, &result.UserID, &result.UserName, &result.JobNumber, &result.Mobile, &result.Landline, &result.Email,&result.State)
10071018
if err != nil {
10081019
log.Errorw("Scan error for user_info", log.KeysAndValues{"userID", userID, "error", err})
10091020
}
10101021
}
10111022
return
10121023
}
10131024

1014-
func (rc *RedisCache) SetUserInfo(userID, userName, jobNumber, mobile, landline, email string) error {
1025+
func (rc *RedisCache) SetUserInfo(userID, userName, jobNumber, mobile, landline, email string, state int) error {
10151026
conn := rc.pool().Get()
10161027
defer conn.Close()
10171028

1018-
err := conn.Send("hmset", fmt.Sprintf("%s:%s", "user_info", userID), "user_id", userID, "user_name", userName, "job_number", jobNumber, "mobile", mobile, "landline", landline, "email", email)
1029+
err := conn.Send("hmset", fmt.Sprintf("%s:%s", "user_info", userID), "user_id", userID, "user_name", userName, "job_number", jobNumber, "mobile", mobile, "landline", landline, "email", email, "state", state)
10191030
if err != nil {
10201031
return err
10211032
}
@@ -1375,3 +1386,23 @@ func (rc *RedisCache) FreeFedBackfill(roomID string) error {
13751386
err := conn.Send("DEL", "fedbackfill:"+roomID)
13761387
return err
13771388
}
1389+
1390+
func (rc *RedisCache) SetRoomLatestOffset(roomId string, offset int64) error {
1391+
conn := rc.pool().Get()
1392+
defer conn.Close()
1393+
key := fmt.Sprintf("%s:%s", "roomlatestoffset", roomId)
1394+
err := conn.Send("set", key, offset)
1395+
if err != nil {
1396+
return err
1397+
}
1398+
return conn.Flush()
1399+
}
1400+
1401+
func (rc *RedisCache) GetRoomLatestOffset(roomId string) (int64, error) {
1402+
key := fmt.Sprintf("%s:%s", "roomlatestoffset", roomId)
1403+
offset, err := redis.Int64(rc.SafeDo("get", key))
1404+
if err != nil {
1405+
return -1, err
1406+
}
1407+
return offset, err
1408+
}

cache/token.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package cache
2+
3+
import (
4+
"fmt"
5+
"sort"
6+
"strconv"
7+
"time"
8+
"github.com/finogeeks/ligase/adapter"
9+
"github.com/finogeeks/ligase/skunkworks/log"
10+
)
11+
12+
func (rc *RedisCache) SetToken(userID, device string, utl int64, roomoffsets map[string]int64) error {
13+
key := fmt.Sprintf("synctoken:%s:%s:%d", userID,device, utl)
14+
err := rc.HMSet(key, roomoffsets)
15+
if err == nil {
16+
e := rc.Expire(key,adapter.GetTokenExpire())
17+
if e != nil {
18+
log.Errorf("set token expire key:%s err:%v", key, e)
19+
}
20+
}
21+
return err
22+
}
23+
24+
func (rc *RedisCache) GetToken(userID, device string, utl int64) (map[string]int64, error) {
25+
key := fmt.Sprintf("synctoken:%s:%s:%d", userID,device, utl)
26+
result, err := rc.HGetAll(key)
27+
if err != nil {
28+
return nil, err
29+
} else {
30+
if result == nil {
31+
return nil, nil
32+
}
33+
r := make(map[string]int64)
34+
for k, v := range result {
35+
r[k], _ = Int64(v, nil)
36+
}
37+
e := rc.Expire(key,adapter.GetTokenExpire())
38+
if e != nil {
39+
log.Errorf("get token expire key:%s err:%v", key, e)
40+
}
41+
return r, nil
42+
}
43+
}
44+
45+
func (rc *RedisCache) DelTokens(userID, device string, utls[]int64) error {
46+
for _, utl := range utls {
47+
key := fmt.Sprintf("synctoken:%s:%s:%d", userID,device, utl)
48+
if err := rc.Del(key); err != nil {
49+
log.Warnf("del token key:%s utl:%d err:%v", key, utl, err)
50+
}else{
51+
tokenUtl := fmt.Sprintf("allutl:%s:%s",userID, device)
52+
if err := rc.HDel(tokenUtl,strconv.FormatInt(utl,10)); err != nil {
53+
log.Warnf("del utl key:%s field:%d err:%v", tokenUtl, utl, err)
54+
}
55+
}
56+
}
57+
return nil
58+
}
59+
60+
func (rc *RedisCache) AddTokenUtl(userID, device string, utl int64) error {
61+
key := fmt.Sprintf("allutl:%s:%s",userID, device)
62+
err := rc.HSet(key, strconv.FormatInt(utl,10), time.Now().UnixNano()/1000000)
63+
if err == nil {
64+
e := rc.Expire(key,adapter.GetUtlExpire())
65+
if e != nil {
66+
log.Errorf("add token utl expire key:%s err:%v", key, e)
67+
}
68+
}
69+
return err
70+
}
71+
72+
func (rc *RedisCache) GetTokenUtls(userID, device string)(utls []int64, err error) {
73+
key := fmt.Sprintf("allutl:%s:%s",userID, device)
74+
us, err := rc.HGetAll(key)
75+
if err != nil || us == nil {
76+
return []int64{}, err
77+
}
78+
for k, _ := range us {
79+
utl, err := strconv.ParseInt(k, 10, 64)
80+
if err == nil {
81+
utls = append(utls, utl)
82+
}
83+
}
84+
e := rc.Expire(key,adapter.GetUtlExpire())
85+
if e != nil {
86+
log.Errorf("add token utl expire key:%s err:%v", key, e)
87+
}
88+
sort.Slice(utls, func(i, j int) bool {
89+
return utls[i] > utls[j]
90+
})
91+
return utls, nil
92+
}
93+
94+
func (rc *RedisCache) GetLastValidToken(userID, device string) (int64, map[string]int64, error) {
95+
utls, err := rc.GetTokenUtls(userID, device)
96+
if err != nil {
97+
return 0, nil, err
98+
}
99+
if len(utls) <= 0 {
100+
return 0, nil, nil
101+
}
102+
token, err := rc.GetToken(userID, device, utls[0])
103+
return utls[0], token, err
104+
}

cachewriter/consumers/account_dbev_consumer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func (s *AccountDBEvCacheConsumer) OnUpsertUserInfo(
237237
conn := s.pool.Pool().Get()
238238
defer conn.Close()
239239

240-
err := conn.Send("hmset", fmt.Sprintf("%s:%s", "user_info", msg.UserID), "user_id", msg.UserID, "user_name", msg.UserName, "job_number", msg.JobNumber, "mobile", msg.Mobile, "landline", msg.Landline, "email", msg.Email)
240+
err := conn.Send("hmset", fmt.Sprintf("%s:%s", "user_info", msg.UserID), "user_id", msg.UserID, "user_name", msg.UserName, "job_number", msg.JobNumber, "mobile", msg.Mobile, "landline", msg.Landline, "email", msg.Email, "state", msg.State)
241241
if err != nil {
242242
return err
243243
}

clientapi/api/api.go

+64
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,13 @@ func init() {
180180
apiconsumer.SetAPIProcessor(ReqGetSetting{})
181181
apiconsumer.SetAPIProcessor(ReqPutSetting{})
182182
apiconsumer.SetAPIProcessor(ReqGetRoomInfo{})
183+
apiconsumer.SetAPIProcessor(ReqInternalGetRoomInfo{})
183184
apiconsumer.SetAPIProcessor(ReqPostReport{})
184185
apiconsumer.SetAPIProcessor(ReqGetUserInfo{})
185186
apiconsumer.SetAPIProcessor(ReqPutUserInfo{})
186187
apiconsumer.SetAPIProcessor(ReqPostUserInfo{})
187188
apiconsumer.SetAPIProcessor(ReqDeleteUserInfo{})
189+
apiconsumer.SetAPIProcessor(ReqDismissRoom{})
188190
}
189191

190192
type ReqPostCreateRoom struct{}
@@ -1984,6 +1986,37 @@ func (ReqGetRoomInfo) Process(ctx context.Context, consumer interface{}, msg cor
19841986
return routing.OnRoomInfoRequest(ctx, device.UserID, req.RoomIDs, c.rsRpcCli, c.cacheIn)
19851987
}
19861988

1989+
type ReqInternalGetRoomInfo struct{}
1990+
1991+
func (ReqInternalGetRoomInfo) GetRoute() string { return "/rooms" }
1992+
func (ReqInternalGetRoomInfo) GetMetricsName() string { return "room_info" }
1993+
func (ReqInternalGetRoomInfo) GetMsgType() int32 { return internals.MSG_INTERNAL_POST_ROOM_INFO }
1994+
func (ReqInternalGetRoomInfo) GetAPIType() int8 { return apiconsumer.APITypeInternalAuth }
1995+
func (ReqInternalGetRoomInfo) GetMethod() []string {
1996+
return []string{http.MethodPost, http.MethodOptions}
1997+
}
1998+
func (ReqInternalGetRoomInfo) GetTopic(cfg *config.Dendrite) string { return getProxyRpcTopic(cfg) }
1999+
func (ReqInternalGetRoomInfo) NewRequest() core.Coder {
2000+
return new(external.GetRoomInfoRequest)
2001+
}
2002+
func (ReqInternalGetRoomInfo) FillRequest(coder core.Coder, req *http.Request, vars map[string]string) error {
2003+
msg := coder.(*external.GetRoomInfoRequest)
2004+
err := common.UnmarshalJSON(req, msg)
2005+
if err != nil {
2006+
return err
2007+
}
2008+
return nil
2009+
}
2010+
func (ReqInternalGetRoomInfo) NewResponse(code int) core.Coder {
2011+
return new(external.GetRoomInfoResponse)
2012+
}
2013+
func (ReqInternalGetRoomInfo) GetPrefix() []string { return []string{"inr0"} }
2014+
func (ReqInternalGetRoomInfo) Process(ctx context.Context, consumer interface{}, msg core.Coder, device *authtypes.Device) (int, core.Coder) {
2015+
c := consumer.(*InternalMsgConsumer)
2016+
req := msg.(*external.GetRoomInfoRequest)
2017+
return routing.OnRoomInfoRequest(ctx, device.UserID, req.RoomIDs, c.rsRpcCli, c.cacheIn)
2018+
}
2019+
19872020
type ReqGetUserNewToken struct{}
19882021

19892022
func (ReqGetUserNewToken) GetRoute() string { return "/user/new_token" }
@@ -2267,3 +2300,34 @@ func (ReqDeleteUserInfo) Process(ctx context.Context, consumer interface{}, msg
22672300
ctx, req, c.Cfg, c.federation, c.accountDB, c.cacheIn,
22682301
)
22692302
}
2303+
2304+
type ReqDismissRoom struct{}
2305+
2306+
func (ReqDismissRoom) GetRoute() string { return "/{roomID}/dismiss" }
2307+
func (ReqDismissRoom) GetMetricsName() string { return "dismiss_room" }
2308+
func (ReqDismissRoom) GetMsgType() int32 { return internals.MSG_POST_ROOM_DISMISS }
2309+
func (ReqDismissRoom) GetAPIType() int8 { return apiconsumer.APITypeAuth }
2310+
func (ReqDismissRoom) GetMethod() []string { return []string{http.MethodPost, http.MethodOptions} }
2311+
func (ReqDismissRoom) GetTopic(cfg *config.Dendrite) string { return getProxyRpcTopic(cfg) }
2312+
func (ReqDismissRoom) NewRequest() core.Coder {
2313+
return new(external.DismissRoomRequest)
2314+
}
2315+
func (ReqDismissRoom) FillRequest(coder core.Coder, req *http.Request, vars map[string]string) error {
2316+
msg := coder.(*external.DismissRoomRequest)
2317+
if vars != nil {
2318+
msg.RoomID = vars["roomID"]
2319+
}
2320+
return nil
2321+
}
2322+
func (ReqDismissRoom) NewResponse(code int) core.Coder { return nil }
2323+
func (ReqDismissRoom) GetPrefix() []string { return []string{"r0"} }
2324+
func (ReqDismissRoom) Process(ctx context.Context, consumer interface{}, msg core.Coder, device *authtypes.Device) (int, core.Coder) {
2325+
c := consumer.(*InternalMsgConsumer)
2326+
req := msg.(*external.DismissRoomRequest)
2327+
userID := device.UserID
2328+
deviceID := device.ID
2329+
return routing.DismissRoom(
2330+
ctx, req, c.accountDB, userID, deviceID, req.RoomID,
2331+
c.Cfg, c.rsRpcCli, c.federation, c.cacheIn, c.idg, c.complexCache,
2332+
)
2333+
}

clientapi/routing/createroom.go

+15
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ func createRoom(ctx context.Context, r *external.PostCreateRoomRequest,
114114
accountDB model.AccountsDatabase, rpcCli roomserverapi.RoomserverRPCAPI, cache service.Cache,
115115
idg *uid.UidGenerator, complexCache *common.ComplexCache,
116116
) (int, core.Coder) {
117+
bs := time.Now().UnixNano()/1000000
118+
defer func(bs int64, roomID, userID string){
119+
spend := time.Now().UnixNano()/1000000 - bs
120+
log.Infof("userID:%s createRoom roomID:%s spend:%d", userID, roomID, spend)
121+
}(bs, roomID, userID)
117122
now := time.Now()
118123
last := now
119124
fields := util.GetLogFields(ctx)
@@ -264,6 +269,16 @@ func createRoom(ctx context.Context, r *external.PostCreateRoomRequest,
264269
val := int(mapVal.(float64))
265270
createContent.RoomType = &val
266271
}
272+
mapVal, ok = r.CreationContent["is_organization_room"]
273+
if ok {
274+
val := mapVal.(bool)
275+
createContent.IsOrganizationRoom = &val
276+
}
277+
mapVal, ok = r.CreationContent["is_group_room"]
278+
if ok {
279+
val := mapVal.(bool)
280+
createContent.IsGroupRoom = &val
281+
}
267282

268283
eventsToMake := []external.StateEvent{
269284
{Type: "m.room.create", Content: createContent},

0 commit comments

Comments
 (0)