diff --git a/apiserver/xdsserverv3/server_test.go b/apiserver/xdsserverv3/server_test.go index 4fac5a515..0dff498a2 100644 --- a/apiserver/xdsserverv3/server_test.go +++ b/apiserver/xdsserverv3/server_test.go @@ -274,7 +274,7 @@ func TestNodeHashID(t *testing.T) { }, }, }, - TargetID: "sidecar~default/" + TLSModeStrict, + TargetID: "default/" + TLSModeStrict, }, { Node: &core.Node{ @@ -289,7 +289,7 @@ func TestNodeHashID(t *testing.T) { }, }, }, - TargetID: "sidecar~polaris/" + TLSModePermissive, + TargetID: "polaris/" + TLSModePermissive, }, { Node: &core.Node{ @@ -304,7 +304,7 @@ func TestNodeHashID(t *testing.T) { }, }, }, - TargetID: "sidecar~default", + TargetID: "default", }, // bad case: wrong tls mode { @@ -320,14 +320,14 @@ func TestNodeHashID(t *testing.T) { }, }, }, - TargetID: "sidecar~default", + TargetID: "default", }, // no node metadata { Node: &core.Node{ Id: "default/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1", }, - TargetID: "sidecar~default", + TargetID: "default", }, // metadata does not contain tls mode kv { @@ -343,7 +343,7 @@ func TestNodeHashID(t *testing.T) { }, }, }, - TargetID: "sidecar~default", + TargetID: "default", }, { Node: &core.Node{ diff --git a/maintain/job/clean_deleted_instance.go b/maintain/job/clean_deleted_instance.go index e1809c684..332bf5d06 100644 --- a/maintain/job/clean_deleted_instance.go +++ b/maintain/job/clean_deleted_instance.go @@ -18,21 +18,49 @@ package job import ( + "time" + + "github.com/mitchellh/mapstructure" + "github.com/polarismesh/polaris/store" ) +type CleanDeletedInstancesJobConfig struct { + InstanceCleanTimeout time.Duration `mapstructure:"instanceCleanTimeout"` +} + type cleanDeletedInstancesJob struct { + cfg *CleanDeletedInstancesJobConfig storage store.Store } func (job *cleanDeletedInstancesJob) init(raw map[string]interface{}) error { + cfg := &CleanDeletedInstancesJobConfig{ + InstanceCleanTimeout: 10 * time.Minute, + } + decodeConfig := &mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + Result: cfg, + } + decoder, err := mapstructure.NewDecoder(decodeConfig) + if err != nil { + log.Errorf("[Maintain][Job][CleanDeletedInstances] new config decoder err: %v", err) + return err + } + err = decoder.Decode(raw) + if err != nil { + log.Errorf("[Maintain][Job][CleanDeletedInstances] parse config err: %v", err) + return err + } + job.cfg = cfg + return nil } func (job *cleanDeletedInstancesJob) execute() { batchSize := uint32(100) for { - count, err := job.storage.BatchCleanDeletedInstances(batchSize) + count, err := job.storage.BatchCleanDeletedInstances(job.cfg.InstanceCleanTimeout, batchSize) if err != nil { log.Errorf("[Maintain][Job][CleanDeletedInstances] batch clean deleted instance, err: %v", err) break @@ -46,5 +74,9 @@ func (job *cleanDeletedInstancesJob) execute() { } } +func (job *cleanDeletedInstancesJob) interval() time.Duration { + return job.cfg.InstanceCleanTimeout +} + func (job *cleanDeletedInstancesJob) clear() { } diff --git a/maintain/job/delete_empty_service.go b/maintain/job/delete_empty_service.go index d91da1d64..bf9af1ff1 100644 --- a/maintain/job/delete_empty_service.go +++ b/maintain/job/delete_empty_service.go @@ -44,19 +44,21 @@ type deleteEmptyAutoCreatedServiceJob struct { } func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) error { - cfg := &DeleteEmptyAutoCreatedServiceJobConfig{} + cfg := &DeleteEmptyAutoCreatedServiceJobConfig{ + ServiceDeleteTimeout: 30 * time.Minute, + } decodeConfig := &mapstructure.DecoderConfig{ DecodeHook: mapstructure.StringToTimeDurationHookFunc(), Result: cfg, } decoder, err := mapstructure.NewDecoder(decodeConfig) if err != nil { - log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] new config decoder err: %v", err) + log.Errorf("[Maintain][Job][DeleteEmptyService] new config decoder err: %v", err) return err } err = decoder.Decode(raw) if err != nil { - log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] parse config err: %v", err) + log.Errorf("[Maintain][Job][DeleteEmptyService] parse config err: %v", err) return err } job.cfg = cfg @@ -67,10 +69,14 @@ func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) er func (job *deleteEmptyAutoCreatedServiceJob) execute() { err := job.deleteEmptyAutoCreatedServices() if err != nil { - log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty autocreated services, err: %v", err) + log.Errorf("[Maintain][Job][DeleteEmptyService] delete empty autocreated services, err: %v", err) } } +func (job *deleteEmptyAutoCreatedServiceJob) interval() time.Duration { + return job.cfg.ServiceDeleteTimeout +} + func (job *deleteEmptyAutoCreatedServiceJob) clear() { job.emptyServices = map[string]time.Time{} } @@ -86,10 +92,6 @@ func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() [] if svc.IsAlias() { return true, nil } - v, ok := svc.Meta[service.MetadataInternalAutoCreated] - if !ok || v != "true" { - return true, nil - } count := job.cacheMgn.Instance().GetInstancesCountByServiceID(svc.ID) if count.TotalInstanceCount == 0 { res = append(res, svc) @@ -132,17 +134,17 @@ func (job *deleteEmptyAutoCreatedServiceJob) deleteEmptyAutoCreatedServices() er ctx, err := buildContext(job.storage) if err != nil { - log.Errorf("[Maintain][Job][DeleteUnHealthyInstance] build conetxt, err: %v", err) + log.Errorf("[Maintain][Job][DeleteEmptyService] build conetxt, err: %v", err) return err } resp := job.namingServer.DeleteServices(ctx, convertDeleteServiceRequest(emptyServices[i:j])) if api.CalcCode(resp) != 200 { - log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete services err, code: %d, info: %s", + log.Errorf("[Maintain][Job][DeleteEmptyService] delete services err, code: %d, info: %s", resp.Code.GetValue(), resp.Info.GetValue()) } } - log.Infof("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty auto-created services count %d", + log.Infof("[Maintain][Job][DeleteEmptyService] delete empty auto-created services count %d", len(emptyServices)) return nil } diff --git a/maintain/job/delete_unhealthy_instance.go b/maintain/job/delete_unhealthy_instance.go index c93c778a7..9093d823c 100644 --- a/maintain/job/delete_unhealthy_instance.go +++ b/maintain/job/delete_unhealthy_instance.go @@ -40,7 +40,9 @@ type deleteUnHealthyInstanceJob struct { } func (job *deleteUnHealthyInstanceJob) init(raw map[string]interface{}) error { - cfg := &DeleteUnHealthyInstanceJobConfig{} + cfg := &DeleteUnHealthyInstanceJobConfig{ + InstanceDeleteTimeout: 60 * time.Minute, + } decodeConfig := &mapstructure.DecoderConfig{ DecodeHook: mapstructure.StringToTimeDurationHookFunc(), Result: cfg, @@ -56,10 +58,13 @@ func (job *deleteUnHealthyInstanceJob) init(raw map[string]interface{}) error { return err } job.cfg = cfg - return nil } +func (job *deleteUnHealthyInstanceJob) interval() time.Duration { + return job.cfg.InstanceDeleteTimeout +} + func (job *deleteUnHealthyInstanceJob) execute() { batchSize := uint32(100) var count int = 0 diff --git a/maintain/job/job.go b/maintain/job/job.go index 948b32b71..3318700bd 100644 --- a/maintain/job/job.go +++ b/maintain/job/job.go @@ -20,6 +20,7 @@ package job import ( "context" "fmt" + "os" "time" "github.com/polarismesh/polaris/cache" @@ -83,12 +84,7 @@ func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error { log.Errorf("[Maintain][Job][%s] start leader election err: %v", cfg.Name, err) return err } - dur, err := time.ParseDuration(cfg.Interval) - if err != nil { - log.Errorf("[Maintain][Job][%s] parse job exec interval err: %v", cfg.Name, err) - return err - } - runAdminJob(ctx, cfg.Name, dur, job, mj.storage) + runAdminJob(ctx, cfg.Name, job.interval(), job, mj.storage) mj.startedJobs[cfg.Name] = job } return nil @@ -131,10 +127,15 @@ type maintainJob interface { init(cfg map[string]interface{}) error execute() clear() + interval() time.Duration } func getMasterAccountToken(storage store.Store) (string, error) { - user, err := storage.GetUserByName("polaris", "") + mainUser := os.Getenv("POLARIS_MAIN_USER") + if mainUser == "" { + mainUser = "polaris" + } + user, err := storage.GetUserByName(mainUser, "") if err != nil { return "", err } diff --git a/maintain/maintain.go b/maintain/maintain.go index e6742d21f..f74e7b21d 100644 --- a/maintain/maintain.go +++ b/maintain/maintain.go @@ -164,7 +164,7 @@ func (s *Server) CleanInstance(ctx context.Context, req *apiservice.Instance) *a } func (s *Server) BatchCleanInstances(ctx context.Context, batchSize uint32) (uint32, error) { - return s.storage.BatchCleanDeletedInstances(batchSize) + return s.storage.BatchCleanDeletedInstances(10*time.Minute, batchSize) } func (s *Server) GetLastHeartbeat(_ context.Context, req *apiservice.Instance) *apiservice.Response { diff --git a/release/cluster/helm/templates/config-polaris-server.yaml b/release/cluster/helm/templates/config-polaris-server.yaml index 24059982c..f7b6abe3c 100644 --- a/release/cluster/helm/templates/config-polaris-server.yaml +++ b/release/cluster/helm/templates/config-polaris-server.yaml @@ -408,15 +408,15 @@ data: option: remote-conf: false # 是否使用远程配置 ip-limit: # ip级限流,全局 - open: true # 系统是否开启ip级限流 + open: false # 系统是否开启ip级限流 global: - open: true + open: false bucket: 300 # 最高峰值 rate: 200 # 平均一个IP每秒的请求数 resource-cache-amount: 1024 # 最大缓存的IP个数 white-list: [127.0.0.1] instance-limit: - open: true + open: false global: bucket: 200 rate: 100 @@ -426,12 +426,12 @@ data: rules: - name: store-read limit: - open: true # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制 + open: false # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制 bucket: 2000 # 令牌桶最大值 rate: 1000 # 每秒产生的令牌数 - name: store-write limit: - open: true + open: false bucket: 1000 rate: 500 apis: diff --git a/release/cluster/kubernetes/02-polaris-server-config.yaml b/release/cluster/kubernetes/02-polaris-server-config.yaml index 36a00336f..b521792e6 100644 --- a/release/cluster/kubernetes/02-polaris-server-config.yaml +++ b/release/cluster/kubernetes/02-polaris-server-config.yaml @@ -391,15 +391,15 @@ data: option: remote-conf: false # 是否使用远程配置 ip-limit: # ip级限流,全局 - open: true # 系统是否开启ip级限流 + open: false # 系统是否开启ip级限流 global: - open: true + open: false bucket: 300 # 最高峰值 rate: 200 # 平均一个IP每秒的请求数 resource-cache-amount: 1024 # 最大缓存的IP个数 white-list: [127.0.0.1] instance-limit: - open: true + open: false global: bucket: 200 rate: 100 @@ -409,12 +409,12 @@ data: rules: - name: store-read limit: - open: true # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制 + open: false # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制 bucket: 2000 # 令牌桶最大值 rate: 1000 # 每秒产生的令牌数 - name: store-write limit: - open: true + open: false bucket: 1000 rate: 500 apis: diff --git a/release/conf/polaris-server.yaml b/release/conf/polaris-server.yaml index c040b5b1b..55aa9b7a8 100644 --- a/release/conf/polaris-server.yaml +++ b/release/conf/polaris-server.yaml @@ -395,22 +395,21 @@ maintain: # Clean up long term unhealthy instance - name: DeleteUnHealthyInstance enable: false - # job exec interval. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - interval: 24h option: + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". instanceDeleteTimeout: 60m # Delete auto-created service without an instance - name: DeleteEmptyAutoCreatedService enable: false - # job exec interval. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - interval: 1h option: + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". serviceDeleteTimeout: 30m # Clean soft deleted instances - name: CleanDeletedInstances enable: true - # job exec interval. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - interval: 24h + option: + # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + serviceDeleteTimeout: 30m # Storage configuration store: @@ -466,15 +465,15 @@ plugin: option: remote-conf: false # Whether to use remote configuration ip-limit: # IP -level current, global - open: true # Whether the system opens IP -level current limit + open: false # Whether the system opens IP -level current limit global: - open: true + open: false bucket: 300 # Maximum peak rate: 200 # The average number of requests per second of IP resource-cache-amount: 1024 # Number of IP of the maximum cache white-list: [127.0.0.1] instance-limit: - open: true + open: false global: bucket: 200 rate: 100 @@ -484,12 +483,12 @@ plugin: rules: - name: store-read limit: - open: true # The global configuration of the interface, if in the API sub -item, is not configured, the interface will be limited according to Global + open: false # The global configuration of the interface, if in the API sub -item, is not configured, the interface will be limited according to Global bucket: 2000 # The maximum value of token barrels rate: 1000 # The number of token generated per second - name: store-write limit: - open: true + open: false bucket: 1000 rate: 500 apis: diff --git a/release/standalone/docker-compose/server/polaris-server.yaml b/release/standalone/docker-compose/server/polaris-server.yaml index 780e9caf2..ae04a5082 100644 --- a/release/standalone/docker-compose/server/polaris-server.yaml +++ b/release/standalone/docker-compose/server/polaris-server.yaml @@ -396,15 +396,15 @@ plugin: option: remote-conf: false # 是否使用远程配置 ip-limit: # ip级限流,全局 - open: true # 系统是否开启ip级限流 + open: false # 系统是否开启ip级限流 global: - open: true + open: false bucket: 300 # 最高峰值 rate: 200 # 平均一个IP每秒的请求数 resource-cache-amount: 1024 # 最大缓存的IP个数 white-list: [ 127.0.0.1 ] instance-limit: - open: true + open: false global: bucket: 200 rate: 100 @@ -414,12 +414,12 @@ plugin: rules: - name: store-read limit: - open: true # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制 + open: false # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制 bucket: 2000 # 令牌桶最大值 rate: 1000 # 每秒产生的令牌数 - name: store-write limit: - open: true + open: false bucket: 1000 rate: 500 apis: diff --git a/store/boltdb/maintain.go b/store/boltdb/maintain.go index 9596b8c93..2e28818ee 100644 --- a/store/boltdb/maintain.go +++ b/store/boltdb/maintain.go @@ -102,15 +102,28 @@ func (m *maintainStore) ReleaseLeaderElection(key string) error { } // BatchCleanDeletedInstances -func (m *maintainStore) BatchCleanDeletedInstances(batchSize uint32) (uint32, error) { - fields := []string{insFieldValid} +func (m *maintainStore) BatchCleanDeletedInstances(timeout time.Duration, batchSize uint32) (uint32, error) { + mtime := time.Now().Add(-timeout) + fields := []string{insFieldValid, insFieldModifyTime} values, err := m.handler.LoadValuesByFilter(tblNameInstance, fields, &model.Instance{}, func(m map[string]interface{}) bool { valid, ok := m[insFieldValid] - if ok && !valid.(bool) { - return true + if !ok { + return false } - return false + if valid.(bool) { + return false + } + + modifyTime, ok := m[insFieldModifyTime] + if !ok { + return false + } + if modifyTime.(time.Time).After(mtime) { + return false + } + + return true }) if err != nil { return 0, err diff --git a/store/boltdb/maintain_test.go b/store/boltdb/maintain_test.go index 88916deb8..9e014a3cc 100644 --- a/store/boltdb/maintain_test.go +++ b/store/boltdb/maintain_test.go @@ -98,7 +98,7 @@ func TestMaintainStore_BatchCleanDeletedInstances(t *testing.T) { t.Fatal(err) } - count, err := store.BatchCleanDeletedInstances(2) + count, err := store.BatchCleanDeletedInstances(0, 2) if err != nil { t.Fatal(err) } diff --git a/store/maintain_api.go b/store/maintain_api.go index a86afa9a1..c7c7359fe 100644 --- a/store/maintain_api.go +++ b/store/maintain_api.go @@ -42,7 +42,7 @@ type MaintainStore interface { ReleaseLeaderElection(key string) error // BatchCleanDeletedInstances batch clean soft deleted instances - BatchCleanDeletedInstances(batchSize uint32) (uint32, error) + BatchCleanDeletedInstances(timeout time.Duration, batchSize uint32) (uint32, error) // GetUnHealthyInstances get unhealthy instances which mtime time out GetUnHealthyInstances(timeout time.Duration, limit uint32) ([]string, error) diff --git a/store/mock/api_mock.go b/store/mock/api_mock.go index 27ff4c9b0..29be0c9d6 100644 --- a/store/mock/api_mock.go +++ b/store/mock/api_mock.go @@ -150,18 +150,18 @@ func (mr *MockStoreMockRecorder) BatchAddInstances(instances interface{}) *gomoc } // BatchCleanDeletedInstances mocks base method. -func (m *MockStore) BatchCleanDeletedInstances(batchSize uint32) (uint32, error) { +func (m *MockStore) BatchCleanDeletedInstances(timeout time.Duration, batchSize uint32) (uint32, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BatchCleanDeletedInstances", batchSize) + ret := m.ctrl.Call(m, "BatchCleanDeletedInstances", timeout, batchSize) ret0, _ := ret[0].(uint32) ret1, _ := ret[1].(error) return ret0, ret1 } // BatchCleanDeletedInstances indicates an expected call of BatchCleanDeletedInstances. -func (mr *MockStoreMockRecorder) BatchCleanDeletedInstances(batchSize interface{}) *gomock.Call { +func (mr *MockStoreMockRecorder) BatchCleanDeletedInstances(timeout, batchSize interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchCleanDeletedInstances", reflect.TypeOf((*MockStore)(nil).BatchCleanDeletedInstances), batchSize) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchCleanDeletedInstances", reflect.TypeOf((*MockStore)(nil).BatchCleanDeletedInstances), timeout, batchSize) } // BatchDeleteClients mocks base method. diff --git a/store/mysql/maintain.go b/store/mysql/maintain.go index 613107476..cd5bc9343 100644 --- a/store/mysql/maintain.go +++ b/store/mysql/maintain.go @@ -419,12 +419,13 @@ func (m *maintainStore) ReleaseLeaderElection(key string) error { } // BatchCleanDeletedInstances batch clean soft deleted instances -func (m *maintainStore) BatchCleanDeletedInstances(batchSize uint32) (uint32, error) { +func (m *maintainStore) BatchCleanDeletedInstances(timeout time.Duration, batchSize uint32) (uint32, error) { log.Infof("[Store][database] batch clean soft deleted instances(%d)", batchSize) var rows int64 err := m.master.processWithTransaction("batchCleanDeletedInstances", func(tx *BaseTx) error { - mainStr := "delete from instance where flag = 1 limit ?" - result, err := tx.Exec(mainStr, batchSize) + mainStr := "delete from instance where flag = 1 and " + + "mtime <= FROM_UNIXTIME(UNIX_TIMESTAMP(SYSDATE()) - ?) limit ?" + result, err := tx.Exec(mainStr, int32(timeout.Seconds()), batchSize) if err != nil { log.Errorf("[Store][database] batch clean soft deleted instances(%d), err: %s", batchSize, err.Error()) return store.Error(err)