Skip to content

Commit

Permalink
[ISSUE #1113] 修复清理软删除实例可能导致缓存更新不正确的问题 (#1138)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored May 26, 2023
1 parent f43e43c commit 4fc36b6
Show file tree
Hide file tree
Showing 15 changed files with 121 additions and 68 deletions.
12 changes: 6 additions & 6 deletions apiserver/xdsserverv3/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func TestNodeHashID(t *testing.T) {
},
},
},
TargetID: "sidecar~default/" + TLSModeStrict,
TargetID: "default/" + TLSModeStrict,
},
{
Node: &core.Node{
Expand All @@ -289,7 +289,7 @@ func TestNodeHashID(t *testing.T) {
},
},
},
TargetID: "sidecar~polaris/" + TLSModePermissive,
TargetID: "polaris/" + TLSModePermissive,
},
{
Node: &core.Node{
Expand All @@ -304,7 +304,7 @@ func TestNodeHashID(t *testing.T) {
},
},
},
TargetID: "sidecar~default",
TargetID: "default",
},
// bad case: wrong tls mode
{
Expand All @@ -320,14 +320,14 @@ func TestNodeHashID(t *testing.T) {
},
},
},
TargetID: "sidecar~default",
TargetID: "default",
},
// no node metadata
{
Node: &core.Node{
Id: "default/9b9f5630-81a1-47cd-a558-036eb616dc71~172.17.1.1",
},
TargetID: "sidecar~default",
TargetID: "default",
},
// metadata does not contain tls mode kv
{
Expand All @@ -343,7 +343,7 @@ func TestNodeHashID(t *testing.T) {
},
},
},
TargetID: "sidecar~default",
TargetID: "default",
},
{
Node: &core.Node{
Expand Down
34 changes: 33 additions & 1 deletion maintain/job/clean_deleted_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,49 @@
package job

import (
"time"

"github.com/mitchellh/mapstructure"

"github.com/polarismesh/polaris/store"
)

type CleanDeletedInstancesJobConfig struct {
InstanceCleanTimeout time.Duration `mapstructure:"instanceCleanTimeout"`
}

type cleanDeletedInstancesJob struct {
cfg *CleanDeletedInstancesJobConfig
storage store.Store
}

func (job *cleanDeletedInstancesJob) init(raw map[string]interface{}) error {
cfg := &CleanDeletedInstancesJobConfig{
InstanceCleanTimeout: 10 * time.Minute,
}
decodeConfig := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
Result: cfg,
}
decoder, err := mapstructure.NewDecoder(decodeConfig)
if err != nil {
log.Errorf("[Maintain][Job][CleanDeletedInstances] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
log.Errorf("[Maintain][Job][CleanDeletedInstances] parse config err: %v", err)
return err
}
job.cfg = cfg

return nil
}

func (job *cleanDeletedInstancesJob) execute() {
batchSize := uint32(100)
for {
count, err := job.storage.BatchCleanDeletedInstances(batchSize)
count, err := job.storage.BatchCleanDeletedInstances(job.cfg.InstanceCleanTimeout, batchSize)
if err != nil {
log.Errorf("[Maintain][Job][CleanDeletedInstances] batch clean deleted instance, err: %v", err)
break
Expand All @@ -46,5 +74,9 @@ func (job *cleanDeletedInstancesJob) execute() {
}
}

func (job *cleanDeletedInstancesJob) interval() time.Duration {
return job.cfg.InstanceCleanTimeout
}

func (job *cleanDeletedInstancesJob) clear() {
}
24 changes: 13 additions & 11 deletions maintain/job/delete_empty_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,21 @@ type deleteEmptyAutoCreatedServiceJob struct {
}

func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) error {
cfg := &DeleteEmptyAutoCreatedServiceJobConfig{}
cfg := &DeleteEmptyAutoCreatedServiceJobConfig{
ServiceDeleteTimeout: 30 * time.Minute,
}
decodeConfig := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
Result: cfg,
}
decoder, err := mapstructure.NewDecoder(decodeConfig)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] new config decoder err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyService] new config decoder err: %v", err)
return err
}
err = decoder.Decode(raw)
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] parse config err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyService] parse config err: %v", err)
return err
}
job.cfg = cfg
Expand All @@ -67,10 +69,14 @@ func (job *deleteEmptyAutoCreatedServiceJob) init(raw map[string]interface{}) er
func (job *deleteEmptyAutoCreatedServiceJob) execute() {
err := job.deleteEmptyAutoCreatedServices()
if err != nil {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty autocreated services, err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyService] delete empty autocreated services, err: %v", err)
}
}

func (job *deleteEmptyAutoCreatedServiceJob) interval() time.Duration {
return job.cfg.ServiceDeleteTimeout
}

func (job *deleteEmptyAutoCreatedServiceJob) clear() {
job.emptyServices = map[string]time.Time{}
}
Expand All @@ -86,10 +92,6 @@ func (job *deleteEmptyAutoCreatedServiceJob) getAllEmptyAutoCreatedServices() []
if svc.IsAlias() {
return true, nil
}
v, ok := svc.Meta[service.MetadataInternalAutoCreated]
if !ok || v != "true" {
return true, nil
}
count := job.cacheMgn.Instance().GetInstancesCountByServiceID(svc.ID)
if count.TotalInstanceCount == 0 {
res = append(res, svc)
Expand Down Expand Up @@ -132,17 +134,17 @@ func (job *deleteEmptyAutoCreatedServiceJob) deleteEmptyAutoCreatedServices() er

ctx, err := buildContext(job.storage)
if err != nil {
log.Errorf("[Maintain][Job][DeleteUnHealthyInstance] build conetxt, err: %v", err)
log.Errorf("[Maintain][Job][DeleteEmptyService] build conetxt, err: %v", err)
return err
}
resp := job.namingServer.DeleteServices(ctx, convertDeleteServiceRequest(emptyServices[i:j]))
if api.CalcCode(resp) != 200 {
log.Errorf("[Maintain][Job][DeleteEmptyAutoCreatedService] delete services err, code: %d, info: %s",
log.Errorf("[Maintain][Job][DeleteEmptyService] delete services err, code: %d, info: %s",
resp.Code.GetValue(), resp.Info.GetValue())
}
}

log.Infof("[Maintain][Job][DeleteEmptyAutoCreatedService] delete empty auto-created services count %d",
log.Infof("[Maintain][Job][DeleteEmptyService] delete empty auto-created services count %d",
len(emptyServices))
return nil
}
Expand Down
9 changes: 7 additions & 2 deletions maintain/job/delete_unhealthy_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ type deleteUnHealthyInstanceJob struct {
}

func (job *deleteUnHealthyInstanceJob) init(raw map[string]interface{}) error {
cfg := &DeleteUnHealthyInstanceJobConfig{}
cfg := &DeleteUnHealthyInstanceJobConfig{
InstanceDeleteTimeout: 60 * time.Minute,
}
decodeConfig := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
Result: cfg,
Expand All @@ -56,10 +58,13 @@ func (job *deleteUnHealthyInstanceJob) init(raw map[string]interface{}) error {
return err
}
job.cfg = cfg

return nil
}

func (job *deleteUnHealthyInstanceJob) interval() time.Duration {
return job.cfg.InstanceDeleteTimeout
}

func (job *deleteUnHealthyInstanceJob) execute() {
batchSize := uint32(100)
var count int = 0
Expand Down
15 changes: 8 additions & 7 deletions maintain/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package job
import (
"context"
"fmt"
"os"
"time"

"github.com/polarismesh/polaris/cache"
Expand Down Expand Up @@ -83,12 +84,7 @@ func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error {
log.Errorf("[Maintain][Job][%s] start leader election err: %v", cfg.Name, err)
return err
}
dur, err := time.ParseDuration(cfg.Interval)
if err != nil {
log.Errorf("[Maintain][Job][%s] parse job exec interval err: %v", cfg.Name, err)
return err
}
runAdminJob(ctx, cfg.Name, dur, job, mj.storage)
runAdminJob(ctx, cfg.Name, job.interval(), job, mj.storage)
mj.startedJobs[cfg.Name] = job
}
return nil
Expand Down Expand Up @@ -131,10 +127,15 @@ type maintainJob interface {
init(cfg map[string]interface{}) error
execute()
clear()
interval() time.Duration
}

func getMasterAccountToken(storage store.Store) (string, error) {
user, err := storage.GetUserByName("polaris", "")
mainUser := os.Getenv("POLARIS_MAIN_USER")
if mainUser == "" {
mainUser = "polaris"
}
user, err := storage.GetUserByName(mainUser, "")
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion maintain/maintain.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *Server) CleanInstance(ctx context.Context, req *apiservice.Instance) *a
}

func (s *Server) BatchCleanInstances(ctx context.Context, batchSize uint32) (uint32, error) {
return s.storage.BatchCleanDeletedInstances(batchSize)
return s.storage.BatchCleanDeletedInstances(10*time.Minute, batchSize)
}

func (s *Server) GetLastHeartbeat(_ context.Context, req *apiservice.Instance) *apiservice.Response {
Expand Down
10 changes: 5 additions & 5 deletions release/cluster/helm/templates/config-polaris-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,15 @@ data:
option:
remote-conf: false # 是否使用远程配置
ip-limit: # ip级限流,全局
open: true # 系统是否开启ip级限流
open: false # 系统是否开启ip级限流
global:
open: true
open: false
bucket: 300 # 最高峰值
rate: 200 # 平均一个IP每秒的请求数
resource-cache-amount: 1024 # 最大缓存的IP个数
white-list: [127.0.0.1]
instance-limit:
open: true
open: false
global:
bucket: 200
rate: 100
Expand All @@ -426,12 +426,12 @@ data:
rules:
- name: store-read
limit:
open: true # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制
open: false # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制
bucket: 2000 # 令牌桶最大值
rate: 1000 # 每秒产生的令牌数
- name: store-write
limit:
open: true
open: false
bucket: 1000
rate: 500
apis:
Expand Down
10 changes: 5 additions & 5 deletions release/cluster/kubernetes/02-polaris-server-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,15 @@ data:
option:
remote-conf: false # 是否使用远程配置
ip-limit: # ip级限流,全局
open: true # 系统是否开启ip级限流
open: false # 系统是否开启ip级限流
global:
open: true
open: false
bucket: 300 # 最高峰值
rate: 200 # 平均一个IP每秒的请求数
resource-cache-amount: 1024 # 最大缓存的IP个数
white-list: [127.0.0.1]
instance-limit:
open: true
open: false
global:
bucket: 200
rate: 100
Expand All @@ -409,12 +409,12 @@ data:
rules:
- name: store-read
limit:
open: true # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制
open: false # 接口的全局配置,如果在api子项中,不配置,则该接口依据global来做限制
bucket: 2000 # 令牌桶最大值
rate: 1000 # 每秒产生的令牌数
- name: store-write
limit:
open: true
open: false
bucket: 1000
rate: 500
apis:
Expand Down
21 changes: 10 additions & 11 deletions release/conf/polaris-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -395,22 +395,21 @@ maintain:
# Clean up long term unhealthy instance
- name: DeleteUnHealthyInstance
enable: false
# job exec interval. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
interval: 24h
option:
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
instanceDeleteTimeout: 60m
# Delete auto-created service without an instance
- name: DeleteEmptyAutoCreatedService
enable: false
# job exec interval. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
interval: 1h
option:
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
serviceDeleteTimeout: 30m
# Clean soft deleted instances
- name: CleanDeletedInstances
enable: true
# job exec interval. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
interval: 24h
option:
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
serviceDeleteTimeout: 30m

# Storage configuration
store:
Expand Down Expand Up @@ -466,15 +465,15 @@ plugin:
option:
remote-conf: false # Whether to use remote configuration
ip-limit: # IP -level current, global
open: true # Whether the system opens IP -level current limit
open: false # Whether the system opens IP -level current limit
global:
open: true
open: false
bucket: 300 # Maximum peak
rate: 200 # The average number of requests per second of IP
resource-cache-amount: 1024 # Number of IP of the maximum cache
white-list: [127.0.0.1]
instance-limit:
open: true
open: false
global:
bucket: 200
rate: 100
Expand All @@ -484,12 +483,12 @@ plugin:
rules:
- name: store-read
limit:
open: true # The global configuration of the interface, if in the API sub -item, is not configured, the interface will be limited according to Global
open: false # The global configuration of the interface, if in the API sub -item, is not configured, the interface will be limited according to Global
bucket: 2000 # The maximum value of token barrels
rate: 1000 # The number of token generated per second
- name: store-write
limit:
open: true
open: false
bucket: 1000
rate: 500
apis:
Expand Down
Loading

0 comments on commit 4fc36b6

Please sign in to comment.