Skip to content

Commit

Permalink
feat: implement batchNewMessages method. (#687)
Browse files Browse the repository at this point in the history
* feat: implement batchNewMessages method.

* update batchnewMessages logic.

* update logic.

* update
  • Loading branch information
mo3et authored Sep 4, 2024
1 parent b55cb90 commit 3750b0f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 9 deletions.
74 changes: 65 additions & 9 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,30 +222,42 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
//var unreadMessages []*model_struct.LocalConversationUnreadMessage
var newMessages sdk_struct.NewMsgList
// var reactionMsgModifierList, reactionMsgDeleterList sdk_struct.NewMsgList

var isUnreadCount, isConversationUpdate, isHistory, isNotPrivate, isSenderConversationUpdate bool

conversationChangedSet := make(map[string]*model_struct.LocalConversation)
newConversationSet := make(map[string]*model_struct.LocalConversation)
conversationSet := make(map[string]*model_struct.LocalConversation)
phConversationChangedSet := make(map[string]*model_struct.LocalConversation)
phNewConversationSet := make(map[string]*model_struct.LocalConversation)

log.ZDebug(ctx, "message come here conversation ch", "conversation length", len(allMsg))
b := time.Now()

onlineMap := make(map[onlineMsgKey]struct{})

for conversationID, msgs := range allMsg {
log.ZDebug(ctx, "parse message in one conversation", "conversationID",
conversationID, "message length", len(msgs.Msgs))
var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog
var updateMessage []*model_struct.LocalChatLog

for _, v := range msgs.Msgs {
log.ZDebug(ctx, "parse message ", "conversationID", conversationID, "msg", v)
isHistory = utils.GetSwitchFromOptions(v.Options, constant.IsHistory)

isUnreadCount = utils.GetSwitchFromOptions(v.Options, constant.IsUnreadCount)

isConversationUpdate = utils.GetSwitchFromOptions(v.Options, constant.IsConversationUpdate)

isNotPrivate = utils.GetSwitchFromOptions(v.Options, constant.IsNotPrivate)

isSenderConversationUpdate = utils.GetSwitchFromOptions(v.Options, constant.IsSenderConversationUpdate)

msg := &sdk_struct.MsgStruct{}
copier.Copy(msg, v)
msg.Content = string(v.Content)

var attachedInfo sdk_struct.AttachedInfoElem
_ = utils.JsonStringToStruct(v.AttachedInfo, &attachedInfo)
msg.AttachedInfoElem = &attachedInfo
Expand All @@ -255,7 +267,9 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
insertMessage = append(insertMessage, c.msgStructToLocalChatLog(msg))
continue
}

msg.Status = constant.MsgStatusSendSuccess

//De-analyze data
err := c.msgHandleByContentType(msg)
if err != nil {
Expand Down Expand Up @@ -367,17 +381,21 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {

}
}

//todo The lock granularity needs to be optimized to the conversation level.
c.conversationSyncMutex.Lock()
defer c.conversationSyncMutex.Unlock()
//todo The lock granularity needs to be optimized to the conversation level.

list, err := c.db.GetAllConversationListDB(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationListDB", err)
}

m := make(map[string]*model_struct.LocalConversation)
listToMap(list, m)
log.ZDebug(ctx, "listToMap: ", "local conversation", list, "generated c map",
string(stringutil.StructToJsonBytes(conversationSet)))

c.diff(ctx, m, conversationSet, conversationChangedSet, newConversationSet)
log.ZInfo(ctx, "trigger map is :", "newConversations", string(stringutil.StructToJsonBytes(newConversationSet)),
"changedConversations", string(stringutil.StructToJsonBytes(conversationChangedSet)))
Expand Down Expand Up @@ -429,7 +447,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg))

if c.batchMsgListener() != nil {
c.batchNewMessages(ctx, newMessages)
c.batchNewMessages(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap)
} else {
c.newMessage(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap)
}
Expand All @@ -451,6 +469,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
}
}
}

log.ZDebug(ctx, "insert msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg))
}

Expand Down Expand Up @@ -886,15 +905,52 @@ func (c *Conversation) newMessage(ctx context.Context, newMessagesList sdk_struc
}
}

func (c *Conversation) batchNewMessages(ctx context.Context, newMessagesList sdk_struct.NewMsgList) {
sort.Sort(newMessagesList)
if len(newMessagesList) > 0 {
c.batchMsgListener().OnRecvNewMessages(utils.StructToJsonString(newMessagesList))
//if c.IsBackground {
// c.batchMsgListener.OnRecvOfflineNewMessages(utils.StructToJsonString(newMessagesList))
//}
func (c *Conversation) batchNewMessages(ctx context.Context, newMessagesList sdk_struct.NewMsgList, conversationChanged, newConversation map[string]*model_struct.LocalConversation, onlineMsg map[onlineMsgKey]struct{}) {
if len(newMessagesList) == 0 {
log.ZWarn(ctx, "newMessagesList is empty", errs.New("newMessagesList is empty"))
return
}

sort.Sort(newMessagesList)
var needNotificationMsgList sdk_struct.NewMsgList

// offline
if c.GetBackground() {
u, err := c.user.GetSelfUserInfo(ctx)
if err != nil {
log.ZWarn(ctx, "GetSelfUserInfo err", err)
}

if u.GlobalRecvMsgOpt != constant.ReceiveMessage {
return
}

for _, w := range newMessagesList {
conversationID := utils.GetConversationIDByMsg(w)
if v, ok := conversationChanged[conversationID]; ok && v.RecvMsgOpt == constant.ReceiveMessage {
needNotificationMsgList = append(needNotificationMsgList, w)
}
if v, ok := newConversation[conversationID]; ok && v.RecvMsgOpt == constant.ReceiveMessage {
needNotificationMsgList = append(needNotificationMsgList, w)
}
}

if len(needNotificationMsgList) != 0 {
c.batchMsgListener().OnRecvOfflineNewMessages(utils.StructToJsonString(needNotificationMsgList))
}
} else { // online
for _, w := range newMessagesList {
if w.ContentType == constant.Typing {
continue
}

needNotificationMsgList = append(needNotificationMsgList, w)
}

if len(needNotificationMsgList) != 0 {
c.batchMsgListener().OnRecvNewMessages(utils.StructToJsonString(needNotificationMsgList))
}
}
}

func (c *Conversation) msgConvert(msg *sdk_struct.MsgStruct) (err error) {
Expand Down
1 change: 1 addition & 0 deletions open_im_sdk/em.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package open_im_sdk
import (
"context"
"errors"

"github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
"github.com/openimsdk/tools/log"
)
Expand Down

0 comments on commit 3750b0f

Please sign in to comment.