diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index fa36d8540..c5e0210cf 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -222,30 +222,42 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { //var unreadMessages []*model_struct.LocalConversationUnreadMessage var newMessages sdk_struct.NewMsgList // var reactionMsgModifierList, reactionMsgDeleterList sdk_struct.NewMsgList + var isUnreadCount, isConversationUpdate, isHistory, isNotPrivate, isSenderConversationUpdate bool + conversationChangedSet := make(map[string]*model_struct.LocalConversation) newConversationSet := make(map[string]*model_struct.LocalConversation) conversationSet := make(map[string]*model_struct.LocalConversation) phConversationChangedSet := make(map[string]*model_struct.LocalConversation) phNewConversationSet := make(map[string]*model_struct.LocalConversation) + log.ZDebug(ctx, "message come here conversation ch", "conversation length", len(allMsg)) b := time.Now() + onlineMap := make(map[onlineMsgKey]struct{}) + for conversationID, msgs := range allMsg { log.ZDebug(ctx, "parse message in one conversation", "conversationID", conversationID, "message length", len(msgs.Msgs)) var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog var updateMessage []*model_struct.LocalChatLog + for _, v := range msgs.Msgs { log.ZDebug(ctx, "parse message ", "conversationID", conversationID, "msg", v) isHistory = utils.GetSwitchFromOptions(v.Options, constant.IsHistory) + isUnreadCount = utils.GetSwitchFromOptions(v.Options, constant.IsUnreadCount) + isConversationUpdate = utils.GetSwitchFromOptions(v.Options, constant.IsConversationUpdate) + isNotPrivate = utils.GetSwitchFromOptions(v.Options, constant.IsNotPrivate) + isSenderConversationUpdate = utils.GetSwitchFromOptions(v.Options, constant.IsSenderConversationUpdate) + msg := &sdk_struct.MsgStruct{} copier.Copy(msg, v) msg.Content = string(v.Content) + var attachedInfo sdk_struct.AttachedInfoElem _ = utils.JsonStringToStruct(v.AttachedInfo, &attachedInfo) msg.AttachedInfoElem = &attachedInfo @@ -255,7 +267,9 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { insertMessage = append(insertMessage, c.msgStructToLocalChatLog(msg)) continue } + msg.Status = constant.MsgStatusSendSuccess + //De-analyze data err := c.msgHandleByContentType(msg) if err != nil { @@ -367,17 +381,21 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { } } + + //todo The lock granularity needs to be optimized to the conversation level. c.conversationSyncMutex.Lock() defer c.conversationSyncMutex.Unlock() - //todo The lock granularity needs to be optimized to the conversation level. + list, err := c.db.GetAllConversationListDB(ctx) if err != nil { log.ZError(ctx, "GetAllConversationListDB", err) } + m := make(map[string]*model_struct.LocalConversation) listToMap(list, m) log.ZDebug(ctx, "listToMap: ", "local conversation", list, "generated c map", string(stringutil.StructToJsonBytes(conversationSet))) + c.diff(ctx, m, conversationSet, conversationChangedSet, newConversationSet) log.ZInfo(ctx, "trigger map is :", "newConversations", string(stringutil.StructToJsonBytes(newConversationSet)), "changedConversations", string(stringutil.StructToJsonBytes(conversationChangedSet))) @@ -429,7 +447,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) if c.batchMsgListener() != nil { - c.batchNewMessages(ctx, newMessages) + c.batchNewMessages(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap) } else { c.newMessage(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap) } @@ -451,6 +469,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { } } } + log.ZDebug(ctx, "insert msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) } @@ -886,15 +905,52 @@ func (c *Conversation) newMessage(ctx context.Context, newMessagesList sdk_struc } } -func (c *Conversation) batchNewMessages(ctx context.Context, newMessagesList sdk_struct.NewMsgList) { - sort.Sort(newMessagesList) - if len(newMessagesList) > 0 { - c.batchMsgListener().OnRecvNewMessages(utils.StructToJsonString(newMessagesList)) - //if c.IsBackground { - // c.batchMsgListener.OnRecvOfflineNewMessages(utils.StructToJsonString(newMessagesList)) - //} +func (c *Conversation) batchNewMessages(ctx context.Context, newMessagesList sdk_struct.NewMsgList, conversationChanged, newConversation map[string]*model_struct.LocalConversation, onlineMsg map[onlineMsgKey]struct{}) { + if len(newMessagesList) == 0 { + log.ZWarn(ctx, "newMessagesList is empty", errs.New("newMessagesList is empty")) + return } + sort.Sort(newMessagesList) + var needNotificationMsgList sdk_struct.NewMsgList + + // offline + if c.GetBackground() { + u, err := c.user.GetSelfUserInfo(ctx) + if err != nil { + log.ZWarn(ctx, "GetSelfUserInfo err", err) + } + + if u.GlobalRecvMsgOpt != constant.ReceiveMessage { + return + } + + for _, w := range newMessagesList { + conversationID := utils.GetConversationIDByMsg(w) + if v, ok := conversationChanged[conversationID]; ok && v.RecvMsgOpt == constant.ReceiveMessage { + needNotificationMsgList = append(needNotificationMsgList, w) + } + if v, ok := newConversation[conversationID]; ok && v.RecvMsgOpt == constant.ReceiveMessage { + needNotificationMsgList = append(needNotificationMsgList, w) + } + } + + if len(needNotificationMsgList) != 0 { + c.batchMsgListener().OnRecvOfflineNewMessages(utils.StructToJsonString(needNotificationMsgList)) + } + } else { // online + for _, w := range newMessagesList { + if w.ContentType == constant.Typing { + continue + } + + needNotificationMsgList = append(needNotificationMsgList, w) + } + + if len(needNotificationMsgList) != 0 { + c.batchMsgListener().OnRecvNewMessages(utils.StructToJsonString(needNotificationMsgList)) + } + } } func (c *Conversation) msgConvert(msg *sdk_struct.MsgStruct) (err error) { diff --git a/open_im_sdk/em.go b/open_im_sdk/em.go index e7f0e7f8b..ef0623372 100644 --- a/open_im_sdk/em.go +++ b/open_im_sdk/em.go @@ -17,6 +17,7 @@ package open_im_sdk import ( "context" "errors" + "github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback" "github.com/openimsdk/tools/log" )