Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support FetchSurroundingMessages #741

Merged
merged 11 commits into from
Oct 16, 2024
84 changes: 71 additions & 13 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,76 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}
log.ZDebug(ctx, "pull message", "pull cost time", time.Since(t))
t = time.Now()
//var thisMinSeq int64
//for _, v := range list {
// if v.Seq != 0 && thisMinSeq == 0 {
// thisMinSeq = v.Seq
// }
// if v.Seq < thisMinSeq && v.Seq != 0 {
// thisMinSeq = v.Seq
// }
// if v.Status >= constant.MsgStatusHasDeleted {
// log.ZDebug(ctx, "this message has been deleted or exception message", "msg", v)
// continue
// }
// temp := sdk_struct.MsgStruct{}
// temp.ClientMsgID = v.ClientMsgID
// temp.ServerMsgID = v.ServerMsgID
// temp.CreateTime = v.CreateTime
// temp.SendTime = v.SendTime
// temp.SessionType = v.SessionType
// temp.SendID = v.SendID
// temp.RecvID = v.RecvID
// temp.MsgFrom = v.MsgFrom
// temp.ContentType = v.ContentType
// temp.SenderPlatformID = v.SenderPlatformID
// temp.SenderNickname = v.SenderNickname
// temp.SenderFaceURL = v.SenderFaceURL
// temp.Content = v.Content
// temp.Seq = v.Seq
// temp.IsRead = v.IsRead
// temp.Status = v.Status
// var attachedInfo sdk_struct.AttachedInfoElem
// _ = utils.JsonStringToStruct(v.AttachedInfo, &attachedInfo)
// temp.AttachedInfoElem = &attachedInfo
// temp.Ex = v.Ex
// temp.LocalEx = v.LocalEx
// err := c.msgHandleByContentType(&temp)
// if err != nil {
// log.ZError(ctx, "Parsing data error", err, "temp", temp)
// continue
// }
// switch sessionType {
// case constant.WriteGroupChatType:
// fallthrough
// case constant.ReadGroupChatType:
// temp.GroupID = temp.RecvID
// temp.RecvID = c.loginUserID
// }
// if attachedInfo.IsPrivateChat && temp.SendTime+int64(attachedInfo.BurnDuration) < time.Now().Unix() {
// continue
// }
// messageList = append(messageList, &temp)
//}
var thisMinSeq int64
thisMinSeq, messageList = c.LocalChatLog2MsgStruct(ctx, list, sessionType)
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
t = time.Now()
if !isReverse {
sort.Sort(messageList)
}
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
messageListCallback.MessageList = messageList
if thisMinSeq == 0 {
thisMinSeq = req.LastMinSeq
}
messageListCallback.LastMinSeq = thisMinSeq
return &messageListCallback, nil

}

func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model_struct.LocalChatLog, sessionType int) (int64, []*sdk_struct.MsgStruct) {
messageList := make([]*sdk_struct.MsgStruct, 0, len(list))
var thisMinSeq int64
for _, v := range list {
if v.Seq != 0 && thisMinSeq == 0 {
Expand Down Expand Up @@ -161,19 +231,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}
messageList = append(messageList, &temp)
}
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
t = time.Now()
if !isReverse {
sort.Sort(messageList)
}
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
messageListCallback.MessageList = messageList
if thisMinSeq == 0 {
thisMinSeq = req.LastMinSeq
}
messageListCallback.LastMinSeq = thisMinSeq
return &messageListCallback, nil

return thisMinSeq, messageList
}

func (c *Conversation) typingStatusUpdate(ctx context.Context, recvID, msgTip string) error {
Expand Down
51 changes: 51 additions & 0 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
"math"
"sync"

Expand Down Expand Up @@ -1013,3 +1014,53 @@ func (c *Conversation) GetInputStates(ctx context.Context, conversationID string
func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error {
return c.typing.ChangeInputStates(ctx, conversationID, focus)
}

func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversationID string, seq int64, before int64, after int64) ([]*sdk_struct.MsgStruct, error) {
lc, err := c.db.GetConversation(ctx, conversationID)
if err != nil {
return nil, err
}
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, []int64{seq}, false, false, 0, 0, &[]*model_struct.LocalChatLog{}, &sdk.GetAdvancedHistoryMessageListCallback{})
res, err := c.db.GetMessagesBySeqs(ctx, conversationID, []int64{seq})
if err != nil {
return nil, err
}
if len(res) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
_, msgList := c.LocalChatLog2MsgStruct(ctx, []*model_struct.LocalChatLog{res[0]}, int(lc.ConversationType))
if len(msgList) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
msg := msgList[0]
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
if before > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
LastMinSeq: msg.Seq,
ConversationID: conversationID,
Count: int(before),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
result = append(result, msg)
if after > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
LastMinSeq: msg.Seq,
ConversationID: conversationID,
Count: int(after),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
sort.Sort(sdk_struct.NewMsgList(result))
return result, nil
}
5 changes: 1 addition & 4 deletions internal/conversation_msg/entering.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,7 @@ func (e *typing) onNewMsg(ctx context.Context, msg *sdkws.MsgData) {
return
}
now := time.Now().UnixMilli()
expirationTimestamp := msg.SendTime + int64(inputStatesSendTime/time.Millisecond)
if msg.SendTime > now || expirationTimestamp <= now {
return
}
expirationTimestamp := now + int64(inputStatesSendTime/time.Millisecond)
var sourceID string
if msg.GroupID == "" {
sourceID = msg.SendID
Expand Down
4 changes: 4 additions & 0 deletions open_im_sdk/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,7 @@ func ChangeInputStates(callback open_im_sdk_callback.Base, operationID string, c
func GetInputStates(callback open_im_sdk_callback.Base, operationID string, conversationID string, userID string) {
call(callback, operationID, UserForSDK.Conversation().GetInputStates, conversationID, userID)
}

func FetchSurroundingMessages(callback open_im_sdk_callback.Base, operationID string, conversationID string, seq int64, before int64, after int64) {
call(callback, operationID, UserForSDK.Conversation().FetchSurroundingMessages, conversationID, seq, before, after)
}
12 changes: 9 additions & 3 deletions test/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ package test

import "github.com/openimsdk/protocol/constant"

//const (
// APIADDR = "http://127.0.0.1:10002"
// WSADDR = "ws://127.0.0.1:10001"
// UserID = "3717417654"
//)

const (
APIADDR = "http://127.0.0.1:10002"
WSADDR = "ws://127.0.0.1:10001"
UserID = "3717417654"
APIADDR = "http://172.16.8.135:10002"
WSADDR = "ws://172.16.8.135:10001"
UserID = "3725415129"
)

const (
Expand Down
13 changes: 13 additions & 0 deletions test/create_msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,16 @@ func Test_CreateForwardMessage(t *testing.T) {
}
t.Log(message)
}

func Test_FetchSurroundingMessages(t *testing.T) {
msgs, err := open_im_sdk.UserForSDK.Conversation().FetchSurroundingMessages(ctx, "sg_3559850526", 15, 14, 8)
if err != nil {
t.Error(err)
return
}
t.Log(len(msgs))
for _, msg := range msgs {
t.Logf("[%d] %#v", msg.Seq, msg.TextElem)
}
t.Log(msgs)
}
6 changes: 5 additions & 1 deletion test/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,15 @@ func init() {
if err := open_im_sdk.UserForSDK.Login(ctx, UserID, token); err != nil {
panic(err)
}
open_im_sdk.UserForSDK.SetConversationListener(&onConversationListener{ctx: ctx})
ch := make(chan error)
open_im_sdk.UserForSDK.SetConversationListener(&onConversationListener{ctx: ctx, ch: ch})
open_im_sdk.UserForSDK.SetGroupListener(&onGroupListener{ctx: ctx})
open_im_sdk.UserForSDK.SetAdvancedMsgListener(&onAdvancedMsgListener{ctx: ctx})
open_im_sdk.UserForSDK.SetFriendshipListener(&onFriendshipListener{ctx: ctx})
open_im_sdk.UserForSDK.SetUserListener(&onUserListener{ctx: ctx})
if err := <-ch; err != nil {
panic(err)
}
}

func getConf(APIADDR, WSADDR string) sdk_struct.IMConfig {
Expand Down
4 changes: 4 additions & 0 deletions test/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package test

import (
"context"
"fmt"
"github.com/openimsdk/tools/log"
)

Expand Down Expand Up @@ -45,6 +46,7 @@ func (c *OnConnListener) OnUserTokenExpired() {

type onConversationListener struct {
ctx context.Context
ch chan error
}

func (o *onConversationListener) OnSyncServerStart(reinstalled bool) {
Expand All @@ -53,10 +55,12 @@ func (o *onConversationListener) OnSyncServerStart(reinstalled bool) {

func (o *onConversationListener) OnSyncServerFinish(reinstalled bool) {
log.ZInfo(o.ctx, "OnSyncServerFinish")
o.ch <- nil
}

func (o *onConversationListener) OnSyncServerFailed(reinstalled bool) {
log.ZInfo(o.ctx, "OnSyncServerFailed")
o.ch <- fmt.Errorf("OnSyncServerFailed")
}

func (o *onConversationListener) OnSyncServerProgress(progress int) {
Expand Down
1 change: 1 addition & 0 deletions wasm/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func registerFunc() {

js.Global().Set("changeInputStates", js.FuncOf(wrapperConMsg.ChangeInputStates))
js.Global().Set("getInputStates", js.FuncOf(wrapperConMsg.GetInputStates))
js.Global().Set("fetchSurroundingMessages", js.FuncOf(wrapperConMsg.FetchSurroundingMessages))

//register group func
wrapperGroup := wasm_wrapper.NewWrapperGroup(globalFuc)
Expand Down
5 changes: 5 additions & 0 deletions wasm/wasm_wrapper/wasm_conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,8 @@ func (w *WrapperConMsg) GetInputStates(_ js.Value, args []js.Value) interface{}
callback := event_listener.NewBaseCallback(utils.FirstLower(utils.GetSelfFuncName()), w.commonFunc)
return event_listener.NewCaller(open_im_sdk.GetInputStates, callback, &args).AsyncCallWithCallback()
}

func (w *WrapperConMsg) FetchSurroundingMessages(_ js.Value, args []js.Value) interface{} {
callback := event_listener.NewBaseCallback(utils.FirstLower(utils.GetSelfFuncName()), w.commonFunc)
return event_listener.NewCaller(open_im_sdk.FetchSurroundingMessages, callback, &args).AsyncCallWithCallback()
}