Skip to content

Commit

Permalink
chore: refactor, only get loggers from context
Browse files Browse the repository at this point in the history
  • Loading branch information
denopink committed Nov 8, 2024
1 parent b119044 commit 79be117
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 56 deletions.
37 changes: 12 additions & 25 deletions chrysom/listenerClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,12 @@ type ListenerConfig struct {
// PullInterval is how often listeners should get updates.
// (Optional). Defaults to 5 seconds.
PullInterval time.Duration

// Logger to be used by the client.
// (Optional). By default a no op logger will be used.
Logger *zap.Logger
}

// ListenerClient is the client used to poll Argus for updates.
type ListenerClient struct {
observer *observerConfig
logger *zap.Logger
setLogger func(context.Context, *zap.Logger) context.Context
reader Reader
observer *observerConfig
reader Reader
}

type observerConfig struct {
Expand All @@ -73,16 +67,10 @@ type observerConfig struct {

// NewListenerClient creates a new ListenerClient to be used to poll Argus
// for updates.
func NewListenerClient(config ListenerConfig,
setLogger func(context.Context, *zap.Logger) context.Context,
measures Measures, r Reader,
) (*ListenerClient, error) {
func NewListenerClient(config ListenerConfig, measures Measures, r Reader) (*ListenerClient, error) {
if config.Listener == nil {
return nil, ErrNoListenerProvided
}
if config.Logger == nil {
config.Logger = sallust.Default()
}
if config.PullInterval == 0 {
config.PullInterval = defaultPullInterval
}
Expand All @@ -97,27 +85,26 @@ func NewListenerClient(config ListenerConfig,
measures: measures,
shutdown: make(chan struct{}),
},
logger: config.Logger,
setLogger: setLogger,
reader: r,
reader: r,
}, nil
}

// Start begins listening for updates on an interval given that client configuration
// is setup correctly. If a listener process is already in progress, calling Start()
// is a NoOp. If you want to restart the current listener process, call Stop() first.
func (c *ListenerClient) Start(ctx context.Context) error {
logger := sallust.Get(ctx)
if c.observer == nil || c.observer.listener == nil {
c.logger.Warn("No listener was setup to receive updates.")
logger.Warn("No listener was setup to receive updates.")
return nil
}
if c.observer.ticker == nil {
c.logger.Error("Observer ticker is nil", zap.Error(ErrUndefinedIntervalTicker))
logger.Error("Observer ticker is nil", zap.Error(ErrUndefinedIntervalTicker))
return ErrUndefinedIntervalTicker
}

if !atomic.CompareAndSwapInt32(&c.observer.state, stopped, transitioning) {
c.logger.Error("Start called when a listener was not in stopped state", zap.Error(ErrListenerNotStopped))
logger.Error("Start called when a listener was not in stopped state", zap.Error(ErrListenerNotStopped))
return ErrListenerNotStopped
}

Expand All @@ -129,13 +116,12 @@ func (c *ListenerClient) Start(ctx context.Context) error {
return
case <-c.observer.ticker.C:
outcome := SuccessOutcome
ctx := c.setLogger(context.Background(), c.logger)
items, err := c.reader.GetItems(ctx, "")
if err == nil {
c.observer.listener.Update(items)
} else {
outcome = FailureOutcome
c.logger.Error("Failed to get items for listeners", zap.Error(err))
logger.Error("Failed to get items for listeners", zap.Error(err))
}
c.observer.measures.PollsTotalCounter.With(prometheus.Labels{
OutcomeLabel: outcome}).Add(1)
Expand All @@ -155,9 +141,10 @@ func (c *ListenerClient) Stop(ctx context.Context) error {
return nil
}

logger := sallust.Get(ctx)
if !atomic.CompareAndSwapInt32(&c.observer.state, running, transitioning) {
c.logger.Error("Stop called when a listener was not in running state", zap.Error(ErrListenerNotStopped))
return ErrListenerNotRunning
logger.Error("Stop called when a listener was not in running state", zap.Error(ErrListenerNotStopped))
return errors.Join(ErrListenerNotStopped, ErrListenerNotRunning)
}

c.observer.ticker.Stop()
Expand Down
9 changes: 3 additions & 6 deletions chrysom/listenerClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/xmidt-org/sallust"
)

var (
Expand All @@ -35,7 +34,6 @@ var (
happyListenerConfig = ListenerConfig{
Listener: mockListener,
PullInterval: time.Second,
Logger: sallust.Default(),
}
)

Expand All @@ -59,7 +57,7 @@ func TestListenerStartStopPairsParallel(t *testing.T) {
time.Sleep(time.Millisecond * 400)
errStop := client.Stop(context.Background())
if errStop != nil {
assert.Equal(ErrListenerNotRunning, errStop)
assert.ErrorIs(errStop, ErrListenerNotRunning)
}
fmt.Printf("%d: Done\n", testNumber)
})
Expand Down Expand Up @@ -111,12 +109,11 @@ func newStartStopClient(includeListener bool) (*ListenerClient, func(), error) {

config := ListenerConfig{
PullInterval: time.Millisecond * 200,
Logger: sallust.Default(),
}
if includeListener {
config.Listener = mockListener
}
client, err := NewListenerClient(config, sallust.With, mockMeasures, &BasicClient{})
client, err := NewListenerClient(config, mockMeasures, &BasicClient{})
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -153,7 +150,7 @@ func TestNewListenerClient(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
assert := assert.New(t)
_, err := NewListenerClient(tc.config, sallust.With, tc.measures, tc.reader)
_, err := NewListenerClient(tc.config, tc.measures, tc.reader)
assert.True(errors.Is(err, tc.expectedErr),
fmt.Errorf("error [%v] doesn't contain error [%v] in its err chain",
err, tc.expectedErr),
Expand Down
24 changes: 3 additions & 21 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"time"

"github.com/xmidt-org/ancla/chrysom"
"github.com/xmidt-org/sallust"
"go.uber.org/fx"
"go.uber.org/zap"
)

const errFmt = "%w: %v"
Expand Down Expand Up @@ -42,11 +40,6 @@ type Service interface {
type Config struct {
BasicClientConfig chrysom.BasicClientConfig

// Logger for this package.
// Gets passed to Argus config before initializing the client.
// (Optional). Defaults to a no op logger.
Logger *zap.Logger

// JWTParserType establishes which parser type will be used by the JWT token
// acquirer used by Argus. Options include 'simple' and 'raw'.
// Simple: parser assumes token payloads have the following structure: https://github.com/xmidt-org/bascule/blob/c011b128d6b95fa8358228535c63d1945347adaa/acquire/bearer.go#L77
Expand All @@ -68,23 +61,18 @@ type Config struct {

type ClientService struct {
argus chrysom.PushReader
logger *zap.Logger
config Config
now func() time.Time
}

// NewService builds the Argus client service from the given configuration.
func NewService(cfg Config) (*ClientService, error) {
if cfg.Logger == nil {
cfg.Logger = sallust.Default()
}
prepArgusBasicClientConfig(&cfg)
basic, err := chrysom.NewBasicClient(cfg.BasicClientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create chrysom basic client: %v", err)
}
svc := &ClientService{
logger: cfg.Logger,
argus: basic,
config: cfg,
now: time.Now,
Expand All @@ -95,12 +83,9 @@ func NewService(cfg Config) (*ClientService, error) {
// StartListener builds the Argus listener client service from the given configuration.
// It allows adding watchers for the internal subscription state. Call the returned
// function when you are done watching for updates.
func (s *ClientService) StartListener(cfg chrysom.ListenerConfig, setLogger func(context.Context, *zap.Logger) context.Context, metrics chrysom.Measures, watches ...Watch) (func(), error) {
if cfg.Logger == nil {
cfg.Logger = sallust.Default()
}
func (s *ClientService) StartListener(cfg chrysom.ListenerConfig, metrics chrysom.Measures, watches ...Watch) (func(), error) {
prepArgusListenerConfig(&cfg, metrics, watches...)
listener, err := chrysom.NewListenerClient(cfg, setLogger, metrics, s.argus)
listener, err := chrysom.NewListenerClient(cfg, metrics, s.argus)
if err != nil {
return nil, fmt.Errorf("failed to create chrysom listener client: %v", err)
}
Expand Down Expand Up @@ -157,12 +142,10 @@ func prepArgusBasicClientConfig(cfg *Config) error {
}

func prepArgusListenerConfig(cfg *chrysom.ListenerConfig, metrics chrysom.Measures, watches ...Watch) {
logger := cfg.Logger
watches = append(watches, webhookListSizeWatch(metrics.WebhookListSizeGauge))
cfg.Listener = chrysom.ListenerFunc(func(items chrysom.Items) {
iws, err := ItemsToInternalWebhooks(items)
if err != nil {
logger.Error("Failed to convert items to webhooks", zap.Error(err))
return
}
for _, watch := range watches {
Expand Down Expand Up @@ -195,7 +178,6 @@ type ListenerIn struct {
fx.In

Measures chrysom.Measures
Logger *zap.Logger
Svc *ClientService
listenerConfig chrysom.ListenerConfig
Watcher Watch
Expand All @@ -206,7 +188,7 @@ func ProvideListener() fx.Option {
return fx.Options(
fx.Provide(
func(in ListenerIn) (err error) {
stopWatches, err := in.Svc.StartListener(in.listenerConfig, sallust.With, in.Measures, in.Watcher)
stopWatches, err := in.Svc.StartListener(in.listenerConfig, in.Measures, in.Watcher)
if err != nil {
return fmt.Errorf("webhook service start listener error: %v", err)
}
Expand Down
5 changes: 1 addition & 4 deletions service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/xmidt-org/ancla/chrysom"
"github.com/xmidt-org/ancla/model"
"github.com/xmidt-org/sallust"
)

func TestNewService(t *testing.T) {
Expand Down Expand Up @@ -77,7 +76,7 @@ func TestStartListener(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
assert := assert.New(t)
_, err := tc.svc.StartListener(tc.listenerConfig, sallust.With, chrysom.Measures{})
_, err := tc.svc.StartListener(tc.listenerConfig, chrysom.Measures{})
if tc.expectedErr {
assert.NotNil(err)
return
Expand Down Expand Up @@ -135,7 +134,6 @@ func TestAdd(t *testing.T) {
assert := assert.New(t)
m := new(mockPushReader)
svc := ClientService{
logger: sallust.Default(),
config: Config{},
argus: m,
now: time.Now,
Expand Down Expand Up @@ -181,7 +179,6 @@ func TestAllInternalWebhooks(t *testing.T) {

svc := ClientService{
argus: m,
logger: sallust.Default(),
config: Config{},
}
// nolint:typecheck
Expand Down

0 comments on commit 79be117

Please sign in to comment.