Skip to content

Commit

Permalink
fix:修复北极星摘流操作无法同步到eureka (#1269)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Oct 15, 2023
1 parent 6a55daa commit 289a5bd
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 11 deletions.
10 changes: 6 additions & 4 deletions apiserver/eurekaserver/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,15 +436,16 @@ func (h *EurekaServer) UpdateStatus(req *restful.Request, rsp *restful.Response)
writeHeader(http.StatusOK, rsp)
return
}
code := h.updateStatus(context.Background(), namespace, appId, instId, status, false)
ctx := context.WithValue(context.Background(), sourceFromEureka{}, true)
code := h.updateStatus(ctx, namespace, appId, instId, status, false)
writePolarisStatusCode(req, code)
if code == api.ExecuteSuccess || code == api.NoNeedUpdate {
eurekalog.Infof("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been updated successfully",
eurekalog.Infof("[EUREKA-SERVER] instance (namespace=%s, instId=%s, appId=%s) has been updated successfully",
namespace, instId, appId)
writeHeader(http.StatusOK, rsp)
return
}
eurekalog.Errorf("[EUREKA-SERVER]instance ((namespace=%s, instId=%s, appId=%s) has been updated failed, "+
eurekalog.Errorf("[EUREKA-SERVER] instance (namespace=%s, instId=%s, appId=%s) has been updated failed, "+
"code is %d",
namespace, instId, appId, code)
if code == api.NotFoundResource {
Expand Down Expand Up @@ -480,7 +481,8 @@ func (h *EurekaServer) DeleteStatus(req *restful.Request, rsp *restful.Response)
"client: %s,namespace=%s, instId=%s, appId=%s",
remoteAddr, namespace, instId, appId)

code := h.updateStatus(context.Background(), namespace, appId, instId, StatusUp, false)
ctx := context.WithValue(context.Background(), sourceFromEureka{}, true)
code := h.updateStatus(ctx, namespace, appId, instId, StatusUp, false)
writePolarisStatusCode(req, code)
if code == api.ExecuteSuccess {
eurekalog.Infof("[EUREKA-SERVER]instance status (namespace=%s, instId=%s, appId=%s) "+
Expand Down
218 changes: 218 additions & 0 deletions apiserver/eurekaserver/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@ import (
"bytes"
"context"
"encoding/json"
"encoding/xml"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"testing"
"time"
"unsafe"

"github.com/emicklei/go-restful/v3"
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
"github.com/polarismesh/specification/source/go/api/v1/service_manage"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"

api "github.com/polarismesh/polaris/common/api/v1"
testsuit "github.com/polarismesh/polaris/test/suit"
Expand All @@ -37,12 +45,15 @@ func createEurekaServerForTest(
discoverSuit *testsuit.DiscoverTestSuit, options map[string]interface{}) (*EurekaServer, error) {
eurekaSrv := &EurekaServer{
namingServer: discoverSuit.DiscoverServer(),
originDiscoverSvr: discoverSuit.OriginDiscoverServer(),
healthCheckServer: discoverSuit.HealthCheckServer(),
allowAsyncRegis: false,
}
err := eurekaSrv.Initialize(context.Background(), options, nil)
if err != nil {
return nil, err
}
eurekaSrv.registerInstanceChain()
return eurekaSrv, nil
}

Expand Down Expand Up @@ -198,3 +209,210 @@ func TestCreateInstance(t *testing.T) {
assert.Nil(t, err)
checkInstanceAction(t, deltaAppResp.Applications, appId, instanceId, ActionDeleted)
}

// Test_EurekaWrite .
func Test_EurekaWrite(t *testing.T) {
discoverSuit := &testsuit.DiscoverTestSuit{}
if err := discoverSuit.Initialize(); err != nil {
t.Fatal(err)
}
defer discoverSuit.Destroy()

options := map[string]interface{}{optionRefreshInterval: 5, optionDeltaExpireInterval: 120}
eurekaSrv, err := createEurekaServerForTest(discoverSuit, options)
assert.Nil(t, err)

mockIns := genMockEurekaInstance()

t.Run("RegisterInstance", func(t *testing.T) {
// pretty output must be created and written explicitly
output, err := xml.MarshalIndent(mockIns, " ", " ")
assert.NoError(t, err)

var body bytes.Buffer
_, err = body.Write([]byte(xml.Header))
assert.NoError(t, err)
_, err = body.Write(output)
assert.NoError(t, err)

mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s", mockIns.AppName), &body)
mockReq.Header.Add(restful.HEADER_Accept, restful.MIME_XML)
mockReq.Header.Add(restful.HEADER_ContentType, restful.MIME_XML)
mockRsp := newMockResponseWriter()

restfulReq := restful.NewRequest(mockReq)
injectRestfulReqPathParameters(t, restfulReq, map[string]string{
ParamAppId: mockIns.AppName,
})
// 这里是异步注册
eurekaSrv.RegisterApplication(restfulReq, restful.NewResponse(mockRsp))
assert.Equal(t, http.StatusNoContent, mockRsp.statusCode)
assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess))

time.Sleep(5 * time.Second)
saveIns, err := eurekaSrv.originDiscoverSvr.Cache().GetStore().GetInstance(mockIns.InstanceId)
assert.NoError(t, err)
assert.NotNil(t, saveIns)
})

t.Run("UpdateStatus", func(t *testing.T) {
t.Run("StatusUnknown", func(t *testing.T) {
mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s/%s/status",
mockIns.AppName, mockIns.InstanceId), nil)
mockReq.PostForm = url.Values{}
mockReq.PostForm.Add(ParamValue, StatusUnknown)
mockRsp := newMockResponseWriter()

restfulReq := restful.NewRequest(mockReq)
injectRestfulReqPathParameters(t, restfulReq, map[string]string{
ParamAppId: mockIns.AppName,
ParamInstId: mockIns.InstanceId,
})
eurekaSrv.UpdateStatus(restfulReq, restful.NewResponse(mockRsp))
assert.Equal(t, http.StatusOK, mockRsp.statusCode)
assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess))

//
saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId)
assert.NoError(t, err)
assert.False(t, saveIns.Isolate())
})

t.Run("StatusDown", func(t *testing.T) {
mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s/%s/status",
mockIns.AppName, mockIns.InstanceId), nil)
mockReq.PostForm = url.Values{}
mockReq.PostForm.Add(ParamValue, StatusDown)
mockRsp := newMockResponseWriter()

restfulReq := restful.NewRequest(mockReq)
injectRestfulReqPathParameters(t, restfulReq, map[string]string{
ParamAppId: mockIns.AppName,
ParamInstId: mockIns.InstanceId,
})
eurekaSrv.UpdateStatus(restfulReq, restful.NewResponse(mockRsp))
assert.Equal(t, http.StatusOK, mockRsp.statusCode)
assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess), fmt.Sprintf("%d", restfulReq.Attribute(statusCodeHeader)))

//
saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId)
assert.NoError(t, err)
assert.True(t, saveIns.Isolate())
assert.Equal(t, StatusDown, saveIns.Proto.Metadata[InternalMetadataStatus])
})

t.Run("StatusUp", func(t *testing.T) {
mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s/%s/status",
mockIns.AppName, mockIns.InstanceId), nil)
mockReq.PostForm = url.Values{}
mockReq.PostForm.Add(ParamValue, StatusUp)
mockRsp := newMockResponseWriter()

restfulReq := restful.NewRequest(mockReq)
injectRestfulReqPathParameters(t, restfulReq, map[string]string{
ParamAppId: mockIns.AppName,
ParamInstId: mockIns.InstanceId,
})
eurekaSrv.UpdateStatus(restfulReq, restful.NewResponse(mockRsp))
assert.Equal(t, http.StatusOK, mockRsp.statusCode)
assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess), fmt.Sprintf("%d", restfulReq.Attribute(statusCodeHeader)))

//
saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId)
assert.NoError(t, err)
assert.False(t, saveIns.Isolate())
assert.Equal(t, StatusUp, saveIns.Proto.Metadata[InternalMetadataStatus])
})

t.Run("Polaris_UpdateInstances", func(t *testing.T) {
defer func() {
rsp := discoverSuit.OriginDiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{
{
Id: wrapperspb.String(mockIns.InstanceId),
Isolate: wrapperspb.Bool(false),
},
})
assert.Equal(t, apimodel.Code_ExecuteSuccess, apimodel.Code(rsp.GetCode().GetValue()))
}()
rsp := discoverSuit.OriginDiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{
{
Id: wrapperspb.String(mockIns.InstanceId),
Isolate: wrapperspb.Bool(true),
},
})
assert.Equal(t, apimodel.Code_ExecuteSuccess, apimodel.Code(rsp.GetCode().GetValue()))

// 在获取一次
saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId)
assert.NoError(t, err)
assert.True(t, saveIns.Isolate())
assert.Equal(t, StatusOutOfService, saveIns.Proto.Metadata[InternalMetadataStatus])
})

t.Run("Polaris_UpdateInstancesIsolate", func(t *testing.T) {
rsp := discoverSuit.OriginDiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{
{
Id: wrapperspb.String(mockIns.InstanceId),
Isolate: wrapperspb.Bool(true),
},
})
assert.Equal(t, apimodel.Code_ExecuteSuccess, apimodel.Code(rsp.GetCode().GetValue()))

// 在获取一次
_, saveInss, err := discoverSuit.Storage.GetExpandInstances(map[string]string{
"id": mockIns.InstanceId,
}, map[string]string{}, 0, 10)
assert.NoError(t, err)
assert.Equal(t, 1, len(saveInss))
assert.True(t, saveInss[0].Isolate())
assert.Equal(t, StatusOutOfService, saveInss[0].Proto.Metadata[InternalMetadataStatus])
})
})
}

func injectRestfulReqPathParameters(t *testing.T, req *restful.Request, params map[string]string) {
v := reflect.ValueOf(req)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}

field := v.FieldByName("pathParameters")
fieldVal := GetUnexportedField(field)

pathParameters, ok := fieldVal.(map[string]string)
assert.True(t, ok)
for k, v := range params {
pathParameters[k] = v
}
SetUnexportedField(field, params)
}

func genMockEurekaInstance() *InstanceInfo {
mockIns := &InstanceInfo{
XMLName: struct{}{},
InstanceId: "123",
AppName: "MOCK_SERVICE",
AppGroupName: "MOCK_SERVICE",
IpAddr: "127.0.0.1",
Sid: "",
Port: &PortWrapper{
Port: "8080",
RealPort: 8080,
Enabled: "true",
RealEnable: true,
},
Status: StatusUp,
OverriddenStatus: StatusUnknown,
}
return mockIns
}

func SetUnexportedField(field reflect.Value, value interface{}) {
reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).
Elem().
Set(reflect.ValueOf(value))
}

func GetUnexportedField(field reflect.Value) interface{} {
return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface()
}
10 changes: 6 additions & 4 deletions apiserver/eurekaserver/applications.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,13 @@ func buildHashCode(version string, hashBuilder map[string]int, newApps *Applicat

func parseStatus(instance *apiservice.Instance) string {
if instance.GetIsolate().GetValue() {
status, ok := instance.Metadata[InternalMetadataStatus]
if ok {
return status
status := instance.Metadata[InternalMetadataStatus]
switch status {
case StatusDown:
return StatusDown
default:
return StatusOutOfService
}
return StatusOutOfService
}
return StatusUp
}
Expand Down
76 changes: 76 additions & 0 deletions apiserver/eurekaserver/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package eurekaserver

import (
"context"

"go.uber.org/zap"

"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/store"
)

type (
sourceFromEureka struct{}
)

func (h *EurekaServer) registerInstanceChain() {
svr := h.originDiscoverSvr.(*service.Server)
svr.AddInstanceChain(&EurekaInstanceChain{
s: h.namingServer.Cache().GetStore(),
})
}

type EurekaInstanceChain struct {
s store.Store
}

func (c *EurekaInstanceChain) AfterUpdate(ctx context.Context, instances ...*model.Instance) {
isFromEureka, _ := ctx.Value(sourceFromEureka{}).(bool)
if isFromEureka {
return
}

// TODO:这里要注意避免 eureka -> polaris -> notify -> eureka 带来的重复操作,后续会在 context 中携带信息做判断处理
for i := range instances {
ins := instances[i]
metadata := ins.Proto.GetMetadata()
if _, ok := metadata[InternalMetadataStatus]; !ok {
continue
}
if ins.Isolate() {
metadata[InternalMetadataStatus] = StatusOutOfService
} else {
metadata[InternalMetadataStatus] = StatusUp
}
if err := c.s.BatchAppendInstanceMetadata([]*store.InstanceMetadataRequest{
{
InstanceID: ins.ID(),
Revision: utils.NewUUID(),
Metadata: map[string]string{
InternalMetadataStatus: metadata[InternalMetadataStatus],
},
},
}); err != nil {
eurekalog.Error("[EUREKA-SERVER] after update instance isolate fail", zap.Error(err))
}
}
}
4 changes: 4 additions & 0 deletions apiserver/eurekaserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ type EurekaServer struct {
replicatePeers map[string][]string
generateUniqueInstId bool
subCtxs []*eventhub.SubscribtionContext

allowAsyncRegis bool
}

// GetPort 获取端口
Expand Down Expand Up @@ -180,6 +182,7 @@ func (h *EurekaServer) Initialize(ctx context.Context, option map[string]interfa
h.option = option
h.openAPI = api
h.subCtxs = make([]*eventhub.SubscribtionContext, 0, 4)
h.allowAsyncRegis = true

var namespace = DefaultNamespace
if namespaceValue, ok := option[optionNamespace]; ok {
Expand Down Expand Up @@ -350,6 +353,7 @@ func (h *EurekaServer) Run(errCh chan error) {
}
h.subCtxs = append(h.subCtxs, subCtx)
}
h.registerInstanceChain()
h.workers = NewApplicationsWorkers(h.refreshInterval, h.deltaExpireInterval, h.enableSelfPreservation,
h.namingServer, h.healthCheckServer, h.namespace)
h.statis = plugin.GetStatis()
Expand Down
Loading

0 comments on commit 289a5bd

Please sign in to comment.