Skip to content

Commit

Permalink
fix: reduce lock msg transfer (#1308)
Browse files Browse the repository at this point in the history
* fix: reduce lock msg transfer

Signed-off-by: rfyiamcool <[email protected]>

* fix: reduce lock msg transfer

Signed-off-by: rfyiamcool <[email protected]>

---------

Signed-off-by: rfyiamcool <[email protected]>
  • Loading branch information
rfyiamcool authored Nov 13, 2023
1 parent 69eb24f commit fd42c6d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 32 deletions.
3 changes: 1 addition & 2 deletions internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ import (
"net/http"
"sync"

"github.com/OpenIMSDK/tools/mw"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/OpenIMSDK/tools/mw"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
Expand Down
73 changes: 43 additions & 30 deletions internal/msgtransfer/online_history_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,49 +427,62 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
break
}
}
rwLock := new(sync.RWMutex)
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
t := time.NewTicker(time.Millisecond * 100)

split := 1000
rwLock := new(sync.RWMutex)
messages := make([]*sarama.ConsumerMessage, 0, 1000)
ticker := time.NewTicker(time.Millisecond * 100)

go func() {
for {
select {
case <-t.C:
if len(cMsg) > 0 {
rwLock.Lock()
ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
for _, v := range cMsg {
ccMsg = append(ccMsg, v)
}
cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
rwLock.Unlock()
split := 1000
ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg))
for i := 0; i < len(ccMsg)/split; i++ {
// log.Debug()
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
ctx: ctx, cMsgList: ccMsg[i*split : (i+1)*split],
}}
}
if (len(ccMsg) % split) > 0 {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):],
}}
}
log.ZDebug(ctx, "timer trigger msg consumer end", "length", len(ccMsg))
case <-ticker.C:
if len(messages) == 0 {
continue
}

rwLock.Lock()
buffer := make([]*sarama.ConsumerMessage, 0, len(messages))
buffer = append(buffer, messages...)

// reuse slice, set cap to 0
messages = messages[:0]
rwLock.Unlock()

start := time.Now()
ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
for i := 0; i < len(buffer)/split; i++ {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
ctx: ctx, cMsgList: buffer[i*split : (i+1)*split],
}}
}
if (len(buffer) % split) > 0 {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):],
}}
}

log.ZDebug(ctx, "timer trigger msg consumer end",
"length", len(buffer), "time_cost", time.Since(start),
)
}
}
}()

for msg := range claim.Messages() {
rwLock.Lock()
if len(msg.Value) != 0 {
cMsg = append(cMsg, msg)
if len(msg.Value) == 0 {
continue
}

rwLock.Lock()
messages = append(messages, msg)
rwLock.Unlock()

sess.MarkMessage(msg, "")
}

return nil
}

0 comments on commit fd42c6d

Please sign in to comment.