Skip to content

Commit

Permalink
feat: searchLocalMessages by SenderUserID. (#739)
Browse files Browse the repository at this point in the history
* feat: searchLocalMessages by SenderUserID.

* update wasm db interface and fix error.

* add searchBykeyword logic.

* update wasm para.

* update logic.

* feat: improve method implement.

* update logic.

* try empty

* update sql query space.
  • Loading branch information
mo3et authored Nov 1, 2024
1 parent c858b54 commit 7196301
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 91 deletions.
39 changes: 26 additions & 13 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,13 @@ func (c *Conversation) searchLocalMessages(ctx context.Context, searchParam *sdk
if err != nil {
return nil, err
}

// Search by content type or keyword based on provided parameters
if len(searchParam.MessageTypeList) != 0 && len(searchParam.KeywordList) == 0 {
list, err = c.db.SearchMessageByContentType(ctx, searchParam.MessageTypeList, searchParam.ConversationID, startTime, endTime, offset, searchParam.Count)
list, err = c.db.SearchMessageByContentType(ctx, searchParam.MessageTypeList, searchParam.SenderUserIDList, searchParam.ConversationID, startTime, endTime, offset, searchParam.Count)
if err != nil {
return nil, err
}
} else {
newContentTypeList := func(list []int) (result []int) {
for _, v := range list {
Expand All @@ -349,18 +353,24 @@ func (c *Conversation) searchLocalMessages(ctx context.Context, searchParam *sdk
}
return result
}(searchParam.MessageTypeList)

if len(newContentTypeList) == 0 {
newContentTypeList = SearchContentType
}
list, err = c.db.SearchMessageByKeyword(ctx, newContentTypeList, searchParam.KeywordList, searchParam.KeywordListMatchType,
searchParam.ConversationID, startTime, endTime, offset, searchParam.Count)

list, err = c.db.SearchMessageByKeyword(ctx, newContentTypeList, searchParam.SenderUserIDList, searchParam.KeywordList,
searchParam.KeywordListMatchType, searchParam.ConversationID, startTime, endTime, offset, searchParam.Count)
if err != nil {
return nil, err
}
}
} else {
// Comprehensive search across all conversations
if len(searchParam.MessageTypeList) == 0 {
searchParam.MessageTypeList = SearchContentType
}
list, err = c.searchMessageByContentTypeAndKeyword(ctx, searchParam.MessageTypeList, searchParam.KeywordList, searchParam.KeywordListMatchType, startTime, endTime)

list, err = c.searchMessageByContentTypeAndKeyword(ctx, searchParam.MessageTypeList, searchParam.SenderUserIDList, searchParam.KeywordList, searchParam.KeywordListMatchType, startTime, endTime)
}

// Handle any errors encountered during the search
Expand All @@ -377,7 +387,7 @@ func (c *Conversation) searchLocalMessages(ctx context.Context, searchParam *sdk
//log.Debug("hahh",utils.KMP("SSSsdf3434","F3434"))
//log.Debug("hahh",utils.KMP("SSSsdf3434","SDF3"))
// log.Debug("", "get raw data length is", len(list))
log.ZDebug(ctx, "get raw data length is", len(list))
log.ZDebug(ctx, "get raw data length is", "len", len(list))

for _, v := range list {
temp := sdk_struct.MsgStruct{}
Expand Down Expand Up @@ -465,21 +475,23 @@ func (c *Conversation) searchLocalMessages(ctx context.Context, searchParam *sdk
return &r, nil // Return the final search results
}

func (c *Conversation) searchMessageByContentTypeAndKeyword(ctx context.Context, contentType []int, keywordList []string,
func (c *Conversation) searchMessageByContentTypeAndKeyword(ctx context.Context, contentType []int, senderUserIDList []string, keywordList []string,
keywordListMatchType int, startTime, endTime int64) (result []*model_struct.LocalChatLog, err error) {
var list []*model_struct.LocalChatLog

conversationIDList, err := c.db.GetAllConversationIDList(ctx)
if err != nil {
return nil, err
}

var mu sync.Mutex
g, _ := errgroup.WithContext(ctx)
g.SetLimit(searchMessageGoroutineLimit)
for _, v := range conversationIDList {
conversationID := v
g.Go(func() error {
sList, err := c.db.SearchMessageByContentTypeAndKeyword(ctx, contentType, conversationID, keywordList, keywordListMatchType, startTime, endTime)
eg, _ := errgroup.WithContext(ctx)
eg.SetLimit(searchMessageGoroutineLimit)
for _, cID := range conversationIDList {
conversationID := cID

eg.Go(func() error {
sList, err := c.db.SearchMessageByContentTypeAndKeyword(ctx, contentType, conversationID, senderUserIDList, keywordList, keywordListMatchType, startTime, endTime)
if err != nil {
log.ZWarn(ctx, "search conversation message", err, "conversationID", conversationID)
return nil
Expand All @@ -488,11 +500,12 @@ func (c *Conversation) searchMessageByContentTypeAndKeyword(ctx context.Context,
mu.Lock()
list = append(list, sList...)
mu.Unlock()

return nil
})
}

if err := g.Wait(); err != nil {
if err := eg.Wait(); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (

const (
conversationSyncLimit int64 = math.MaxInt64
searchMessageGoroutineLimit = 10
searchMessageGoroutineLimit int = 10
)

var SearchContentType = []int{constant.Text, constant.AtText, constant.File}
Expand Down
133 changes: 85 additions & 48 deletions pkg/db/chat_log_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
Expand Down Expand Up @@ -221,88 +222,124 @@ func (d *DataBase) DeleteConversationMsgsBySeqs(ctx context.Context, conversatio
return errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where("seq IN ?", seqs).Delete(model_struct.LocalChatLog{}).Error, "DeleteConversationMsgs failed")
}

func (d *DataBase) SearchMessageByContentType(ctx context.Context, contentType []int, conversationID string, startTime, endTime int64, offset, count int) (result []*model_struct.LocalChatLog, err error) {
func (d *DataBase) SearchMessageByContentType(ctx context.Context, contentType []int, senderUserIDList []string, conversationID string, startTime, endTime int64, offset, count int) (result []*model_struct.LocalChatLog, err error) {
d.mRWMutex.RLock()
defer d.mRWMutex.RUnlock()
condition := fmt.Sprintf("send_time between %d and %d AND status <=%d And content_type IN ?", startTime, endTime, constant.MsgStatusSendFailed)
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition, contentType).Order("send_time DESC").Offset(offset).Limit(count).Find(&result).Error, "SearchMessage failed")

var condition strings.Builder
var args []interface{}

condition.WriteString("send_time between ? AND ? AND status <= ? AND content_type IN (?) ")
args = append(args, startTime, endTime, constant.MsgStatusSendFailed, contentType)

if len(senderUserIDList) != 0 {
condition.WriteString(" And send_id IN (?)")
args = append(args, senderUserIDList)
}

err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition.String(), args...).Order("send_time DESC").Offset(offset).Limit(count).Find(&result).Error, "SearchMessage failed")
return result, err
}
func (d *DataBase) SearchMessageByKeyword(ctx context.Context, contentType []int, keywordList []string, keywordListMatchType int, conversationID string, startTime, endTime int64, offset, count int) (result []*model_struct.LocalChatLog, err error) {

func (d *DataBase) SearchMessageByKeyword(ctx context.Context, contentType []int, senderUserIDList []string, keywordList []string, keywordListMatchType int, conversationID string, startTime, endTime int64, offset, count int) (result []*model_struct.LocalChatLog, err error) {
d.mRWMutex.RLock()
defer d.mRWMutex.RUnlock()
var condition string
var subCondition string

var condition strings.Builder
var subCondition strings.Builder
var args []interface{}

condition.WriteString(" send_time between ? AND ? AND status <= ? AND content_type IN (?)")
args = append(args, startTime, endTime, constant.MsgStatusSendFailed, contentType)

// Construct a sub-condition for SQL query based on keyword list and match type
if keywordListMatchType == constant.KeywordMatchOr {
for i := 0; i < len(keywordList); i++ {
if i == 0 {
subCondition += "And ("
// Use OR logic if keywordListMatchType is KeywordMatchOr
subCondition.WriteString(" AND (")
for i, keyword := range keywordList {
if i > 0 {
subCondition.WriteString(" OR ")
}
if i+1 >= len(keywordList) {
subCondition += "content like " + "'%" + keywordList[i] + "%') "
} else {
subCondition += "content like " + "'%" + keywordList[i] + "%' " + "or "

}
subCondition.WriteString("content LIKE ?")
args = append(args, "%"+keyword+"%")
}
subCondition.WriteString(") ")
} else {
for i := 0; i < len(keywordList); i++ {
if i == 0 {
subCondition += "And ("
}
if i+1 >= len(keywordList) {
subCondition += "content like " + "'%" + keywordList[i] + "%') "
} else {
subCondition += "content like " + "'%" + keywordList[i] + "%' " + "and "
// Use AND logic for other keywordListMatchType
subCondition.WriteString(" AND (")
for i, keyword := range keywordList {
if i > 0 {
subCondition.WriteString(" AND ")
}

subCondition.WriteString("content LIKE ?")
args = append(args, "%"+keyword+"%")
}
subCondition.WriteString(") ")
}
condition = fmt.Sprintf(" send_time between %d and %d AND status <=%d And content_type IN ? ", startTime, endTime, constant.MsgStatusSendFailed)
condition += subCondition
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition, contentType).Order("send_time DESC").Offset(offset).Limit(count).Find(&result).Error, "SearchMessage failed")

condition.WriteString(subCondition.String())

if senderUserIDList != nil {
condition.WriteString(" And send_id IN (?)")
args = append(args, senderUserIDList)
}

err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition.String(), args...).Order("send_time DESC").Offset(offset).Limit(count).Find(&result).Error, "SearchMessage failed")

return result, err
}

// SearchMessageByContentTypeAndKeyword searches for messages in the database that match specified content types and keywords within a given time range.
func (d *DataBase) SearchMessageByContentTypeAndKeyword(ctx context.Context, contentType []int, conversationID string, keywordList []string, keywordListMatchType int, startTime, endTime int64) (result []*model_struct.LocalChatLog, err error) {
func (d *DataBase) SearchMessageByContentTypeAndKeyword(ctx context.Context, contentType []int, conversationID string, senderUserIDList []string, keywordList []string, keywordListMatchType int, startTime, endTime int64) (result []*model_struct.LocalChatLog, err error) {
d.mRWMutex.RLock()
defer d.mRWMutex.RUnlock()
var condition string
var subCondition string

var condition strings.Builder
var subCondition strings.Builder
var args []interface{}

// Construct the main SQL condition string
condition.WriteString(" send_time between ? AND ? AND status <= ? AND content_type IN (?)")
args = append(args, startTime, endTime, constant.MsgStatusSendFailed, contentType)

// Construct a sub-condition for SQL query based on keyword list and match type
if keywordListMatchType == constant.KeywordMatchOr {
// Use OR logic if keywordListMatchType is KeywordMatchOr
for i := 0; i < len(keywordList); i++ {
if i == 0 {
subCondition += "And ("
}
if i+1 >= len(keywordList) {
subCondition += "content like " + "'%" + keywordList[i] + "%') "
} else {
subCondition += "content like " + "'%" + keywordList[i] + "%' " + "or "
subCondition.WriteString(" AND (")
for i, keyword := range keywordList {
if i > 0 {
subCondition.WriteString(" OR ")
}

subCondition.WriteString("content LIKE ?")
args = append(args, "%"+keyword+"%")
}
subCondition.WriteString(") ")
} else {
// Use AND logic for other keywordListMatchType
for i := 0; i < len(keywordList); i++ {
if i == 0 {
subCondition += "And ("
}
if i+1 >= len(keywordList) {
subCondition += "content like " + "'%" + keywordList[i] + "%') "
} else {
subCondition += "content like " + "'%" + keywordList[i] + "%' " + "and "
subCondition.WriteString(" AND (")
for i, keyword := range keywordList {
if i > 0 {
subCondition.WriteString(" AND ")
}

subCondition.WriteString("content LIKE ?")
args = append(args, "%"+keyword+"%")
}
subCondition.WriteString(") ")
}

// Construct the main SQL condition string
condition = fmt.Sprintf("send_time between %d and %d AND status <=%d And content_type IN ? ", startTime, endTime, constant.MsgStatusSendFailed)
condition += subCondition
condition.WriteString(subCondition.String())

if senderUserIDList != nil {
condition.WriteString(" And send_id IN (?)")
args = append(args, senderUserIDList)
}

// Execute the query using the constructed condition and handle errors
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition, contentType).Order("send_time DESC").Find(&result).Error, "SearchMessage failed")
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition.String(), args...).Order("send_time DESC").Find(&result).Error, "SearchMessage failed")

return result, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/db/db_interface/databse.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ type GroupModel interface {
type MessageModel interface {
BatchInsertMessageList(ctx context.Context, conversationID string, MessageList []*model_struct.LocalChatLog) error
InsertMessage(ctx context.Context, conversationID string, Message *model_struct.LocalChatLog) error
SearchMessageByKeyword(ctx context.Context, contentType []int, keywordList []string, keywordListMatchType int, conversationID string, startTime, endTime int64, offset, count int) (result []*model_struct.LocalChatLog, err error)
SearchMessageByContentType(ctx context.Context, contentType []int, conversationID string, startTime, endTime int64, offset, count int) (result []*model_struct.LocalChatLog, err error)
SearchMessageByContentTypeAndKeyword(ctx context.Context, contentType []int, conversationID string, keywordList []string, keywordListMatchType int, startTime, endTime int64) (result []*model_struct.LocalChatLog, err error)
SearchMessageByKeyword(ctx context.Context, contentType []int, senderUserIDList []string, keywordList []string, keywordListMatchType int, conversationID string, startTime, endTime int64, offset, count int) (result []*model_struct.LocalChatLog, err error)
SearchMessageByContentType(ctx context.Context, contentType []int, senderUserIDList []string, conversationID string, startTime, endTime int64, offset, count int) (result []*model_struct.LocalChatLog, err error)
SearchMessageByContentTypeAndKeyword(ctx context.Context, contentType []int, conversationID string, senderUserIDList []string, keywordList []string, keywordListMatchType int, startTime, endTime int64) (result []*model_struct.LocalChatLog, err error)
GetMessage(ctx context.Context, conversationID, clientMsgID string) (*model_struct.LocalChatLog, error)
GetMessageBySeq(ctx context.Context, conversationID string, seq int64) (*model_struct.LocalChatLog, error)
UpdateColumnsMessage(ctx context.Context, conversationID string, ClientMsgID string, args map[string]interface{}) error
Expand Down
1 change: 1 addition & 0 deletions test/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import "github.com/openimsdk/protocol/constant"
const (
APIADDR = "http://127.0.0.1:10002"
WSADDR = "ws://127.0.0.1:10001"

UserID = "2237746339"
)

Expand Down
14 changes: 12 additions & 2 deletions test/conversation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,23 @@ func Test_InsertGroupMessageToLocalStorage(t *testing.T) {
}

func Test_SearchLocalMessages(t *testing.T) {
// req := &sdk_params_callback.SearchLocalMessagesParams{
// Count: 20,
// KeywordList: []string{"1"},
// MessageTypeList: []int{105},
// PageIndex: 1,
// SenderUserIDList: []string{},
// }

req := &sdk_params_callback.SearchLocalMessagesParams{
Count: 20,
KeywordList: []string{"1"},
ConversationID: "sg_3161900504",
MessageTypeList: []int{105},
PageIndex: 1,
SenderUserIDList: []string{},
Count: 20,
SenderUserIDList: []string{"1695766238"},
}

msgs, err := open_im_sdk.UserForSDK.Conversation().SearchLocalMessages(ctx, req)
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 7196301

Please sign in to comment.