diff --git a/apiserver/eurekaserver/access.go b/apiserver/eurekaserver/access.go index 15c6c9796..996829507 100644 --- a/apiserver/eurekaserver/access.go +++ b/apiserver/eurekaserver/access.go @@ -436,15 +436,16 @@ func (h *EurekaServer) UpdateStatus(req *restful.Request, rsp *restful.Response) writeHeader(http.StatusOK, rsp) return } - code := h.updateStatus(context.Background(), namespace, appId, instId, status, false) + ctx := context.WithValue(context.Background(), sourceFromEureka{}, true) + code := h.updateStatus(ctx, namespace, appId, instId, status, false) writePolarisStatusCode(req, code) if code == api.ExecuteSuccess || code == api.NoNeedUpdate { - eurekalog.Infof("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been updated successfully", + eurekalog.Infof("[EUREKA-SERVER] instance (namespace=%s, instId=%s, appId=%s) has been updated successfully", namespace, instId, appId) writeHeader(http.StatusOK, rsp) return } - eurekalog.Errorf("[EUREKA-SERVER]instance ((namespace=%s, instId=%s, appId=%s) has been updated failed, "+ + eurekalog.Errorf("[EUREKA-SERVER] instance (namespace=%s, instId=%s, appId=%s) has been updated failed, "+ "code is %d", namespace, instId, appId, code) if code == api.NotFoundResource { @@ -480,7 +481,8 @@ func (h *EurekaServer) DeleteStatus(req *restful.Request, rsp *restful.Response) "client: %s,namespace=%s, instId=%s, appId=%s", remoteAddr, namespace, instId, appId) - code := h.updateStatus(context.Background(), namespace, appId, instId, StatusUp, false) + ctx := context.WithValue(context.Background(), sourceFromEureka{}, true) + code := h.updateStatus(ctx, namespace, appId, instId, StatusUp, false) writePolarisStatusCode(req, code) if code == api.ExecuteSuccess { eurekalog.Infof("[EUREKA-SERVER]instance status (namespace=%s, instId=%s, appId=%s) "+ diff --git a/apiserver/eurekaserver/access_test.go b/apiserver/eurekaserver/access_test.go index be27624e9..cd82dfdac 100644 --- a/apiserver/eurekaserver/access_test.go +++ b/apiserver/eurekaserver/access_test.go @@ -21,13 +21,21 @@ import ( "bytes" "context" "encoding/json" + "encoding/xml" "fmt" "net/http" + "net/http/httptest" + "net/url" + "reflect" "testing" "time" + "unsafe" "github.com/emicklei/go-restful/v3" + apimodel "github.com/polarismesh/specification/source/go/api/v1/model" + "github.com/polarismesh/specification/source/go/api/v1/service_manage" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/wrapperspb" api "github.com/polarismesh/polaris/common/api/v1" testsuit "github.com/polarismesh/polaris/test/suit" @@ -37,12 +45,15 @@ func createEurekaServerForTest( discoverSuit *testsuit.DiscoverTestSuit, options map[string]interface{}) (*EurekaServer, error) { eurekaSrv := &EurekaServer{ namingServer: discoverSuit.DiscoverServer(), + originDiscoverSvr: discoverSuit.OriginDiscoverServer(), healthCheckServer: discoverSuit.HealthCheckServer(), + allowAsyncRegis: false, } err := eurekaSrv.Initialize(context.Background(), options, nil) if err != nil { return nil, err } + eurekaSrv.registerInstanceChain() return eurekaSrv, nil } @@ -198,3 +209,210 @@ func TestCreateInstance(t *testing.T) { assert.Nil(t, err) checkInstanceAction(t, deltaAppResp.Applications, appId, instanceId, ActionDeleted) } + +// Test_EurekaWrite . +func Test_EurekaWrite(t *testing.T) { + discoverSuit := &testsuit.DiscoverTestSuit{} + if err := discoverSuit.Initialize(); err != nil { + t.Fatal(err) + } + defer discoverSuit.Destroy() + + options := map[string]interface{}{optionRefreshInterval: 5, optionDeltaExpireInterval: 120} + eurekaSrv, err := createEurekaServerForTest(discoverSuit, options) + assert.Nil(t, err) + + mockIns := genMockEurekaInstance() + + t.Run("RegisterInstance", func(t *testing.T) { + // pretty output must be created and written explicitly + output, err := xml.MarshalIndent(mockIns, " ", " ") + assert.NoError(t, err) + + var body bytes.Buffer + _, err = body.Write([]byte(xml.Header)) + assert.NoError(t, err) + _, err = body.Write(output) + assert.NoError(t, err) + + mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s", mockIns.AppName), &body) + mockReq.Header.Add(restful.HEADER_Accept, restful.MIME_XML) + mockReq.Header.Add(restful.HEADER_ContentType, restful.MIME_XML) + mockRsp := newMockResponseWriter() + + restfulReq := restful.NewRequest(mockReq) + injectRestfulReqPathParameters(t, restfulReq, map[string]string{ + ParamAppId: mockIns.AppName, + }) + // 这里是异步注册 + eurekaSrv.RegisterApplication(restfulReq, restful.NewResponse(mockRsp)) + assert.Equal(t, http.StatusNoContent, mockRsp.statusCode) + assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess)) + + time.Sleep(5 * time.Second) + saveIns, err := eurekaSrv.originDiscoverSvr.Cache().GetStore().GetInstance(mockIns.InstanceId) + assert.NoError(t, err) + assert.NotNil(t, saveIns) + }) + + t.Run("UpdateStatus", func(t *testing.T) { + t.Run("StatusUnknown", func(t *testing.T) { + mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s/%s/status", + mockIns.AppName, mockIns.InstanceId), nil) + mockReq.PostForm = url.Values{} + mockReq.PostForm.Add(ParamValue, StatusUnknown) + mockRsp := newMockResponseWriter() + + restfulReq := restful.NewRequest(mockReq) + injectRestfulReqPathParameters(t, restfulReq, map[string]string{ + ParamAppId: mockIns.AppName, + ParamInstId: mockIns.InstanceId, + }) + eurekaSrv.UpdateStatus(restfulReq, restful.NewResponse(mockRsp)) + assert.Equal(t, http.StatusOK, mockRsp.statusCode) + assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess)) + + // + saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId) + assert.NoError(t, err) + assert.False(t, saveIns.Isolate()) + }) + + t.Run("StatusDown", func(t *testing.T) { + mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s/%s/status", + mockIns.AppName, mockIns.InstanceId), nil) + mockReq.PostForm = url.Values{} + mockReq.PostForm.Add(ParamValue, StatusDown) + mockRsp := newMockResponseWriter() + + restfulReq := restful.NewRequest(mockReq) + injectRestfulReqPathParameters(t, restfulReq, map[string]string{ + ParamAppId: mockIns.AppName, + ParamInstId: mockIns.InstanceId, + }) + eurekaSrv.UpdateStatus(restfulReq, restful.NewResponse(mockRsp)) + assert.Equal(t, http.StatusOK, mockRsp.statusCode) + assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess), fmt.Sprintf("%d", restfulReq.Attribute(statusCodeHeader))) + + // + saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId) + assert.NoError(t, err) + assert.True(t, saveIns.Isolate()) + assert.Equal(t, StatusDown, saveIns.Proto.Metadata[InternalMetadataStatus]) + }) + + t.Run("StatusUp", func(t *testing.T) { + mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s/%s/status", + mockIns.AppName, mockIns.InstanceId), nil) + mockReq.PostForm = url.Values{} + mockReq.PostForm.Add(ParamValue, StatusUp) + mockRsp := newMockResponseWriter() + + restfulReq := restful.NewRequest(mockReq) + injectRestfulReqPathParameters(t, restfulReq, map[string]string{ + ParamAppId: mockIns.AppName, + ParamInstId: mockIns.InstanceId, + }) + eurekaSrv.UpdateStatus(restfulReq, restful.NewResponse(mockRsp)) + assert.Equal(t, http.StatusOK, mockRsp.statusCode) + assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess), fmt.Sprintf("%d", restfulReq.Attribute(statusCodeHeader))) + + // + saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId) + assert.NoError(t, err) + assert.False(t, saveIns.Isolate()) + assert.Equal(t, StatusUp, saveIns.Proto.Metadata[InternalMetadataStatus]) + }) + + t.Run("Polaris_UpdateInstances", func(t *testing.T) { + defer func() { + rsp := discoverSuit.OriginDiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{ + { + Id: wrapperspb.String(mockIns.InstanceId), + Isolate: wrapperspb.Bool(false), + }, + }) + assert.Equal(t, apimodel.Code_ExecuteSuccess, apimodel.Code(rsp.GetCode().GetValue())) + }() + rsp := discoverSuit.OriginDiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{ + { + Id: wrapperspb.String(mockIns.InstanceId), + Isolate: wrapperspb.Bool(true), + }, + }) + assert.Equal(t, apimodel.Code_ExecuteSuccess, apimodel.Code(rsp.GetCode().GetValue())) + + // 在获取一次 + saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId) + assert.NoError(t, err) + assert.True(t, saveIns.Isolate()) + assert.Equal(t, StatusOutOfService, saveIns.Proto.Metadata[InternalMetadataStatus]) + }) + + t.Run("Polaris_UpdateInstancesIsolate", func(t *testing.T) { + rsp := discoverSuit.OriginDiscoverServer().UpdateInstances(discoverSuit.DefaultCtx, []*service_manage.Instance{ + { + Id: wrapperspb.String(mockIns.InstanceId), + Isolate: wrapperspb.Bool(true), + }, + }) + assert.Equal(t, apimodel.Code_ExecuteSuccess, apimodel.Code(rsp.GetCode().GetValue())) + + // 在获取一次 + _, saveInss, err := discoverSuit.Storage.GetExpandInstances(map[string]string{ + "id": mockIns.InstanceId, + }, map[string]string{}, 0, 10) + assert.NoError(t, err) + assert.Equal(t, 1, len(saveInss)) + assert.True(t, saveInss[0].Isolate()) + assert.Equal(t, StatusOutOfService, saveInss[0].Proto.Metadata[InternalMetadataStatus]) + }) + }) +} + +func injectRestfulReqPathParameters(t *testing.T, req *restful.Request, params map[string]string) { + v := reflect.ValueOf(req) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + field := v.FieldByName("pathParameters") + fieldVal := GetUnexportedField(field) + + pathParameters, ok := fieldVal.(map[string]string) + assert.True(t, ok) + for k, v := range params { + pathParameters[k] = v + } + SetUnexportedField(field, params) +} + +func genMockEurekaInstance() *InstanceInfo { + mockIns := &InstanceInfo{ + XMLName: struct{}{}, + InstanceId: "123", + AppName: "MOCK_SERVICE", + AppGroupName: "MOCK_SERVICE", + IpAddr: "127.0.0.1", + Sid: "", + Port: &PortWrapper{ + Port: "8080", + RealPort: 8080, + Enabled: "true", + RealEnable: true, + }, + Status: StatusUp, + OverriddenStatus: StatusUnknown, + } + return mockIns +} + +func SetUnexportedField(field reflect.Value, value interface{}) { + reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())). + Elem(). + Set(reflect.ValueOf(value)) +} + +func GetUnexportedField(field reflect.Value) interface{} { + return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface() +} diff --git a/apiserver/eurekaserver/applications.go b/apiserver/eurekaserver/applications.go index 762f53331..acc5e715c 100644 --- a/apiserver/eurekaserver/applications.go +++ b/apiserver/eurekaserver/applications.go @@ -186,11 +186,13 @@ func buildHashCode(version string, hashBuilder map[string]int, newApps *Applicat func parseStatus(instance *apiservice.Instance) string { if instance.GetIsolate().GetValue() { - status, ok := instance.Metadata[InternalMetadataStatus] - if ok { - return status + status := instance.Metadata[InternalMetadataStatus] + switch status { + case StatusDown: + return StatusDown + default: + return StatusOutOfService } - return StatusOutOfService } return StatusUp } diff --git a/apiserver/eurekaserver/chain.go b/apiserver/eurekaserver/chain.go new file mode 100644 index 000000000..642122037 --- /dev/null +++ b/apiserver/eurekaserver/chain.go @@ -0,0 +1,76 @@ +/** + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package eurekaserver + +import ( + "context" + + "go.uber.org/zap" + + "github.com/polarismesh/polaris/common/model" + "github.com/polarismesh/polaris/common/utils" + "github.com/polarismesh/polaris/service" + "github.com/polarismesh/polaris/store" +) + +type ( + sourceFromEureka struct{} +) + +func (h *EurekaServer) registerInstanceChain() { + svr := h.originDiscoverSvr.(*service.Server) + svr.AddInstanceChain(&EurekaInstanceChain{ + s: h.namingServer.Cache().GetStore(), + }) +} + +type EurekaInstanceChain struct { + s store.Store +} + +func (c *EurekaInstanceChain) AfterUpdate(ctx context.Context, instances ...*model.Instance) { + isFromEureka, _ := ctx.Value(sourceFromEureka{}).(bool) + if isFromEureka { + return + } + + // TODO:这里要注意避免 eureka -> polaris -> notify -> eureka 带来的重复操作,后续会在 context 中携带信息做判断处理 + for i := range instances { + ins := instances[i] + metadata := ins.Proto.GetMetadata() + if _, ok := metadata[InternalMetadataStatus]; !ok { + continue + } + if ins.Isolate() { + metadata[InternalMetadataStatus] = StatusOutOfService + } else { + metadata[InternalMetadataStatus] = StatusUp + } + if err := c.s.BatchAppendInstanceMetadata([]*store.InstanceMetadataRequest{ + { + InstanceID: ins.ID(), + Revision: utils.NewUUID(), + Metadata: map[string]string{ + InternalMetadataStatus: metadata[InternalMetadataStatus], + }, + }, + }); err != nil { + eurekalog.Error("[EUREKA-SERVER] after update instance isolate fail", zap.Error(err)) + } + } +} diff --git a/apiserver/eurekaserver/server.go b/apiserver/eurekaserver/server.go index 4f22fe697..b1db5bdfd 100644 --- a/apiserver/eurekaserver/server.go +++ b/apiserver/eurekaserver/server.go @@ -152,6 +152,8 @@ type EurekaServer struct { replicatePeers map[string][]string generateUniqueInstId bool subCtxs []*eventhub.SubscribtionContext + + allowAsyncRegis bool } // GetPort 获取端口 @@ -180,6 +182,7 @@ func (h *EurekaServer) Initialize(ctx context.Context, option map[string]interfa h.option = option h.openAPI = api h.subCtxs = make([]*eventhub.SubscribtionContext, 0, 4) + h.allowAsyncRegis = true var namespace = DefaultNamespace if namespaceValue, ok := option[optionNamespace]; ok { @@ -350,6 +353,7 @@ func (h *EurekaServer) Run(errCh chan error) { } h.subCtxs = append(h.subCtxs, subCtx) } + h.registerInstanceChain() h.workers = NewApplicationsWorkers(h.refreshInterval, h.deltaExpireInterval, h.enableSelfPreservation, h.namingServer, h.healthCheckServer, h.namespace) h.statis = plugin.GetStatis() diff --git a/apiserver/eurekaserver/write.go b/apiserver/eurekaserver/write.go index f39147ad5..f1d6e5629 100644 --- a/apiserver/eurekaserver/write.go +++ b/apiserver/eurekaserver/write.go @@ -26,9 +26,11 @@ import ( "github.com/golang/protobuf/ptypes/wrappers" apimodel "github.com/polarismesh/specification/source/go/api/v1/model" apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage" + "go.uber.org/zap" api "github.com/polarismesh/polaris/common/api/v1" "github.com/polarismesh/polaris/common/model" + commonstore "github.com/polarismesh/polaris/common/store" "github.com/polarismesh/polaris/common/utils" ) @@ -205,7 +207,7 @@ func (h *EurekaServer) registerInstances( ctx context.Context, namespace string, appId string, instance *InstanceInfo, replicated bool) uint32 { ctx = context.WithValue( ctx, model.CtxEventKeyMetadata, map[string]string{MetadataReplicate: strconv.FormatBool(replicated)}) - ctx = context.WithValue(ctx, utils.ContextOpenAsyncRegis, true) + ctx = context.WithValue(ctx, utils.ContextOpenAsyncRegis, h.allowAsyncRegis) appId = formatWriteName(appId) // 1. 先转换数据结构 totalInstance := convertEurekaInstance(instance, namespace, h.namespace, appId, h.generateUniqueInstId) @@ -256,7 +258,11 @@ func (h *EurekaServer) updateStatus( saveIns, err := h.originDiscoverSvr.Cache().GetStore().GetInstance(instanceId) if err != nil { - return uint32(apimodel.Code_StoreLayerException) + eurekalog.Error("[EUREKA-SERVER] get instance from store when update status", zap.Error(err)) + return uint32(commonstore.StoreCode2APICode(err)) + } + if saveIns == nil { + return uint32(apimodel.Code_NotFoundInstance) } metadata := saveIns.Metadata() diff --git a/release/conf/i18n/zh.toml b/release/conf/i18n/zh.toml index 4bce1122b..2059355da 100644 --- a/release/conf/i18n/zh.toml +++ b/release/conf/i18n/zh.toml @@ -104,7 +104,7 @@ 400219 = "当前命名空间存在配置分组,请先删除配置分组,再删除命名空间" #NamespaceExistedConfigGroups 400301 = "服务未找到" #NotFoundService 400302 = "路由未找到" #NotFoundRouting -400303 = "示例未找到" #NotFoundInstance +400303 = "服务实例未找到" #NotFoundInstance 400304 = "服务别名未找到" #NotFoundServiceAlias 400305 = "命名空间未找到" #NotFoundNamespace 400306 = "未找到和当前服务别名关联的服务" #NotFoundSourceService diff --git a/service/instance.go b/service/instance.go index 5905f68ba..64466d02f 100644 --- a/service/instance.go +++ b/service/instance.go @@ -452,6 +452,10 @@ func (s *Server) UpdateInstance(ctx context.Context, req *apiservice.Instance) * s.sendDiscoverEvent(*event) } + for i := range s.instanceChains { + s.instanceChains[i].AfterUpdate(ctx, instance) + } + return api.NewInstanceResponse(apimodel.Code_ExecuteSuccess, req) } @@ -539,6 +543,11 @@ func (s *Server) UpdateInstanceIsolate(ctx context.Context, req *apiservice.Inst CreateTime: time.Time{}, }) } + instance.Proto.Isolate = utils.NewBoolValue(req.GetIsolate().GetValue()) + } + + for i := range s.instanceChains { + s.instanceChains[i].AfterUpdate(ctx, instances...) } return api.NewInstanceResponse(apimodel.Code_ExecuteSuccess, req) diff --git a/service/server.go b/service/server.go index 07407b67e..cbcee218d 100644 --- a/service/server.go +++ b/service/server.go @@ -57,6 +57,8 @@ type Server struct { hooks []ResourceHook subCtxs []*eventhub.SubscribtionContext + + instanceChains []InstanceChain } // HealthServer 健康检查Server @@ -79,6 +81,11 @@ func (s *Server) SetResourceHooks(hooks ...ResourceHook) { s.hooks = hooks } +// AddInstanceChain . +func (s *Server) AddInstanceChain(chains ...InstanceChain) { + s.instanceChains = append(s.instanceChains, chains...) +} + // RecordHistory server对外提供history插件的简单封装 func (s *Server) RecordHistory(ctx context.Context, entry *model.RecordEntry) { // 如果插件没有初始化,那么不记录history @@ -162,3 +169,8 @@ func (s *Server) afterServiceResource(ctx context.Context, req *apiservice.Servi return nil } + +type InstanceChain interface { + // AfterUpdate . + AfterUpdate(ctx context.Context, instances ...*model.Instance) +}