Skip to content

Commit

Permalink
fix: the original quoted message is withdrawn and the quoted original…
Browse files Browse the repository at this point in the history
… message is displayed. (#1388)

* fix: GetUserReqApplicationList error when there is a disbanded group chat

* fix: error when querying some information about disbanded group

* fix: GetUserReqApplicationList dismissed group error

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* fix: the original message referenced by the pull message processing is withdrawn

* merge

* cicd: robot automated Change

---------

Co-authored-by: withchao <[email protected]>
  • Loading branch information
withchao and withchao authored Nov 13, 2023
1 parent 7153eeb commit 2ac54e0
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 73 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ require (
github.com/go-sql-driver/mysql v1.7.1
github.com/redis/go-redis/v9 v9.2.1
github.com/tencentyun/cos-go-sdk-v5 v0.7.45
go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.4.0
gopkg.in/src-d/go-git.v4 v4.13.1
gotest.tools v2.2.0+incompatible
)
Expand Down Expand Up @@ -127,12 +129,10 @@ require (
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down
3 changes: 2 additions & 1 deletion internal/api/third.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package api

import (
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"math/rand"
"net/http"
"strconv"

config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"

"github.com/gin-gonic/gin"

"github.com/OpenIMSDK/protocol/third"
Expand Down
1 change: 1 addition & 0 deletions internal/push/offlinepush/dummy/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dummy

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
)

Expand Down
3 changes: 2 additions & 1 deletion internal/tools/cron_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"testing"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)

func TestDisLock(t *testing.T) {
Expand Down
34 changes: 17 additions & 17 deletions pkg/apistruct/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,37 @@ import (
type SendMsg struct {
// SendID uniquely identifies the sender.
SendID string `json:"sendID" binding:"required"`

// GroupID is the identifier for the group, required if SessionType is 2 or 3.
GroupID string `json:"groupID" binding:"required_if=SessionType 2|required_if=SessionType 3"`

// SenderNickname is the nickname of the sender.
SenderNickname string `json:"senderNickname"`

// SenderFaceURL is the URL to the sender's avatar.
SenderFaceURL string `json:"senderFaceURL"`

// SenderPlatformID is an integer identifier for the sender's platform.
SenderPlatformID int32 `json:"senderPlatformID"`

// Content is the actual content of the message, required and excluded from Swagger documentation.
Content map[string]interface{} `json:"content" binding:"required" swaggerignore:"true"`

// ContentType is an integer that represents the type of the content.
ContentType int32 `json:"contentType" binding:"required"`

// SessionType is an integer that represents the type of session for the message.
SessionType int32 `json:"sessionType" binding:"required"`

// IsOnlineOnly specifies if the message is only sent when the receiver is online.
IsOnlineOnly bool `json:"isOnlineOnly"`

// NotOfflinePush specifies if the message should not trigger offline push notifications.
NotOfflinePush bool `json:"notOfflinePush"`

// SendTime is a timestamp indicating when the message was sent.
SendTime int64 `json:"sendTime"`

// OfflinePushInfo contains information for offline push notifications.
OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"`
}
Expand All @@ -67,10 +67,10 @@ type SendMsgReq struct {
// BatchSendMsgReq defines the structure for sending a message to multiple recipients.
type BatchSendMsgReq struct {
SendMsg

// IsSendAll indicates whether the message should be sent to all users.
IsSendAll bool `json:"isSendAll"`

// RecvIDs is a slice of receiver identifiers to whom the message will be sent, required field.
RecvIDs []string `json:"recvIDs" binding:"required"`
}
Expand All @@ -79,7 +79,7 @@ type BatchSendMsgReq struct {
type BatchSendMsgResp struct {
// Results is a slice of SingleReturnResult, representing the outcome of each message sent.
Results []*SingleReturnResult `json:"results"`

// FailedIDs is a slice of user IDs for whom the message send failed.
FailedIDs []string `json:"failedUserIDs"`
}
Expand All @@ -88,13 +88,13 @@ type BatchSendMsgResp struct {
type SingleReturnResult struct {
// ServerMsgID is the message identifier on the server-side.
ServerMsgID string `json:"serverMsgID"`

// ClientMsgID is the message identifier on the client-side.
ClientMsgID string `json:"clientMsgID"`

// SendTime is the timestamp of when the message was sent.
SendTime int64 `json:"sendTime"`

// RecvID uniquely identifies the receiver of the message.
RecvID string `json:"recvID"`
}
4 changes: 3 additions & 1 deletion pkg/common/cmd/msg_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package cmd

import (
"fmt"

"github.com/OpenIMSDK/protocol/constant"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/spf13/cobra"

config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"

"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
)

Expand Down
103 changes: 95 additions & 8 deletions pkg/common/db/controller/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,27 @@ package controller

import (
"context"
"encoding/json"
"errors"
"time"

"github.com/OpenIMSDK/protocol/constant"

"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"

"github.com/redis/go-redis/v9"

"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"

"go.mongodb.org/mongo-driver/mongo"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"go.mongodb.org/mongo-driver/mongo"

pbmsg "github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws"
Expand Down Expand Up @@ -397,7 +402,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
// log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs)
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs)
if err != nil {
return nil, err
}
Expand All @@ -408,12 +413,94 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat
return totalMsgs, nil
}

func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) {
func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*unrelationtb.MsgInfoModel, userID, conversationID string, msg *unrelationtb.MsgInfoModel) {
if msg.IsRead {
msg.Msg.IsRead = true
}
if msg.Msg.ContentType != constant.Quote {
return
}
if msg.Msg.Content == "" {
return
}
type MsgData struct {
SendID string `protobuf:"bytes,1,opt,name=sendID,proto3" json:"sendID"`
RecvID string `protobuf:"bytes,2,opt,name=recvID,proto3" json:"recvID"`
GroupID string `protobuf:"bytes,3,opt,name=groupID,proto3" json:"groupID"`
ClientMsgID string `protobuf:"bytes,4,opt,name=clientMsgID,proto3" json:"clientMsgID"`
ServerMsgID string `protobuf:"bytes,5,opt,name=serverMsgID,proto3" json:"serverMsgID"`
SenderPlatformID int32 `protobuf:"varint,6,opt,name=senderPlatformID,proto3" json:"senderPlatformID"`
SenderNickname string `protobuf:"bytes,7,opt,name=senderNickname,proto3" json:"senderNickname"`
SenderFaceURL string `protobuf:"bytes,8,opt,name=senderFaceURL,proto3" json:"senderFaceURL"`
SessionType int32 `protobuf:"varint,9,opt,name=sessionType,proto3" json:"sessionType"`
MsgFrom int32 `protobuf:"varint,10,opt,name=msgFrom,proto3" json:"msgFrom"`
ContentType int32 `protobuf:"varint,11,opt,name=contentType,proto3" json:"contentType"`
Content string `protobuf:"bytes,12,opt,name=content,proto3" json:"content"`
Seq int64 `protobuf:"varint,14,opt,name=seq,proto3" json:"seq"`
SendTime int64 `protobuf:"varint,15,opt,name=sendTime,proto3" json:"sendTime"`
CreateTime int64 `protobuf:"varint,16,opt,name=createTime,proto3" json:"createTime"`
Status int32 `protobuf:"varint,17,opt,name=status,proto3" json:"status"`
IsRead bool `protobuf:"varint,18,opt,name=isRead,proto3" json:"isRead"`
Options map[string]bool `protobuf:"bytes,19,rep,name=options,proto3" json:"options" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
OfflinePushInfo *sdkws.OfflinePushInfo `protobuf:"bytes,20,opt,name=offlinePushInfo,proto3" json:"offlinePushInfo"`
AtUserIDList []string `protobuf:"bytes,21,rep,name=atUserIDList,proto3" json:"atUserIDList"`
AttachedInfo string `protobuf:"bytes,22,opt,name=attachedInfo,proto3" json:"attachedInfo"`
Ex string `protobuf:"bytes,23,opt,name=ex,proto3" json:"ex"`
}
var quoteMsg struct {
Text string `json:"text,omitempty"`
QuoteMessage *MsgData `json:"quoteMessage,omitempty"`
MessageEntityList json.RawMessage `json:"messageEntityList,omitempty"`
}
if err := json.Unmarshal([]byte(msg.Msg.Content), &quoteMsg); err != nil {
log.ZError(ctx, "json.Unmarshal", err)
return
}
if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.ContentType == constant.MsgRevokeNotification {
return
}
var msgs []*unrelationtb.MsgInfoModel
if v, ok := cache[quoteMsg.QuoteMessage.Seq]; ok {
msgs = v
} else {
if quoteMsg.QuoteMessage.Seq > 0 {
ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msg.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq})
if err != nil {
log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq)
return
}
msgs = ms
cache[quoteMsg.QuoteMessage.Seq] = ms
}
}
if len(msgs) != 0 && msgs[0].Msg.ContentType != constant.MsgRevokeNotification {
return
}
quoteMsg.QuoteMessage.ContentType = constant.MsgRevokeNotification
if len(msgs) > 0 {
quoteMsg.QuoteMessage.Content = msgs[0].Msg.Content
} else {
quoteMsg.QuoteMessage.Content = "{}"
}
data, err := json.Marshal(&quoteMsg)
if err != nil {
log.ZError(ctx, "json.Marshal", err)
return
}
msg.Msg.Content = string(data)
if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msg.GetDocID(conversationID, msg.Msg.Seq), db.msg.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil {
log.ZError(ctx, "UpdateMsgContent", err)
}
}

func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*unrelationtb.MsgInfoModel, err error) {
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs)
if err != nil {
return nil, err
}
tempCache := make(map[int64][]*unrelationtb.MsgInfoModel)
for _, msg := range msgs {
if msg.IsRead {
msg.Msg.IsRead = true
}
db.handlerDBMsg(ctx, tempCache, userID, conversationID, msg)
}
return msgs, err
}
Expand All @@ -422,7 +509,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end)
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, seqs)
msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs)
if err != nil {
return nil, err
}
Expand Down
52 changes: 16 additions & 36 deletions pkg/common/db/controller/msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ func Test_BatchInsertChat2DB(t *testing.T) {
}

func GetDB() *commonMsgDatabase {
config.Config.Mongo.Address = []string{"192.168.44.128:37017"}
config.Config.Mongo.Address = []string{"203.56.175.233:37017"}
// config.Config.Mongo.Timeout = 60
config.Config.Mongo.Database = "openIM"
config.Config.Mongo.Database = "openIM_v3"
// config.Config.Mongo.Source = "admin"
config.Config.Mongo.Username = "root"
config.Config.Mongo.Password = "openIM123"
Expand Down Expand Up @@ -232,37 +232,17 @@ func Test_FindBySeq(t *testing.T) {
// }
//}

//func Test_Delete1(t *testing.T) {
// config.Config.Mongo.DBAddress = []string{"192.168.44.128:37017"}
// config.Config.Mongo.DBTimeout = 60
// config.Config.Mongo.DBDatabase = "openIM"
// config.Config.Mongo.DBSource = "admin"
// config.Config.Mongo.DBUserName = "root"
// config.Config.Mongo.DBPassword = "openIM123"
// config.Config.Mongo.DBMaxPoolSize = 100
// config.Config.Mongo.DBRetainChatRecords = 3650
// config.Config.Mongo.ChatRecordsClearTime = "0 2 * * 3"
//
// mongo, err := unrelation.NewMongo()
// if err != nil {
// panic(err)
// }
// err = mongo.GetDatabase().Client().Ping(context.Background(), nil)
// if err != nil {
// panic(err)
// }
//
// c := mongo.GetClient().Database("openIM").Collection("msg")
//
// var o unrelationtb.MsgDocModel
//
// err = c.FindOne(context.Background(), bson.M{"doc_id": "test:0"}).Decode(&o)
// if err != nil {
// panic(err)
// }
//
// for i, model := range o.Msg {
// fmt.Println(i, model == nil)
// }
//
//}
func TestName(t *testing.T) {
db := GetDB()
var seqs []int64
for i := int64(1); i <= 4; i++ {
seqs = append(seqs, i)
}
msgs, err := db.getMsgBySeqsRange(context.Background(), "4931176757", "si_3866692501_4931176757", seqs, seqs[0], seqs[len(seqs)-1])
if err != nil {
t.Fatal(err)
}

t.Log(msgs)

}
1 change: 0 additions & 1 deletion pkg/common/db/s3/cont/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
return nil, err
}
if md5Sum := md5.Sum([]byte(strings.Join(partHashs, partSeparator))); hex.EncodeToString(md5Sum[:]) != upload.Hash {
fmt.Println("CompleteUpload sum:", hex.EncodeToString(md5Sum[:]), "upload hash:", upload.Hash)
return nil, errors.New("md5 mismatching")
}
if info, err := c.StatObject(ctx, c.HashPath(upload.Hash)); err == nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/common/prommetrics/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package prommetrics

import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"

config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
)

func NewGrpcPromObj(cusMetrics []prometheus.Collector) (*prometheus.Registry, *grpc_prometheus.ServerMetrics, error) {
Expand Down
Loading

0 comments on commit 2ac54e0

Please sign in to comment.