Skip to content

Commit

Permalink
feat: Cmd2Value carry caller
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Sep 27, 2024
1 parent a5f3883 commit 8f66e79
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 40 deletions.
63 changes: 63 additions & 0 deletions pkg/common/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package common

import (
"context"
"fmt"
"github.com/openimsdk/tools/log"
"runtime"
"runtime/debug"
"strings"
"time"
)

var packet string

func init() {
build, ok := debug.ReadBuildInfo()
if !ok {
return
}
packet = build.Main.Path
if packet != "" && !strings.HasSuffix(packet, "/") {
packet += "/"
}
}

type Cmd2Value struct {
Cmd string
Value any
Caller string
Ctx context.Context
}

func sendCmd(ch chan<- Cmd2Value, value Cmd2Value, timeout time.Duration) error {
if value.Caller == "" {
value.Caller = getFuncName()
}
if ch == nil {
log.ZError(value.Ctx, "sendCmd chan is nil", ErrChanNil, "caller", value.Caller, "cmd", value.Cmd, "value", value.Value)
return ErrChanNil
}
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case ch <- value:
log.ZInfo(value.Ctx, "sendCmd chan success", "caller", value.Caller, "cmd", value.Cmd, "value", value.Value)
return nil
case <-timer.C:
log.ZError(value.Ctx, "sendCmd chan timeout", ErrTimeout, "caller", value.Caller, "cmd", value.Cmd, "value", value.Value)
return ErrTimeout
}
}

func getFuncName() string {
pc, _, line, ok := runtime.Caller(3)
if !ok {
return "runtime.caller.failed"
}
name := runtime.FuncForPC(pc).Name()
if packet != "" {
name = strings.TrimPrefix(name, packet)
}
return fmt.Sprintf("%s:%d", name, line)
}
66 changes: 26 additions & 40 deletions pkg/common/trigger_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,20 @@ import (
"github.com/openimsdk/protocol/sdkws"
)

const (
timeOut = 10000
)
const timeout = time.Millisecond * 10000

var ErrChanNil = errs.New("channal == nil")
var (
ErrChanNil = errs.New("channal == nil")
ErrTimeout = errors.New("send cmd timeout")
)

func TriggerCmdNewMsgCome(ctx context.Context, msg sdk_struct.CmdNewMsgComeToConversation, conversationCh chan Cmd2Value) error {
if conversationCh == nil {
return errs.Wrap(ErrChanNil)
}

c2v := Cmd2Value{Cmd: constant.CmdNewMsgCome, Value: msg, Ctx: ctx}
return sendCmd(conversationCh, c2v, timeOut)
return sendCmd(conversationCh, c2v, timeout)
}

func TriggerCmdMsgSyncInReinstall(ctx context.Context, msg sdk_struct.CmdMsgSyncInReinstall, conversationCh chan Cmd2Value) error {
Expand All @@ -51,20 +52,20 @@ func TriggerCmdMsgSyncInReinstall(ctx context.Context, msg sdk_struct.CmdMsgSync
}

c2v := Cmd2Value{Cmd: constant.CmdMsgSyncInReinstall, Value: msg, Ctx: ctx}
return sendCmd(conversationCh, c2v, timeOut)
return sendCmd(conversationCh, c2v, timeout)
}

func TriggerCmdNotification(ctx context.Context, msg sdk_struct.CmdNewMsgComeToConversation, conversationCh chan Cmd2Value) {
c2v := Cmd2Value{Cmd: constant.CmdNotification, Value: msg, Ctx: ctx}
err := sendCmd(conversationCh, c2v, timeOut)
err := sendCmd(conversationCh, c2v, timeout)
if err != nil {
log.ZWarn(ctx, "TriggerCmdNotification error", err, "msg", msg)
}
}

func TriggerCmdSyncFlag(ctx context.Context, syncFlag int, conversationCh chan Cmd2Value) {
c2v := Cmd2Value{Cmd: constant.CmdSyncFlag, Value: sdk_struct.CmdNewMsgComeToConversation{SyncFlag: syncFlag}, Ctx: ctx}
err := sendCmd(conversationCh, c2v, timeOut)
err := sendCmd(conversationCh, c2v, timeout)
if err != nil {
log.ZWarn(ctx, "TriggerCmdNotification error", err, "syncFlag", syncFlag)
}
Expand All @@ -75,12 +76,12 @@ func TriggerCmdWakeUpDataSync(ctx context.Context, ch chan Cmd2Value) error {
return errs.Wrap(ErrChanNil)
}
c2v := Cmd2Value{Cmd: constant.CmdWakeUpDataSync, Value: nil, Ctx: ctx}
return sendCmd(ch, c2v, timeOut)
return sendCmd(ch, c2v, timeout)
}

func TriggerCmdSyncData(ctx context.Context, ch chan Cmd2Value) {
c2v := Cmd2Value{Cmd: constant.CmdSyncData, Value: nil, Ctx: ctx}
err := sendCmd(ch, c2v, timeOut)
err := sendCmd(ch, c2v, timeout)
if err != nil {
log.ZWarn(ctx, "TriggerCmdSyncData error", err)
}
Expand All @@ -92,8 +93,11 @@ func TriggerCmdUpdateConversation(ctx context.Context, node UpdateConNode, conve
Value: node,
Ctx: ctx,
}

return sendCmd(conversationCh, c2v, timeOut)
err := sendCmd(conversationCh, c2v, timeout)
if err != nil {
_, _ = len(conversationCh), cap(conversationCh)
}
return err
}

func TriggerCmdUpdateMessage(ctx context.Context, node UpdateMessageNode, conversationCh chan Cmd2Value) error {
Expand All @@ -102,68 +106,66 @@ func TriggerCmdUpdateMessage(ctx context.Context, node UpdateMessageNode, conver
Value: node,
Ctx: ctx,
}

return sendCmd(conversationCh, c2v, timeOut)
return sendCmd(conversationCh, c2v, timeout)
}

// Push message, msg for msgData slice
// TriggerCmdPushMsg Push message, msg for msgData slice
func TriggerCmdPushMsg(ctx context.Context, msg *sdkws.PushMessages, ch chan Cmd2Value) error {
if ch == nil {
return errs.Wrap(ErrChanNil)
}

c2v := Cmd2Value{Cmd: constant.CmdPushMsg, Value: msg, Ctx: ctx}
return sendCmd(ch, c2v, timeOut)
return sendCmd(ch, c2v, timeout)
}

func TriggerCmdLogOut(ctx context.Context, ch chan Cmd2Value) error {
if ch == nil {
return errs.Wrap(ErrChanNil)
}
c2v := Cmd2Value{Cmd: constant.CmdLogOut, Ctx: ctx}
return sendCmd(ch, c2v, timeOut)
return sendCmd(ch, c2v, timeout)
}

// Connection success trigger
// TriggerCmdConnected Connection success trigger
func TriggerCmdConnected(ctx context.Context, ch chan Cmd2Value) error {
if ch == nil {
return errs.Wrap(ErrChanNil)
}
c2v := Cmd2Value{Cmd: constant.CmdConnSuccesss, Value: nil, Ctx: ctx}
return sendCmd(ch, c2v, timeOut)
return sendCmd(ch, c2v, timeout)
}

type DeleteConNode struct {
SourceID string
ConversationID string
SessionType int
}

type SyncReactionExtensionsNode struct {
OperationID string
Action int
Args interface{}
}

type UpdateConNode struct {
ConID string
Action int //1 Delete the conversation; 2 Update the latest news in the conversation or add a conversation; 3 Put a conversation on the top;
// 4 Cancel a conversation on the top, 5 Messages are not read and set to 0, 6 New conversations
Args interface{}
Caller string
}

type UpdateMessageNode struct {
Action int
Args interface{}
}

type Cmd2Value struct {
Cmd string
Value interface{}
Ctx context.Context
}
type UpdateConInfo struct {
UserID string
GroupID string
}

type UpdateMessageInfo struct {
SessionType int32
UserID string
Expand All @@ -179,15 +181,9 @@ type SourceIDAndSessionType struct {
Nickname string
}

func UnInitAll(conversationCh chan Cmd2Value) error {
c2v := Cmd2Value{Cmd: constant.CmdUnInit}
return sendCmd(conversationCh, c2v, timeOut)
}

type goroutine interface {
Work(cmd Cmd2Value)
GetCh() chan Cmd2Value
//GetContext() context.Context
}

func DoListener(Li goroutine, ctx context.Context) {
Expand All @@ -208,14 +204,4 @@ func DoListener(Li goroutine, ctx context.Context) {
return
}
}

}

func sendCmd(ch chan<- Cmd2Value, value Cmd2Value, timeout int64) error {
select {
case ch <- value:
return nil
case <-time.After(time.Millisecond * time.Duration(timeout)):
return errors.New("send cmd timeout")
}
}

0 comments on commit 8f66e79

Please sign in to comment.