Skip to content

Commit

Permalink
refactor: performance optimization of pull messages. (#684)
Browse files Browse the repository at this point in the history
* fix: Bug fix for clearing unread messages.

Signed-off-by: Gordon <[email protected]>

* fix: Bug fix for pull messages.

Signed-off-by: Gordon <[email protected]>

* refactor: performance optimization of pull messages.

Signed-off-by: Gordon <[email protected]>

---------

Signed-off-by: Gordon <[email protected]>
  • Loading branch information
FGadvancer authored Sep 2, 2024
1 parent 1404a2b commit 6778adb
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 35 deletions.
2 changes: 1 addition & 1 deletion internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
exceptionMsg = append(exceptionMsg, c.msgStructToLocalErrChatLog(msg))
log.ZWarn(ctx, "Deduplication operation ", nil, "msg", *c.msgStructToLocalErrChatLog(msg))
msg.Status = constant.MsgStatusFiltered
msg.ClientMsgID = msg.ClientMsgID + utils.Int64ToString(utils.GetCurrentTimestampByNano())
msg.ClientMsgID = msg.ClientMsgID + utils.Int64ToString(msg.Seq)
othersInsertMessage = append(othersInsertMessage, c.msgStructToLocalChatLog(msg))
}
}
Expand Down
114 changes: 80 additions & 34 deletions internal/conversation_msg/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package conversation_msg
import (
"context"
"errors"
"time"

"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
Expand All @@ -29,20 +30,23 @@ import (
"github.com/openimsdk/protocol/sdkws"
)

// 检测其内部连续性,如果不连续,则向前补齐,获取这一组消息的最大最小seq,以及需要补齐的seq列表长度
// Check for internal continuity. If discontinuity is found, fill in the gaps.
// Retrieve the maximum and minimum seq of this group of messages, as well as the length of the seq list that needs to be filled in.
func (c *Conversation) messageBlocksInternalContinuityCheck(ctx context.Context, conversationID string, notStartTime, isReverse bool, count int,
startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) (max, min int64, length int) {
var lostSeqListLength int
maxSeq, minSeq, haveSeqList := c.getMaxAndMinHaveSeqList(*list)
log.ZDebug(ctx, "getMaxAndMinHaveSeqList is:", "maxSeq", maxSeq, "minSeq", minSeq, "haveSeqList", haveSeqList)
if maxSeq != 0 && minSeq != 0 {
successiveSeqList := func(max, min int64) (seqList []int64) {
for i := min; i <= max; i++ {
seqList = append(seqList, i)
var lostSeqList []int64
haveSeqSet := datautil.SliceSetAny(haveSeqList, func(e int64) int64 {
return e
})
for i := minSeq; i <= maxSeq; i++ {
if _, found := haveSeqSet[i]; !found {
lostSeqList = append(lostSeqList, i)
}
return seqList
}(maxSeq, minSeq)
lostSeqList := utils.DifferenceSubset(successiveSeqList, haveSeqList)
}
lostSeqListLength = len(lostSeqList)
log.ZDebug(ctx, "get lost seqList is :", "maxSeq", maxSeq, "minSeq", minSeq, "lostSeqList", lostSeqList, "length:", lostSeqListLength)
if lostSeqListLength > 0 {
Expand All @@ -52,14 +56,16 @@ func (c *Conversation) messageBlocksInternalContinuityCheck(ctx context.Context,
} else {
pullSeqList = lostSeqList[lostSeqListLength-constant.PullMsgNumForReadDiffusion : lostSeqListLength]
}
log.ZDebug(ctx, "messageBlocksInternalContinuityCheck", "pullSeqList", pullSeqList)
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, pullSeqList, notStartTime, isReverse, count, startTime, list, messageListCallback)
}

}
return maxSeq, minSeq, lostSeqListLength
}

// 检测消息块之间的连续性,如果不连续,则向前补齐,返回块之间是否连续,bool
// Check the continuity between message blocks. If discontinuity is found, fill in the gaps forward.
// Returns whether the blocks are continuous as a boolean value.
func (c *Conversation) messageBlocksBetweenContinuityCheck(ctx context.Context, lastMinSeq, maxSeq int64, conversationID string,
notStartTime, isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) bool {
if lastMinSeq != 0 {
Expand All @@ -78,6 +84,7 @@ func (c *Conversation) messageBlocksBetweenContinuityCheck(ctx context.Context,
}(lastMinSeq-1, startSeq)
log.ZDebug(ctx, "get lost successiveSeqList is :", "successiveSeqList", successiveSeqList, "length:", len(successiveSeqList))
if len(successiveSeqList) > 0 {
log.ZDebug(ctx, "messageBlocksBetweenContinuityCheck", "successiveSeqList", successiveSeqList)
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, successiveSeqList, notStartTime, isReverse, count, startTime, list, messageListCallback)
}
} else {
Expand All @@ -95,7 +102,6 @@ func (c *Conversation) messageBlocksBetweenContinuityCheck(ctx context.Context,
return false
}

// 根据最小seq向前补齐消息,由服务器告诉拉取消息结果是否到底,如果网络,则向前补齐,获取这一组消息的最大最小seq,以及需要补齐的seq列表长度
func (c *Conversation) messageBlocksEndContinuityCheck(ctx context.Context, minSeq int64, conversationID string, notStartTime,
isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
if minSeq != 0 {
Expand All @@ -113,10 +119,12 @@ func (c *Conversation) messageBlocksEndContinuityCheck(ctx context.Context, minS
log.ZDebug(ctx, "pull seqList is ", "seqList", seqList, "len", len(seqList))

if len(seqList) > 0 {
log.ZDebug(ctx, "messageBlocksEndContinuityCheck", "seqList", seqList)
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, seqList, notStartTime, isReverse, count, startTime, list, messageListCallback)
}

} else {
log.ZDebug(ctx, "messageBlocksEndContinuityCheck", "minSeq", minSeq, "conversationID", conversationID)
//local don't have messages,本地无消息,但是服务器最大消息不为0
seqList := []int64{0, 0}
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, seqList, notStartTime, isReverse, count, startTime, list, messageListCallback)
Expand Down Expand Up @@ -156,6 +164,9 @@ func (c *Conversation) pullMessageAndReGetHistoryMessages(ctx context.Context, c
"seqList", seqList)
return
}
if len(existedSeqList) > 0 {
log.ZWarn(ctx, "GetAlreadyExistSeqList", nil, "conversationID", conversationID, "seqList", seqList, "existedSeqList", existedSeqList)
}
if len(existedSeqList) == len(seqList) {
log.ZDebug(ctx, "do not pull message", "seqList", seqList, "existedSeqList", existedSeqList)
return
Expand All @@ -166,6 +177,8 @@ func (c *Conversation) pullMessageAndReGetHistoryMessages(ctx context.Context, c
"newSeqList", newSeqList)
return
}
//todo The process of pulling messages needs to be changed to pull based on a list of sequences,
// as the current method may cause sequence filling issues. This update is required for versions 3.9 +.
var pullMsgResp sdkws.PullMessageBySeqsResp
var pullMsgReq sdkws.PullMessageBySeqsReq
pullMsgReq.UserID = c.loginUserID
Expand All @@ -182,7 +195,7 @@ func (c *Conversation) pullMessageAndReGetHistoryMessages(ctx context.Context, c
err = c.SendReqWaitResp(ctx, &pullMsgReq, constant.PullMsgBySeqList, &pullMsgResp)
if err != nil {
errHandle(newSeqList, list, err, messageListCallback)
log.ZDebug(ctx, "pullmsg SendReqWaitResp failed", err, "req")
log.ZDebug(ctx, "pull SendReqWaitResp failed", err, "req")
} else {
log.ZDebug(ctx, "syncMsgFromServerSplit pull msg", "resp", pullMsgResp)
if pullMsgResp.Msgs == nil {
Expand All @@ -191,7 +204,7 @@ func (c *Conversation) pullMessageAndReGetHistoryMessages(ctx context.Context, c
return
}
if v, ok := pullMsgResp.Msgs[conversationID]; ok {
c.pullMessageIntoTable(ctx, pullMsgResp.Msgs, conversationID)
c.pullMessageIntoTable(ctx, pullMsgResp.Msgs)
messageListCallback.IsEnd = v.IsEnd

if notStartTime {
Expand Down Expand Up @@ -220,7 +233,7 @@ func errHandle(seqList []int64, list *[]*model_struct.LocalChatLog, err error, m
}
*list = result
}
func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map[string]*sdkws.PullMsgs, conversationID string) {
func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map[string]*sdkws.PullMsgs) {
insertMsg := make(map[string][]*model_struct.LocalChatLog, 20)
updateMsg := make(map[string][]*model_struct.LocalChatLog, 30)
var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog
Expand All @@ -229,60 +242,78 @@ func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map

log.ZDebug(ctx, "do Msg come here, len: ", "msg length", len(pullMsgData))
for conversationID, msgs := range pullMsgData {
msgIDs := datautil.Slice(msgs.Msgs, func(msg *sdkws.MsgData) string {
return msg.ClientMsgID
})
localMessages, err := c.db.GetMessagesByClientMsgIDs(ctx, conversationID, msgIDs)
if err != nil {
log.ZWarn(ctx, "Failed to get messages by ClientMsgIDs", err)
}
localMessagesMap := datautil.SliceToMap(localMessages, func(msg *model_struct.LocalChatLog) string { return msg.ClientMsgID })
for _, v := range msgs.Msgs {
log.ZDebug(ctx, "msg detail", "msg", v, "conversationID", conversationID)
msg := c.msgDataToLocalChatLog(v)
//When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update.
//When the message has been marked and deleted by the cloud, it is directly inserted locally
//without any conversation and message update.
if msg.Status == constant.MsgStatusHasDeleted {
insertMessage = append(insertMessage, msg)
continue
}
msg.Status = constant.MsgStatusSendSuccess
// log.Info(operationID, "new msg, seq, ServerMsgID, ClientMsgID", msg.Seq, msg.ServerMsgID, msg.ClientMsgID)
//De-analyze data
// The message might be a filler provided by the server due to a gap in the sequence.
if msg.ClientMsgID == "" {
msg.ClientMsgID = utils.GetMsgID(c.loginUserID) + utils.Int64ToString(msg.Seq)
exceptionMsg = append(exceptionMsg, c.msgDataToLocalErrChatLog(msg))
insertMessage = append(insertMessage, msg)
continue
}
existingMsg, exists := localMessagesMap[msg.ClientMsgID]
if v.SendID == c.loginUserID { //seq
// Messages sent by myself //if sent through this terminal
m, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID)
if err == nil {
log.ZInfo(ctx, "have message", "msg", msg)
if m.Seq == 0 {
if exists {
log.ZDebug(ctx, "have message", "msg", msg)
if existingMsg.Seq == 0 {
updateMessage = append(updateMessage, msg)

} else {
// The message you sent is duplicated, possibly due to a resend or the server consuming
// the message multiple times.
msg.ClientMsgID = msg.ClientMsgID + utils.Int64ToString(msg.Seq)
exceptionMsg = append(exceptionMsg, c.msgDataToLocalErrChatLog(msg))
insertMessage = append(insertMessage, msg)
}
} else { // send through other terminal
log.ZInfo(ctx, "sync message", "msg", msg)
log.ZDebug(ctx, "sync message", "msg", msg)
selfInsertMessage = append(selfInsertMessage, msg)
}
} else { //Sent by others
if oldMessage, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID); err != nil { //Deduplication operation
if !exists {
othersInsertMessage = append(othersInsertMessage, msg)

} else {
if oldMessage.Seq == 0 {
updateMessage = append(updateMessage, msg)
}
// The message sent by others is duplicated, possibly due to a resend or the server consuming
// the message multiple times.
msg.ClientMsgID = msg.ClientMsgID + utils.Int64ToString(msg.Seq)
exceptionMsg = append(exceptionMsg, c.msgDataToLocalErrChatLog(msg))
insertMessage = append(insertMessage, msg)
}
}

}
timeNow := time.Now()
insertMsg[conversationID] = append(insertMessage, c.faceURLAndNicknameHandle(ctx, selfInsertMessage, othersInsertMessage, conversationID)...)
updateMsg[conversationID] = updateMessage
log.ZDebug(ctx, "faceURLAndNicknameHandle, ", "cost time", time.Since(timeNow).Milliseconds(),
"updateMsg", updateMessage, "insertMsg", insertMessage, "selfInsertMessage", selfInsertMessage, "othersInsertMessage", othersInsertMessage)

//update message
if err6 := c.batchUpdateMessageList(ctx, updateMsg); err6 != nil {
log.ZError(ctx, "sync seq normal message err :", err6)
}
b3 := utils.GetCurrentTimestampByMill()
timeNow = time.Now()
//Normal message storage
_ = c.batchInsertMessageList(ctx, insertMsg)
b4 := utils.GetCurrentTimestampByMill()
log.ZDebug(ctx, "BatchInsertMessageListController, ", "cost time", b4-b3)
log.ZDebug(ctx, "BatchInsertMessageListController, ", "cost time", time.Since(timeNow).Milliseconds())

//Exception message storage
for _, v := range exceptionMsg {
Expand All @@ -296,6 +327,11 @@ func (c *Conversation) pullMessageIntoTable(ctx context.Context, pullMsgData map
// 拉取消息不满量,获取服务器中该群最大seq以及用户对于此群最小seq,本地该群的最小seq,如果本地的不为0并且小于等于服务器最小的,说明已经到底部
// 如果本地的为0,可以理解为初始化的时候,数据还未同步,或者异常情况,如果服务器最大seq-服务器最小seq>=0说明还未到底部,否则到底部

// faceURLAndNicknameHandle handles the assignment of face URLs and nicknames for chat logs
// based on the conversation type (single chat or group chat).
// It first retrieves the conversation information using the provided conversationID.
// Depending on the conversation type, it delegates the handling to either singleHandle (for single chats)
// or groupHandle (for group chats). If conversation information retrieval fails, it returns the merged chat logs.
func (c *Conversation) faceURLAndNicknameHandle(ctx context.Context, self, others []*model_struct.LocalChatLog, conversationID string) []*model_struct.LocalChatLog {
lc, err := c.db.GetConversation(ctx, conversationID)
if err != nil {
Expand All @@ -310,22 +346,33 @@ func (c *Conversation) faceURLAndNicknameHandle(ctx context.Context, self, other
return append(self, others...)
}

// singleHandle processes chat logs for single chat conversations.
// It updates the SenderFaceURL and SenderNickname fields for messages in the `self` list
// using the logged-in user's information, and for messages in the `others` list
// using the other party's information if available in the conversation.
func (c *Conversation) singleHandle(ctx context.Context, self, others []*model_struct.LocalChatLog, lc *model_struct.LocalConversation) {
userInfo, err := c.db.GetLoginUser(ctx, c.loginUserID)
if err == nil {
for _, chatLog := range self {
chatLog.SenderFaceURL = userInfo.FaceURL
chatLog.SenderNickname = userInfo.Nickname
if len(self) > 0 {
userInfo, err := c.db.GetLoginUser(ctx, c.loginUserID)
if err == nil {
for _, chatLog := range self {
chatLog.SenderFaceURL = userInfo.FaceURL
chatLog.SenderNickname = userInfo.Nickname
}
}
}

if lc.FaceURL != "" && lc.ShowName != "" {
for _, chatLog := range others {
chatLog.SenderFaceURL = lc.FaceURL
chatLog.SenderNickname = lc.ShowName
}
}

}

// groupHandle processes chat logs for group chat conversations.
// It merges the `self` and `others` chat logs and updates the SenderFaceURL and SenderNickname fields
// using the group members' information. If group member information is not available,
// it attempts to retrieve the sender's information from a local cache.
func (c *Conversation) groupHandle(ctx context.Context, self, others []*model_struct.LocalChatLog, lc *model_struct.LocalConversation) {
allMessage := append(self, others...)

Expand Down Expand Up @@ -356,5 +403,4 @@ func (c *Conversation) groupHandle(ctx context.Context, self, others []*model_st
}
}
}

}

0 comments on commit 6778adb

Please sign in to comment.