Skip to content

Commit

Permalink
Refactor/admin job (#1055)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Apr 3, 2023
1 parent d8bcc3e commit ad61b67
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 29 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ require (
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/polarismesh/specification v1.2.1
github.com/robfig/cron/v3 v3.0.1
)

replace gopkg.in/yaml.v2 => gopkg.in/yaml.v2 v2.2.2
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polarismesh/go-restful-openapi/v2 v2.0.0-20220928152401-083908d10219 h1:XnFyNUWnciM6zgXaz6tm+Egs35rhoD0KGMmKh4gCdi0=
github.com/polarismesh/go-restful-openapi/v2 v2.0.0-20220928152401-083908d10219/go.mod h1:4WhwBysTom9Eoy0hQ4W69I0FmO+T0EpjEW9/5sgHoUk=
github.com/polarismesh/specification v1.2.1-alpha.1 h1:0JC+lPssydCRJtUg/mQmwcqBz3SJ3+IegJtf1nHyT54=
github.com/polarismesh/specification v1.2.1-alpha.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
github.com/polarismesh/specification v1.2.1 h1:NcFoinO+aSWIOIyKTAhvEbjaPvZAAm5/DwJAL2FGdXs=
github.com/polarismesh/specification v1.2.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
Expand All @@ -348,8 +346,6 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
2 changes: 1 addition & 1 deletion maintain/job/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ package job
type JobConfig struct {
Name string `yaml:"name"`
Enable bool `yaml:"enable"`
CronSpec string `yaml:"cronSpec"`
Interval string `yaml:"interval"`
Option map[string]interface{} `yaml:"option"`
}
45 changes: 25 additions & 20 deletions maintain/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ package job
import (
"context"
"fmt"

"github.com/robfig/cron/v3"
"time"

"github.com/polarismesh/polaris/cache"
commonlog "github.com/polarismesh/polaris/common/log"
Expand All @@ -36,8 +35,8 @@ var log = commonlog.GetScopeOrDefaultByName(commonlog.DefaultLoggerName)
type MaintainJobs struct {
jobs map[string]maintainJob
startedJobs map[string]maintainJob
scheduler *cron.Cron
storage store.Store
cancel context.CancelFunc
}

// NewMaintainJobs
Expand All @@ -53,13 +52,14 @@ func NewMaintainJobs(namingServer service.DiscoverServer, cacheMgn *cache.CacheM
storage: storage},
},
startedJobs: map[string]maintainJob{},
scheduler: newCron(),
storage: storage,
}
}

// StartMaintainJobs
func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error {
ctx, cancel := context.WithCancel(context.Background())
mj.cancel = cancel
for _, cfg := range configs {
if !cfg.Enable {
log.Infof("[Maintain][Job] job (%s) not enable", cfg.Name)
Expand All @@ -83,33 +83,27 @@ func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error {
log.Errorf("[Maintain][Job][%s] start leader election err: %v", cfg.Name, err)
return err
}
_, err = mj.scheduler.AddFunc(cfg.CronSpec, newCronCmd(cfg.Name, job, mj.storage))
dur, err := time.ParseDuration(cfg.Interval)
if err != nil {
log.Errorf("[Maintain][Job] job (%s) fail to start, err: %v", cfg.Name, err)
return fmt.Errorf("[Maintain][Job] job (%s) fail to start", cfg.Name)
log.Errorf("[Maintain][Job][%s] parse job exec interval err: %v", cfg.Name, err)
return err
}
runAdminJob(ctx, cfg.Name, dur, job, mj.storage)
mj.startedJobs[cfg.Name] = job
}
mj.scheduler.Start()
return nil
}

// StopMaintainJobs
func (mj *MaintainJobs) StopMaintainJobs() {
ctx := mj.scheduler.Stop()
<-ctx.Done()
if mj.cancel != nil {
mj.cancel()
}
mj.startedJobs = map[string]maintainJob{}
}

func newCron() *cron.Cron {
return cron.New(cron.WithChain(
cron.Recover(cron.DefaultLogger)),
cron.WithParser(cron.NewParser(
cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor)))
}

func newCronCmd(name string, job maintainJob, storage store.Store) func() {
return func() {
func runAdminJob(ctx context.Context, name string, interval time.Duration, job maintainJob, storage store.Store) {
f := func() {
if !storage.IsLeader(store.ElectionKeyMaintainJobPrefix + name) {
log.Infof("[Maintain][Job][%s] I am follower", name)
job.clear()
Expand All @@ -118,8 +112,19 @@ func newCronCmd(name string, job maintainJob, storage store.Store) func() {
log.Infof("[Maintain][Job][%s] I am leader, job start", name)
job.execute()
log.Infof("[Maintain][Job][%s] I am leader, job end", name)

}

ticker := time.NewTicker(interval)
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
f()
}
}
}(ctx)
}

type maintainJob interface {
Expand Down
9 changes: 6 additions & 3 deletions release/conf/polaris-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -395,19 +395,22 @@ maintain:
# Clean up long term unhealthy instance
- name: DeleteUnHealthyInstance
enable: false
cronSpec: "0 0 * * ?"
# job exec interval. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
interval: 24h
option:
instanceDeleteTimeout: 60m
# Delete auto-created service without an instance
- name: DeleteEmptyAutoCreatedService
enable: false
cronSpec: "*/10 * * * ?"
# job exec interval. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
interval: 1h
option:
serviceDeleteTimeout: 30m
# Clean soft deleted instances
- name: CleanDeletedInstances
enable: true
cronSpec: "0 0 * * 1"
# job exec interval. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
interval: 24h

# Storage configuration
store:
Expand Down

0 comments on commit ad61b67

Please sign in to comment.