From a5f3883873a11ea9e3f4ce37f750a17b6410eaf6 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 26 Sep 2024 14:56:18 +0800 Subject: [PATCH 1/5] feat: code adjustment --- open_im_sdk/file.go | 24 ------------------------ open_im_sdk/third.go | 5 +++++ 2 files changed, 5 insertions(+), 24 deletions(-) delete mode 100644 open_im_sdk/file.go diff --git a/open_im_sdk/file.go b/open_im_sdk/file.go deleted file mode 100644 index 7340b9d55..000000000 --- a/open_im_sdk/file.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright © 2023 OpenIM SDK. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package open_im_sdk - -import ( - "github.com/openimsdk/openim-sdk-core/v3/internal/third/file" - "github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback" -) - -func UploadFile(callback open_im_sdk_callback.Base, operationID string, req string, progress open_im_sdk_callback.UploadFileCallback) { - call(callback, operationID, UserForSDK.File().UploadFile, req, file.UploadFileCallback(progress)) -} diff --git a/open_im_sdk/third.go b/open_im_sdk/third.go index 2cbed223d..94f575982 100644 --- a/open_im_sdk/third.go +++ b/open_im_sdk/third.go @@ -15,6 +15,7 @@ package open_im_sdk import ( + "github.com/openimsdk/openim-sdk-core/v3/internal/third/file" "github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback" "github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs" ) @@ -38,3 +39,7 @@ func Logs(callback open_im_sdk_callback.Base, operationID string, logLevel int, } call(callback, operationID, UserForSDK.Third().Log, logLevel, file, line, msgs, err, keyAndValue) } + +func UploadFile(callback open_im_sdk_callback.Base, operationID string, req string, progress open_im_sdk_callback.UploadFileCallback) { + call(callback, operationID, UserForSDK.File().UploadFile, req, file.UploadFileCallback(progress)) +} From 8f66e79ab0da3ac7663fd2b0853499820716aef6 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 27 Sep 2024 11:14:12 +0800 Subject: [PATCH 2/5] feat: Cmd2Value carry caller --- pkg/common/cmd.go | 63 +++++++++++++++++++++++++++++++++ pkg/common/trigger_channel.go | 66 ++++++++++++++--------------------- 2 files changed, 89 insertions(+), 40 deletions(-) create mode 100644 pkg/common/cmd.go diff --git a/pkg/common/cmd.go b/pkg/common/cmd.go new file mode 100644 index 000000000..3371d6c0b --- /dev/null +++ b/pkg/common/cmd.go @@ -0,0 +1,63 @@ +package common + +import ( + "context" + "fmt" + "github.com/openimsdk/tools/log" + "runtime" + "runtime/debug" + "strings" + "time" +) + +var packet string + +func init() { + build, ok := debug.ReadBuildInfo() + if !ok { + return + } + packet = build.Main.Path + if packet != "" && !strings.HasSuffix(packet, "/") { + packet += "/" + } +} + +type Cmd2Value struct { + Cmd string + Value any + Caller string + Ctx context.Context +} + +func sendCmd(ch chan<- Cmd2Value, value Cmd2Value, timeout time.Duration) error { + if value.Caller == "" { + value.Caller = getFuncName() + } + if ch == nil { + log.ZError(value.Ctx, "sendCmd chan is nil", ErrChanNil, "caller", value.Caller, "cmd", value.Cmd, "value", value.Value) + return ErrChanNil + } + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case ch <- value: + log.ZInfo(value.Ctx, "sendCmd chan success", "caller", value.Caller, "cmd", value.Cmd, "value", value.Value) + return nil + case <-timer.C: + log.ZError(value.Ctx, "sendCmd chan timeout", ErrTimeout, "caller", value.Caller, "cmd", value.Cmd, "value", value.Value) + return ErrTimeout + } +} + +func getFuncName() string { + pc, _, line, ok := runtime.Caller(3) + if !ok { + return "runtime.caller.failed" + } + name := runtime.FuncForPC(pc).Name() + if packet != "" { + name = strings.TrimPrefix(name, packet) + } + return fmt.Sprintf("%s:%d", name, line) +} diff --git a/pkg/common/trigger_channel.go b/pkg/common/trigger_channel.go index 43415afc0..da9412fe3 100644 --- a/pkg/common/trigger_channel.go +++ b/pkg/common/trigger_channel.go @@ -30,11 +30,12 @@ import ( "github.com/openimsdk/protocol/sdkws" ) -const ( - timeOut = 10000 -) +const timeout = time.Millisecond * 10000 -var ErrChanNil = errs.New("channal == nil") +var ( + ErrChanNil = errs.New("channal == nil") + ErrTimeout = errors.New("send cmd timeout") +) func TriggerCmdNewMsgCome(ctx context.Context, msg sdk_struct.CmdNewMsgComeToConversation, conversationCh chan Cmd2Value) error { if conversationCh == nil { @@ -42,7 +43,7 @@ func TriggerCmdNewMsgCome(ctx context.Context, msg sdk_struct.CmdNewMsgComeToCon } c2v := Cmd2Value{Cmd: constant.CmdNewMsgCome, Value: msg, Ctx: ctx} - return sendCmd(conversationCh, c2v, timeOut) + return sendCmd(conversationCh, c2v, timeout) } func TriggerCmdMsgSyncInReinstall(ctx context.Context, msg sdk_struct.CmdMsgSyncInReinstall, conversationCh chan Cmd2Value) error { @@ -51,12 +52,12 @@ func TriggerCmdMsgSyncInReinstall(ctx context.Context, msg sdk_struct.CmdMsgSync } c2v := Cmd2Value{Cmd: constant.CmdMsgSyncInReinstall, Value: msg, Ctx: ctx} - return sendCmd(conversationCh, c2v, timeOut) + return sendCmd(conversationCh, c2v, timeout) } func TriggerCmdNotification(ctx context.Context, msg sdk_struct.CmdNewMsgComeToConversation, conversationCh chan Cmd2Value) { c2v := Cmd2Value{Cmd: constant.CmdNotification, Value: msg, Ctx: ctx} - err := sendCmd(conversationCh, c2v, timeOut) + err := sendCmd(conversationCh, c2v, timeout) if err != nil { log.ZWarn(ctx, "TriggerCmdNotification error", err, "msg", msg) } @@ -64,7 +65,7 @@ func TriggerCmdNotification(ctx context.Context, msg sdk_struct.CmdNewMsgComeToC func TriggerCmdSyncFlag(ctx context.Context, syncFlag int, conversationCh chan Cmd2Value) { c2v := Cmd2Value{Cmd: constant.CmdSyncFlag, Value: sdk_struct.CmdNewMsgComeToConversation{SyncFlag: syncFlag}, Ctx: ctx} - err := sendCmd(conversationCh, c2v, timeOut) + err := sendCmd(conversationCh, c2v, timeout) if err != nil { log.ZWarn(ctx, "TriggerCmdNotification error", err, "syncFlag", syncFlag) } @@ -75,12 +76,12 @@ func TriggerCmdWakeUpDataSync(ctx context.Context, ch chan Cmd2Value) error { return errs.Wrap(ErrChanNil) } c2v := Cmd2Value{Cmd: constant.CmdWakeUpDataSync, Value: nil, Ctx: ctx} - return sendCmd(ch, c2v, timeOut) + return sendCmd(ch, c2v, timeout) } func TriggerCmdSyncData(ctx context.Context, ch chan Cmd2Value) { c2v := Cmd2Value{Cmd: constant.CmdSyncData, Value: nil, Ctx: ctx} - err := sendCmd(ch, c2v, timeOut) + err := sendCmd(ch, c2v, timeout) if err != nil { log.ZWarn(ctx, "TriggerCmdSyncData error", err) } @@ -92,8 +93,11 @@ func TriggerCmdUpdateConversation(ctx context.Context, node UpdateConNode, conve Value: node, Ctx: ctx, } - - return sendCmd(conversationCh, c2v, timeOut) + err := sendCmd(conversationCh, c2v, timeout) + if err != nil { + _, _ = len(conversationCh), cap(conversationCh) + } + return err } func TriggerCmdUpdateMessage(ctx context.Context, node UpdateMessageNode, conversationCh chan Cmd2Value) error { @@ -102,18 +106,17 @@ func TriggerCmdUpdateMessage(ctx context.Context, node UpdateMessageNode, conver Value: node, Ctx: ctx, } - - return sendCmd(conversationCh, c2v, timeOut) + return sendCmd(conversationCh, c2v, timeout) } -// Push message, msg for msgData slice +// TriggerCmdPushMsg Push message, msg for msgData slice func TriggerCmdPushMsg(ctx context.Context, msg *sdkws.PushMessages, ch chan Cmd2Value) error { if ch == nil { return errs.Wrap(ErrChanNil) } c2v := Cmd2Value{Cmd: constant.CmdPushMsg, Value: msg, Ctx: ctx} - return sendCmd(ch, c2v, timeOut) + return sendCmd(ch, c2v, timeout) } func TriggerCmdLogOut(ctx context.Context, ch chan Cmd2Value) error { @@ -121,16 +124,16 @@ func TriggerCmdLogOut(ctx context.Context, ch chan Cmd2Value) error { return errs.Wrap(ErrChanNil) } c2v := Cmd2Value{Cmd: constant.CmdLogOut, Ctx: ctx} - return sendCmd(ch, c2v, timeOut) + return sendCmd(ch, c2v, timeout) } -// Connection success trigger +// TriggerCmdConnected Connection success trigger func TriggerCmdConnected(ctx context.Context, ch chan Cmd2Value) error { if ch == nil { return errs.Wrap(ErrChanNil) } c2v := Cmd2Value{Cmd: constant.CmdConnSuccesss, Value: nil, Ctx: ctx} - return sendCmd(ch, c2v, timeOut) + return sendCmd(ch, c2v, timeout) } type DeleteConNode struct { @@ -138,11 +141,13 @@ type DeleteConNode struct { ConversationID string SessionType int } + type SyncReactionExtensionsNode struct { OperationID string Action int Args interface{} } + type UpdateConNode struct { ConID string Action int //1 Delete the conversation; 2 Update the latest news in the conversation or add a conversation; 3 Put a conversation on the top; @@ -150,20 +155,17 @@ type UpdateConNode struct { Args interface{} Caller string } + type UpdateMessageNode struct { Action int Args interface{} } -type Cmd2Value struct { - Cmd string - Value interface{} - Ctx context.Context -} type UpdateConInfo struct { UserID string GroupID string } + type UpdateMessageInfo struct { SessionType int32 UserID string @@ -179,15 +181,9 @@ type SourceIDAndSessionType struct { Nickname string } -func UnInitAll(conversationCh chan Cmd2Value) error { - c2v := Cmd2Value{Cmd: constant.CmdUnInit} - return sendCmd(conversationCh, c2v, timeOut) -} - type goroutine interface { Work(cmd Cmd2Value) GetCh() chan Cmd2Value - //GetContext() context.Context } func DoListener(Li goroutine, ctx context.Context) { @@ -208,14 +204,4 @@ func DoListener(Li goroutine, ctx context.Context) { return } } - -} - -func sendCmd(ch chan<- Cmd2Value, value Cmd2Value, timeout int64) error { - select { - case ch <- value: - return nil - case <-time.After(time.Millisecond * time.Duration(timeout)): - return errors.New("send cmd timeout") - } } From 5b64e79b26056e0827acba6fc61c4661d48fd653 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 27 Sep 2024 11:48:32 +0800 Subject: [PATCH 3/5] feat: Cmd2Value carry caller --- internal/conversation_msg/conversation_msg.go | 2 +- internal/conversation_msg/notification.go | 4 +-- internal/conversation_msg/read_drawing.go | 2 +- open_im_sdk/userRelated.go | 2 +- pkg/common/cmd.go | 27 +++++++++++++++++ pkg/common/trigger_channel.go | 30 +------------------ 6 files changed, 33 insertions(+), 34 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index e0298a720..492070000 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -669,7 +669,7 @@ func (c *Conversation) batchUpdateMessageList(ctx context.Context, updateMsg map conversation.LatestMsg = utils.StructToJsonString(latestMsg) c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{ConID: conversation.ConversationID, - Action: constant.AddConOrUpLatMsg, Args: *conversation, Caller: "batchUpdateMessageList"}}) + Action: constant.AddConOrUpLatMsg, Args: *conversation}}) } } diff --git a/internal/conversation_msg/notification.go b/internal/conversation_msg/notification.go index 278512d59..2cbf17ff0 100644 --- a/internal/conversation_msg/notification.go +++ b/internal/conversation_msg/notification.go @@ -44,8 +44,8 @@ const ( const InitSyncProgress = 10 func (c *Conversation) Work(c2v common.Cmd2Value) { - log.ZDebug(c2v.Ctx, "NotificationCmd start", "cmd", c2v.Cmd, "value", c2v.Value) - defer log.ZDebug(c2v.Ctx, "NotificationCmd end", "cmd", c2v.Cmd, "value", c2v.Value) + log.ZDebug(c2v.Ctx, "NotificationCmd start", "caller", c2v.Caller, "cmd", c2v.Cmd, "value", c2v.Value) + defer log.ZDebug(c2v.Ctx, "NotificationCmd end", "caller", c2v.Caller, "cmd", c2v.Cmd, "value", c2v.Value) switch c2v.Cmd { case constant.CmdNewMsgCome: c.doMsgNew(c2v) diff --git a/internal/conversation_msg/read_drawing.go b/internal/conversation_msg/read_drawing.go index 854b7ebc7..d9abf2e0b 100644 --- a/internal/conversation_msg/read_drawing.go +++ b/internal/conversation_msg/read_drawing.go @@ -210,7 +210,7 @@ func (c *Conversation) doUnreadCount(ctx context.Context, conversation *model_st } if (!latestMsg.IsRead) && datautil.Contain(latestMsg.Seq, seqs...) { c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{ConID: conversation.ConversationID, - Action: constant.UpdateLatestMessageChange, Args: []string{conversation.ConversationID}, Caller: "doUnreadCount"}, Ctx: ctx}) + Action: constant.UpdateLatestMessageChange, Args: []string{conversation.ConversationID}}, Ctx: ctx}) } } else { if err := c.db.UpdateColumnsConversation(ctx, conversation.ConversationID, map[string]interface{}{"unread_count": 0}); err != nil { diff --git a/open_im_sdk/userRelated.go b/open_im_sdk/userRelated.go index 061c8680b..89cf4807a 100644 --- a/open_im_sdk/userRelated.go +++ b/open_im_sdk/userRelated.go @@ -378,7 +378,7 @@ func setListener[T any](ctx context.Context, listener *T, getter func() T, setFu func (u *LoginMgr) run(ctx context.Context) { u.longConnMgr.Run(ctx) go u.msgSyncer.DoListener(ctx) - go common.DoListener(u.conversation, u.ctx) + go common.DoListener(u.ctx, u.conversation) go u.logoutListener(ctx) } diff --git a/pkg/common/cmd.go b/pkg/common/cmd.go index 3371d6c0b..0688e6b7c 100644 --- a/pkg/common/cmd.go +++ b/pkg/common/cmd.go @@ -34,6 +34,7 @@ func sendCmd(ch chan<- Cmd2Value, value Cmd2Value, timeout time.Duration) error if value.Caller == "" { value.Caller = getFuncName() } + log.ZDebug(value.Ctx, "sendCmd chan success", "caller", value.Caller, "cmd", value.Cmd, "value", value.Value) if ch == nil { log.ZError(value.Ctx, "sendCmd chan is nil", ErrChanNil, "caller", value.Caller, "cmd", value.Cmd, "value", value.Value) return ErrChanNil @@ -61,3 +62,29 @@ func getFuncName() string { } return fmt.Sprintf("%s:%d", name, line) } + +type goroutine interface { + Work(cmd Cmd2Value) + GetCh() chan Cmd2Value +} + +func DoListener(ctx context.Context, li goroutine) { + defer func() { + if r := recover(); r != nil { + err := fmt.Sprintf("panic: %+v\n%s", r, debug.Stack()) + log.ZWarn(ctx, "DoListener panic", nil, "panic info", err) + } + }() + + for { + select { + case cmd := <-li.GetCh(): + log.ZInfo(cmd.Ctx, "recv cmd", "caller", cmd.Caller, "cmd", cmd.Cmd, "value", cmd.Value) + li.Work(cmd) + log.ZInfo(cmd.Ctx, "done cmd", "caller", cmd.Caller, "cmd", cmd.Cmd, "value", cmd.Value) + case <-ctx.Done(): + log.ZInfo(ctx, "conversation done sdk logout.....") + return + } + } +} diff --git a/pkg/common/trigger_channel.go b/pkg/common/trigger_channel.go index da9412fe3..9cf92c6bb 100644 --- a/pkg/common/trigger_channel.go +++ b/pkg/common/trigger_channel.go @@ -17,8 +17,6 @@ package common import ( "context" "errors" - "fmt" - "runtime/debug" "time" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" @@ -152,8 +150,7 @@ type UpdateConNode struct { ConID string Action int //1 Delete the conversation; 2 Update the latest news in the conversation or add a conversation; 3 Put a conversation on the top; // 4 Cancel a conversation on the top, 5 Messages are not read and set to 0, 6 New conversations - Args interface{} - Caller string + Args interface{} } type UpdateMessageNode struct { @@ -180,28 +177,3 @@ type SourceIDAndSessionType struct { FaceURL string Nickname string } - -type goroutine interface { - Work(cmd Cmd2Value) - GetCh() chan Cmd2Value -} - -func DoListener(Li goroutine, ctx context.Context) { - defer func() { - if r := recover(); r != nil { - err := fmt.Sprintf("panic: %+v\n%s", r, debug.Stack()) - - log.ZWarn(ctx, "DoListener panic", nil, "panic info", err) - } - }() - - for { - select { - case cmd := <-Li.GetCh(): - Li.Work(cmd) - case <-ctx.Done(): - log.ZInfo(ctx, "conversation done sdk logout.....") - return - } - } -} From d3ede4838671575bc617e5de4560774a43e276b7 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 27 Sep 2024 14:40:44 +0800 Subject: [PATCH 4/5] feat: Cmd2Value carry caller --- internal/conversation_msg/notification.go | 3 +++ pkg/common/cmd.go | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/conversation_msg/notification.go b/internal/conversation_msg/notification.go index 2cbf17ff0..096781512 100644 --- a/internal/conversation_msg/notification.go +++ b/internal/conversation_msg/notification.go @@ -191,6 +191,9 @@ func (c *Conversation) getConversationLatestMsgClientID(latestMsg string) string } func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) { + if c2v.Caller == "" { + c2v.Caller = common.GetCaller(3) + } ctx := c2v.Ctx node := c2v.Value.(common.UpdateConNode) log.ZInfo(ctx, "doUpdateConversation", "node", node) diff --git a/pkg/common/cmd.go b/pkg/common/cmd.go index 0688e6b7c..010ad4992 100644 --- a/pkg/common/cmd.go +++ b/pkg/common/cmd.go @@ -32,7 +32,7 @@ type Cmd2Value struct { func sendCmd(ch chan<- Cmd2Value, value Cmd2Value, timeout time.Duration) error { if value.Caller == "" { - value.Caller = getFuncName() + value.Caller = GetCaller(3) } log.ZDebug(value.Ctx, "sendCmd chan success", "caller", value.Caller, "cmd", value.Cmd, "value", value.Value) if ch == nil { @@ -51,8 +51,8 @@ func sendCmd(ch chan<- Cmd2Value, value Cmd2Value, timeout time.Duration) error } } -func getFuncName() string { - pc, _, line, ok := runtime.Caller(3) +func GetCaller(skip int) string { + pc, _, line, ok := runtime.Caller(skip) if !ok { return "runtime.caller.failed" } From 75085809c026b4d54d25ad287b5c86a2d762b0b4 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 27 Sep 2024 14:59:06 +0800 Subject: [PATCH 5/5] feat: Cmd2Value carry caller --- internal/conversation_msg/notification.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/conversation_msg/notification.go b/internal/conversation_msg/notification.go index 096781512..426de3cfd 100644 --- a/internal/conversation_msg/notification.go +++ b/internal/conversation_msg/notification.go @@ -192,7 +192,7 @@ func (c *Conversation) getConversationLatestMsgClientID(latestMsg string) string func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) { if c2v.Caller == "" { - c2v.Caller = common.GetCaller(3) + c2v.Caller = common.GetCaller(2) } ctx := c2v.Ctx node := c2v.Value.(common.UpdateConNode) @@ -212,7 +212,9 @@ func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) { oc.LatestMsgSendTime = lc.LatestMsgSendTime oc.LatestMsg = lc.LatestMsg list = append(list, oc) - c.ConversationListener().OnConversationChanged(utils.StructToJsonString(list)) + data := utils.StructToJsonString(list) + log.ZInfo(ctx, "OnConversationChanged", "data", data) + c.ConversationListener().OnConversationChanged(data) } } } else { @@ -298,7 +300,9 @@ func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) { newCList = append(newCList, v) } } - c.ConversationListener().OnConversationChanged(utils.StructToJsonStringDefault(newCList)) + data := utils.StructToJsonStringDefault(newCList) + log.ZInfo(ctx, "OnConversationChanged", "data", data) + c.ConversationListener().OnConversationChanged(data) } case constant.NewCon: cidList := node.Args.([]string) @@ -313,6 +317,7 @@ func (c *Conversation) doUpdateConversation(c2v common.Cmd2Value) { } case constant.ConChangeDirect: cidList := node.Args.(string) + log.ZInfo(ctx, "ConversationChanged", "cidList", cidList) c.ConversationListener().OnConversationChanged(cidList) case constant.NewConDirect: