Skip to content

Commit 55424d8

Browse files
authored
Merge pull request #1002 from andrewshan/release-v1.15.0
fix: 解决eureka replicate因串行出现的超时性能问题(cherrypick 1.15)
2 parents e61e1ce + 248df93 commit 55424d8

File tree

6 files changed

+152
-71
lines changed

6 files changed

+152
-71
lines changed

apiserver/eurekaserver/access_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func TestCreateInstance(t *testing.T) {
158158
startPort := 8900
159159
host := "127.0.1.1"
160160
total := 10
161-
instances := batchBuildInstances(appId, "127.0.1.1", 8900, &LeaseInfo{
161+
instances := batchBuildInstances(appId, host, startPort, &LeaseInfo{
162162
RenewalIntervalInSecs: 30,
163163
DurationInSecs: 120,
164164
}, total)

apiserver/eurekaserver/replicate.go

+26-26
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"context"
2222
"net/http"
2323
"strconv"
24+
"sync"
25+
"sync/atomic"
2426
"time"
2527

2628
"github.com/emicklei/go-restful/v3"
@@ -86,17 +88,31 @@ func (h *EurekaServer) BatchReplication(req *restful.Request, rsp *restful.Respo
8688

8789
func (h *EurekaServer) doBatchReplicate(
8890
replicateRequest *ReplicationList, token string) (*ReplicationListResponse, uint32) {
89-
batchResponse := &ReplicationListResponse{ResponseList: []*ReplicationInstanceResponse{}}
91+
batchResponse := &ReplicationListResponse{}
9092
var resultCode = api.ExecuteSuccess
91-
for _, instanceInfo := range replicateRequest.ReplicationList {
92-
resp, code := h.dispatch(instanceInfo, token)
93-
if code != api.ExecuteSuccess {
94-
resultCode = code
95-
log.Warnf("[EUREKA-SERVER] fail to process replicate instance request, code is %d, action %s, instance %s, app %s",
96-
code, instanceInfo.Action, instanceInfo.Id, instanceInfo.AppName)
97-
}
98-
batchResponse.ResponseList = append(batchResponse.ResponseList, resp)
93+
itemCount := len(replicateRequest.ReplicationList)
94+
if itemCount == 0 {
95+
return batchResponse, resultCode
96+
}
97+
batchResponse.ResponseList = make([]*ReplicationInstanceResponse, itemCount)
98+
wg := &sync.WaitGroup{}
99+
wg.Add(itemCount)
100+
mutex := &sync.Mutex{}
101+
for i, inst := range replicateRequest.ReplicationList {
102+
go func(idx int, instanceInfo *ReplicationInstance) {
103+
defer wg.Done()
104+
resp, code := h.dispatch(instanceInfo, token)
105+
if code != api.ExecuteSuccess {
106+
atomic.CompareAndSwapUint32(&resultCode, api.ExecuteSuccess, code)
107+
log.Warnf("[EUREKA-SERVER] fail to process replicate instance request, code is %d, action %s, instance %s, app %s",
108+
code, instanceInfo.Action, instanceInfo.Id, instanceInfo.AppName)
109+
}
110+
mutex.Lock()
111+
batchResponse.ResponseList[idx] = resp
112+
mutex.Unlock()
113+
}(i, inst)
99114
}
115+
wg.Wait()
100116
return batchResponse, resultCode
101117
}
102118

@@ -213,7 +229,7 @@ func (h *EurekaServer) handleInstanceEvent(ctx context.Context, i interface{}) e
213229
appName := formatReadName(e.Service)
214230
curTimeMilli := time.Now().UnixMilli()
215231
switch e.EType {
216-
case model.EventInstanceOnline:
232+
case model.EventInstanceOnline, model.EventInstanceUpdate:
217233
instanceInfo := eventToInstance(&e, appName, curTimeMilli)
218234
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
219235
AppName: appName,
@@ -242,22 +258,6 @@ func (h *EurekaServer) handleInstanceEvent(ctx context.Context, i interface{}) e
242258
rInstance.OverriddenStatus = StatusOutOfService
243259
}
244260
h.replicateWorker.AddReplicateTask(rInstance)
245-
case model.EventInstanceTurnHealth:
246-
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
247-
AppName: appName,
248-
Id: e.Id,
249-
LastDirtyTimestamp: curTimeMilli,
250-
Status: StatusUp,
251-
Action: actionStatusUpdate,
252-
})
253-
case model.EventInstanceTurnUnHealth:
254-
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
255-
AppName: appName,
256-
Id: e.Id,
257-
LastDirtyTimestamp: curTimeMilli,
258-
Status: StatusDown,
259-
Action: actionStatusUpdate,
260-
})
261261
case model.EventInstanceOpenIsolate:
262262
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
263263
AppName: appName,
+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Tencent is pleased to support the open source community by making Polaris available.
3+
*
4+
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package eurekaserver
19+
20+
import (
21+
"testing"
22+
"time"
23+
24+
"github.com/stretchr/testify/assert"
25+
26+
api "github.com/polarismesh/polaris/common/api/v1"
27+
testsuit "github.com/polarismesh/polaris/test/suit"
28+
)
29+
30+
func TestDispatchHeartbeat(t *testing.T) {
31+
discoverSuit := &testsuit.DiscoverTestSuit{}
32+
if err := discoverSuit.Initialize(); err != nil {
33+
t.Fatal(err)
34+
}
35+
defer discoverSuit.Destroy()
36+
37+
options := map[string]interface{}{optionRefreshInterval: 5, optionDeltaExpireInterval: 120}
38+
eurekaSrv, err := createEurekaServerForTest(discoverSuit, options)
39+
assert.Nil(t, err)
40+
eurekaSrv.worker = NewApplicationsWorker(eurekaSrv.refreshInterval, eurekaSrv.deltaExpireInterval,
41+
eurekaSrv.enableSelfPreservation, eurekaSrv.namingServer, eurekaSrv.healthCheckServer, eurekaSrv.namespace)
42+
43+
appId := "TESTAPP"
44+
startPort := 8900
45+
host := "127.0.1.1"
46+
total := 30
47+
instances := batchBuildInstances(appId, host, startPort, &LeaseInfo{
48+
RenewalIntervalInSecs: 30,
49+
DurationInSecs: 120,
50+
}, total)
51+
52+
var replicateInstances = &ReplicationList{}
53+
54+
for i, instance := range instances {
55+
log.Infof("replicate test: register %d", i)
56+
replicateInstances.ReplicationList = append(replicateInstances.ReplicationList, &ReplicationInstance{
57+
AppName: appId,
58+
Id: instance.InstanceId,
59+
InstanceInfo: instance,
60+
Action: actionRegister,
61+
})
62+
}
63+
_, code := eurekaSrv.doBatchReplicate(replicateInstances, "")
64+
assert.Equal(t, api.ExecuteSuccess, code)
65+
66+
time.Sleep(10 * time.Second)
67+
for i := 0; i < 5; i++ {
68+
log.Infof("replicate test: heartbeat %d", i)
69+
replicateInstances = &ReplicationList{}
70+
for _, instance := range instances {
71+
replicateInstances.ReplicationList = append(replicateInstances.ReplicationList, &ReplicationInstance{
72+
AppName: appId,
73+
Id: instance.InstanceId,
74+
Action: actionHeartbeat,
75+
})
76+
}
77+
_, code := eurekaSrv.doBatchReplicate(replicateInstances, "")
78+
assert.Equal(t, api.ExecuteSuccess, code)
79+
}
80+
}

apiserver/eurekaserver/server.go

+16-10
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ type EurekaServer struct {
142142
enableSelfPreservation bool
143143
replicateWorker *ReplicateWorker
144144
eventHandlerHandler *EurekaInstanceEventHandler
145+
146+
replicatePeers []string
145147
}
146148

147149
// GetPort 获取端口
@@ -192,30 +194,25 @@ func (h *EurekaServer) Initialize(ctx context.Context, option map[string]interfa
192194
replicatePeers = append(replicatePeers, replicatePeerObj.(string))
193195
}
194196
}
195-
if len(replicatePeers) > 0 {
196-
h.replicateWorker = NewReplicateWorker(ctx, replicatePeers)
197-
h.eventHandlerHandler = &EurekaInstanceEventHandler{
198-
BaseInstanceEventHandler: service.NewBaseInstanceEventHandler(h.namingServer), svr: h}
199-
if err := eventhub.Subscribe(
200-
eventhub.InstanceEventTopic, "eureka-replication", h.eventHandlerHandler); nil != err {
201-
return err
202-
}
197+
h.replicatePeers = replicatePeers
198+
if len(h.replicatePeers) > 0 {
199+
h.replicateWorker = NewReplicateWorker(ctx, h.replicatePeers)
203200
}
204201
}
205202

206203
var refreshInterval int
207204
if value, ok := option[optionRefreshInterval]; ok {
208205
refreshInterval = value.(int)
209206
}
210-
if refreshInterval == 0 {
207+
if refreshInterval <= 0 {
211208
refreshInterval = DefaultRefreshInterval
212209
}
213210

214211
var deltaExpireInterval int
215212
if value, ok := option[optionDeltaExpireInterval]; ok {
216213
deltaExpireInterval = value.(int)
217214
}
218-
if deltaExpireInterval == 0 {
215+
if deltaExpireInterval <= 0 {
219216
deltaExpireInterval = DefaultDetailExpireInterval
220217
}
221218

@@ -283,6 +280,15 @@ func (h *EurekaServer) Run(errCh chan error) {
283280
errCh <- err
284281
return
285282
}
283+
if len(h.replicatePeers) > 0 {
284+
h.eventHandlerHandler = &EurekaInstanceEventHandler{
285+
BaseInstanceEventHandler: service.NewBaseInstanceEventHandler(h.namingServer), svr: h}
286+
if err = eventhub.Subscribe(
287+
eventhub.InstanceEventTopic, "eureka-replication", h.eventHandlerHandler); nil != err {
288+
errCh <- err
289+
return
290+
}
291+
}
286292
h.worker = NewApplicationsWorker(h.refreshInterval, h.deltaExpireInterval, h.enableSelfPreservation,
287293
h.namingServer, h.healthCheckServer, h.namespace)
288294
h.statis = plugin.GetStatis()

common/model/instance_event.go

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ const (
4545
EventInstanceOffline InstanceEventType = "InstanceOffline"
4646
// EventInstanceSendHeartbeat Instance send heartbeat package to server
4747
EventInstanceSendHeartbeat InstanceEventType = "InstanceSendHeartbeat"
48+
// EventInstanceUpdate Instance metadata and info update event
49+
EventInstanceUpdate InstanceEventType = "InstanceUpdate"
4850
)
4951

5052
// CtxEventKeyMetadata 用于将metadata从Context中传入并取出

service/instance.go

+27-34
Original file line numberDiff line numberDiff line change
@@ -429,11 +429,10 @@ func (s *Server) UpdateInstance(ctx context.Context, req *apiservice.Instance) *
429429
platformID := utils.ParsePlatformID(ctx)
430430
log.Info(fmt.Sprintf("old instance: %+v", instance), utils.ZapRequestID(requestID), utils.ZapPlatformID(platformID))
431431

432-
// 比对下更新前后的 isolate 状态
433-
eventTypes := diffInstanceEvent(req, instance)
434-
432+
var eventTypes map[model.InstanceEventType]bool
433+
var needUpdate bool
435434
// 存储层操作
436-
if needUpdate := s.updateInstanceAttribute(req, instance); !needUpdate {
435+
if needUpdate, eventTypes = s.updateInstanceAttribute(req, instance); !needUpdate {
437436
log.Info("update instance no data change, no need update",
438437
utils.ZapRequestID(requestID), utils.ZapPlatformID(platformID), zap.String("instance", req.String()))
439438
return api.NewInstanceResponse(apimodel.Code_NoNeedUpdate, req)
@@ -449,13 +448,13 @@ func (s *Server) UpdateInstance(ctx context.Context, req *apiservice.Instance) *
449448
log.Info(msg, utils.ZapRequestID(requestID), utils.ZapPlatformID(platformID))
450449
s.RecordHistory(ctx, instanceRecordEntry(ctx, req, service, instance, model.OUpdate))
451450

452-
for i := range eventTypes {
451+
for eventType := range eventTypes {
453452
event := &model.InstanceEvent{
454453
Id: instance.ID(),
455454
Namespace: service.Namespace,
456455
Service: service.Name,
457456
Instance: instance.Proto,
458-
EType: eventTypes[i],
457+
EType: eventType,
459458
CreateTime: time.Time{},
460459
}
461460
event.InjectMetadata(ctx)
@@ -604,71 +603,86 @@ func (s *Server) getInstancesMainByService(ctx context.Context, req *apiservice.
604603
/**
605604
* @brief 修改服务属性
606605
*/
607-
func (s *Server) updateInstanceAttribute(req *apiservice.Instance, instance *model.Instance) bool {
606+
func (s *Server) updateInstanceAttribute(
607+
req *apiservice.Instance, instance *model.Instance) (bool, map[model.InstanceEventType]bool) {
608608
// #lizard forgives
609609
instance.MallocProto()
610610
needUpdate := false
611611
insProto := instance.Proto
612+
var updateEvents = make(map[model.InstanceEventType]bool)
612613
if ok := instanceMetaNeedUpdate(req.GetMetadata(), instance.Metadata()); ok {
613614
insProto.Metadata = req.GetMetadata()
614615
needUpdate = true
616+
updateEvents[model.EventInstanceUpdate] = true
615617
}
616618

617619
if ok := instanceLocationNeedUpdate(req.GetLocation(), instance.Proto.GetLocation()); ok {
618620
insProto.Location = req.Location
619621
needUpdate = true
622+
updateEvents[model.EventInstanceUpdate] = true
620623
}
621624

622-
// if !needUpdate {
623-
// // 不需要更新metadata,则置空
624-
// insProto.Metadata = nil
625-
// }
626-
627625
if req.GetProtocol() != nil && req.GetProtocol().GetValue() != instance.Protocol() {
628626
insProto.Protocol = req.GetProtocol()
629627
needUpdate = true
628+
updateEvents[model.EventInstanceUpdate] = true
630629
}
631630

632631
if req.GetVersion() != nil && req.GetVersion().GetValue() != instance.Version() {
633632
insProto.Version = req.GetVersion()
634633
needUpdate = true
634+
updateEvents[model.EventInstanceUpdate] = true
635635
}
636636

637637
if req.GetPriority() != nil && req.GetPriority().GetValue() != instance.Priority() {
638638
insProto.Priority = req.GetPriority()
639639
needUpdate = true
640+
updateEvents[model.EventInstanceUpdate] = true
640641
}
641642

642643
if req.GetWeight() != nil && req.GetWeight().GetValue() != instance.Weight() {
643644
insProto.Weight = req.GetWeight()
644645
needUpdate = true
646+
updateEvents[model.EventInstanceUpdate] = true
645647
}
646648

647649
if req.GetHealthy() != nil && req.GetHealthy().GetValue() != instance.Healthy() {
648650
insProto.Healthy = req.GetHealthy()
649651
needUpdate = true
652+
if req.Healthy.GetValue() {
653+
updateEvents[model.EventInstanceTurnHealth] = true
654+
} else {
655+
updateEvents[model.EventInstanceTurnUnHealth] = true
656+
}
650657
}
651658

652659
if req.GetIsolate() != nil && req.GetIsolate().GetValue() != instance.Isolate() {
653660
insProto.Isolate = req.GetIsolate()
654661
needUpdate = true
662+
if req.Isolate.GetValue() {
663+
updateEvents[model.EventInstanceOpenIsolate] = true
664+
} else {
665+
updateEvents[model.EventInstanceCloseIsolate] = true
666+
}
655667
}
656668

657669
if req.GetLogicSet() != nil && req.GetLogicSet().GetValue() != instance.LogicSet() {
658670
insProto.LogicSet = req.GetLogicSet()
659671
needUpdate = true
672+
updateEvents[model.EventInstanceUpdate] = true
660673
}
661674

662675
if ok := updateHealthCheck(req, instance); ok {
663676
needUpdate = true
677+
updateEvents[model.EventInstanceUpdate] = true
664678
}
665679

666680
// 每次更改,都要生成一个新的uuid
667681
if needUpdate {
668682
insProto.Revision = utils.NewStringValue(utils.NewUUID())
669683
}
670684

671-
return needUpdate
685+
return needUpdate, updateEvents
672686
}
673687

674688
func instanceLocationNeedUpdate(req *apimodel.Location, old *apimodel.Location) bool {
@@ -1369,24 +1383,3 @@ func CheckDbInstanceFieldLen(req *apiservice.Instance) (*apiservice.Response, bo
13691383
}
13701384
return nil, false
13711385
}
1372-
1373-
func diffInstanceEvent(req *apiservice.Instance, save *model.Instance) []model.InstanceEventType {
1374-
eventTypes := make([]model.InstanceEventType, 0)
1375-
if req.Isolate != nil && save.Isolate() != req.Isolate.GetValue() {
1376-
if req.Isolate.GetValue() {
1377-
eventTypes = append(eventTypes, model.EventInstanceOpenIsolate)
1378-
} else {
1379-
eventTypes = append(eventTypes, model.EventInstanceCloseIsolate)
1380-
}
1381-
}
1382-
1383-
if req.Healthy != nil && save.Healthy() != req.Healthy.GetValue() {
1384-
if req.Healthy.GetValue() {
1385-
eventTypes = append(eventTypes, model.EventInstanceTurnHealth)
1386-
} else {
1387-
eventTypes = append(eventTypes, model.EventInstanceTurnUnHealth)
1388-
}
1389-
}
1390-
1391-
return eventTypes
1392-
}

0 commit comments

Comments
 (0)