@@ -21,8 +21,10 @@ import (
2121 "fmt"
2222 "io"
2323 "path/filepath"
24+ "strconv"
2425 "strings"
2526 "sync"
27+ "sync/atomic"
2628 "time"
2729
2830 embedetcd "github.com/linkall-labs/embed-etcd"
@@ -73,20 +75,22 @@ func NewController(cfg Config, member embedetcd.Member) *controller {
7375}
7476
7577type controller struct {
76- cfg * Config
77- kvStore kv.Client
78- volumeMgr volume.Manager
79- eventLogMgr eventlog.Manager
80- ssMgr server.Manager
81- eventBusMap map [string ]* metadata.Eventbus
82- member embedetcd.Member
83- cancelCtx context.Context
84- cancelFunc context.CancelFunc
85- membershipMutex sync.Mutex
86- isLeader bool
87- readyNotify chan error
88- stopNotify chan error
89- mutex sync.Mutex
78+ cfg * Config
79+ kvStore kv.Client
80+ volumeMgr volume.Manager
81+ eventLogMgr eventlog.Manager
82+ ssMgr server.Manager
83+ eventBusMap map [string ]* metadata.Eventbus
84+ member embedetcd.Member
85+ cancelCtx context.Context
86+ cancelFunc context.CancelFunc
87+ membershipMutex sync.Mutex
88+ isLeader bool
89+ readyNotify chan error
90+ stopNotify chan error
91+ mutex sync.Mutex
92+ eventbusUpdatedCount int64
93+ eventbusDeletedCount int64
9094}
9195
9296func (ctrl * controller ) Start (_ context.Context ) error {
@@ -97,6 +101,7 @@ func (ctrl *controller) Start(_ context.Context) error {
97101 ctrl .kvStore = store
98102 ctrl .cancelCtx , ctrl .cancelFunc = context .WithCancel (context .Background ())
99103 go ctrl .member .RegisterMembershipChangedProcessor (ctrl .membershipChangedProcessor )
104+ go ctrl .recordMetrics ()
100105 return nil
101106}
102107
@@ -188,7 +193,7 @@ func (ctrl *controller) createEventBus(ctx context.Context,
188193 return nil , errors .ErrResourceAlreadyExist .WithMessage ("the eventbus already exist" )
189194 }
190195 for idx := 0 ; idx < eb .LogNumber ; idx ++ {
191- el , err := ctrl .eventLogMgr .AcquireEventLog (ctx , eb .ID )
196+ el , err := ctrl .eventLogMgr .AcquireEventLog (ctx , eb .ID , eb . Name )
192197 if err != nil {
193198 return nil , err
194199 }
@@ -202,7 +207,6 @@ func (ctrl *controller) createEventBus(ctx context.Context,
202207 return nil , err
203208 }
204209 }
205- metrics .EventbusGauge .Set (float64 (len (ctrl .eventBusMap )))
206210 return ctrl .getEventbus (eb .Name )
207211}
208212
@@ -231,7 +235,7 @@ func (ctrl *controller) DeleteEventBus(ctx context.Context, eb *metapb.EventBus)
231235 }(v .ID )
232236 }
233237 wg .Wait ()
234- metrics . EventbusGauge . Set ( float64 ( len ( ctrl .eventBusMap )) )
238+ atomic . AddInt64 ( & ctrl .eventbusDeletedCount , 1 )
235239 return & emptypb.Empty {}, nil
236240}
237241
@@ -271,6 +275,7 @@ func (ctrl *controller) ListEventBus(ctx context.Context, _ *emptypb.Empty) (*ct
271275
272276func (ctrl * controller ) UpdateEventBus (ctx context.Context ,
273277 req * ctrlpb.UpdateEventBusRequest ) (* metapb.EventBus , error ) {
278+ atomic .AddInt64 (& ctrl .eventbusUpdatedCount , 1 )
274279 return & metapb.EventBus {}, nil
275280}
276281
@@ -516,6 +521,33 @@ func (ctrl *controller) ReportSegmentLeader(ctx context.Context,
516521 return & emptypb.Empty {}, nil
517522}
518523
524+ func (ctrl * controller ) recordMetrics () {
525+ t := time .NewTicker (time .Second )
526+ defer t .Stop ()
527+ for {
528+ select {
529+ case <- t .C :
530+ ctrl .membershipMutex .Lock ()
531+ if ctrl .isLeader {
532+ metrics .ControllerLeaderGaugeVec .DeleteLabelValues (strconv .FormatBool (! ctrl .isLeader ))
533+ metrics .ControllerLeaderGaugeVec .WithLabelValues (strconv .FormatBool (ctrl .isLeader )).Set (1 )
534+ } else {
535+ metrics .ControllerLeaderGaugeVec .WithLabelValues (strconv .FormatBool (! ctrl .isLeader )).Set (0 )
536+ }
537+ ctrl .membershipMutex .Unlock ()
538+
539+ ctrl .mutex .Lock ()
540+ metrics .EventbusGauge .Set (float64 (len (ctrl .eventBusMap )))
541+ metrics .EventbusUpdatedGauge .Set (float64 (atomic .LoadInt64 (& ctrl .eventbusUpdatedCount )))
542+ metrics .EventbusDeletedGauge .Set (float64 (atomic .LoadInt64 (& ctrl .eventbusDeletedCount )))
543+ ctrl .mutex .Unlock ()
544+ case <- ctrl .cancelCtx .Done ():
545+ log .Info (ctrl .cancelCtx , "record leadership exiting..." , nil )
546+ return
547+ }
548+ }
549+ }
550+
519551func (ctrl * controller ) membershipChangedProcessor (ctx context.Context , event embedetcd.MembershipChangedEvent ) error {
520552 ctrl .membershipMutex .Lock ()
521553 defer ctrl .membershipMutex .Unlock ()
@@ -571,7 +603,6 @@ func (ctrl *controller) loadEventbus(ctx context.Context) error {
571603 }
572604 ctrl .eventBusMap [filepath .Base (pair .Key )] = busInfo
573605 }
574- metrics .EventbusGauge .Set (float64 (len (ctrl .eventBusMap )))
575606 return nil
576607}
577608
0 commit comments