Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor, only get loggers from context #263

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 12 additions & 25 deletions chrysom/listenerClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,12 @@ type ListenerConfig struct {
// PullInterval is how often listeners should get updates.
// (Optional). Defaults to 5 seconds.
PullInterval time.Duration

// Logger to be used by the client.
// (Optional). By default a no op logger will be used.
Logger *zap.Logger
}

// ListenerClient is the client used to poll Argus for updates.
type ListenerClient struct {
observer *observerConfig
logger *zap.Logger
setLogger func(context.Context, *zap.Logger) context.Context
reader Reader
observer *observerConfig
reader Reader
}

type observerConfig struct {
Expand All @@ -73,16 +67,10 @@ type observerConfig struct {

// NewListenerClient creates a new ListenerClient to be used to poll Argus
// for updates.
func NewListenerClient(config ListenerConfig,
setLogger func(context.Context, *zap.Logger) context.Context,
measures Measures, r Reader,
) (*ListenerClient, error) {
func NewListenerClient(config ListenerConfig, measures Measures, r Reader) (*ListenerClient, error) {
if config.Listener == nil {
return nil, ErrNoListenerProvided
}
if config.Logger == nil {
config.Logger = sallust.Default()
}
if config.PullInterval == 0 {
config.PullInterval = defaultPullInterval
}
Expand All @@ -97,27 +85,26 @@ func NewListenerClient(config ListenerConfig,
measures: measures,
shutdown: make(chan struct{}),
},
logger: config.Logger,
setLogger: setLogger,
reader: r,
reader: r,
}, nil
}

// Start begins listening for updates on an interval given that client configuration
// is setup correctly. If a listener process is already in progress, calling Start()
// is a NoOp. If you want to restart the current listener process, call Stop() first.
func (c *ListenerClient) Start(ctx context.Context) error {
logger := sallust.Get(ctx)
if c.observer == nil || c.observer.listener == nil {
c.logger.Warn("No listener was setup to receive updates.")
logger.Warn("No listener was setup to receive updates.")
return nil
}
if c.observer.ticker == nil {
c.logger.Error("Observer ticker is nil", zap.Error(ErrUndefinedIntervalTicker))
logger.Error("Observer ticker is nil", zap.Error(ErrUndefinedIntervalTicker))
return ErrUndefinedIntervalTicker
}

if !atomic.CompareAndSwapInt32(&c.observer.state, stopped, transitioning) {
c.logger.Error("Start called when a listener was not in stopped state", zap.Error(ErrListenerNotStopped))
logger.Error("Start called when a listener was not in stopped state", zap.Error(ErrListenerNotStopped))
return ErrListenerNotStopped
}

Expand All @@ -129,13 +116,12 @@ func (c *ListenerClient) Start(ctx context.Context) error {
return
case <-c.observer.ticker.C:
outcome := SuccessOutcome
ctx := c.setLogger(context.Background(), c.logger)
items, err := c.reader.GetItems(ctx, "")
if err == nil {
c.observer.listener.Update(items)
} else {
outcome = FailureOutcome
c.logger.Error("Failed to get items for listeners", zap.Error(err))
logger.Error("Failed to get items for listeners", zap.Error(err))
}
c.observer.measures.PollsTotalCounter.With(prometheus.Labels{
OutcomeLabel: outcome}).Add(1)
Expand All @@ -155,9 +141,10 @@ func (c *ListenerClient) Stop(ctx context.Context) error {
return nil
}

logger := sallust.Get(ctx)
if !atomic.CompareAndSwapInt32(&c.observer.state, running, transitioning) {
c.logger.Error("Stop called when a listener was not in running state", zap.Error(ErrListenerNotStopped))
return ErrListenerNotRunning
logger.Error("Stop called when a listener was not in running state", zap.Error(ErrListenerNotStopped))
return errors.Join(ErrListenerNotStopped, ErrListenerNotRunning)
}

c.observer.ticker.Stop()
Expand Down
9 changes: 3 additions & 6 deletions chrysom/listenerClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/xmidt-org/sallust"
)

var (
Expand All @@ -35,7 +34,6 @@ var (
happyListenerConfig = ListenerConfig{
Listener: mockListener,
PullInterval: time.Second,
Logger: sallust.Default(),
}
)

Expand All @@ -59,7 +57,7 @@ func TestListenerStartStopPairsParallel(t *testing.T) {
time.Sleep(time.Millisecond * 400)
errStop := client.Stop(context.Background())
if errStop != nil {
assert.Equal(ErrListenerNotRunning, errStop)
assert.ErrorIs(errStop, ErrListenerNotRunning)
}
fmt.Printf("%d: Done\n", testNumber)
})
Expand Down Expand Up @@ -111,12 +109,11 @@ func newStartStopClient(includeListener bool) (*ListenerClient, func(), error) {

config := ListenerConfig{
PullInterval: time.Millisecond * 200,
Logger: sallust.Default(),
}
if includeListener {
config.Listener = mockListener
}
client, err := NewListenerClient(config, sallust.With, mockMeasures, &BasicClient{})
client, err := NewListenerClient(config, mockMeasures, &BasicClient{})
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -153,7 +150,7 @@ func TestNewListenerClient(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
assert := assert.New(t)
_, err := NewListenerClient(tc.config, sallust.With, tc.measures, tc.reader)
_, err := NewListenerClient(tc.config, tc.measures, tc.reader)
assert.True(errors.Is(err, tc.expectedErr),
fmt.Errorf("error [%v] doesn't contain error [%v] in its err chain",
err, tc.expectedErr),
Expand Down
24 changes: 3 additions & 21 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"time"

"github.com/xmidt-org/ancla/chrysom"
"github.com/xmidt-org/sallust"
"go.uber.org/fx"
"go.uber.org/zap"
)

const errFmt = "%w: %v"
Expand Down Expand Up @@ -42,11 +40,6 @@ type Service interface {
type Config struct {
BasicClientConfig chrysom.BasicClientConfig

// Logger for this package.
// Gets passed to Argus config before initializing the client.
// (Optional). Defaults to a no op logger.
Logger *zap.Logger

// JWTParserType establishes which parser type will be used by the JWT token
// acquirer used by Argus. Options include 'simple' and 'raw'.
// Simple: parser assumes token payloads have the following structure: https://github.com/xmidt-org/bascule/blob/c011b128d6b95fa8358228535c63d1945347adaa/acquire/bearer.go#L77
Expand All @@ -68,23 +61,18 @@ type Config struct {

type ClientService struct {
argus chrysom.PushReader
logger *zap.Logger
config Config
now func() time.Time
}

// NewService builds the Argus client service from the given configuration.
func NewService(cfg Config) (*ClientService, error) {
if cfg.Logger == nil {
cfg.Logger = sallust.Default()
}
prepArgusBasicClientConfig(&cfg)
basic, err := chrysom.NewBasicClient(cfg.BasicClientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create chrysom basic client: %v", err)
}
svc := &ClientService{
logger: cfg.Logger,
argus: basic,
config: cfg,
now: time.Now,
Expand All @@ -95,12 +83,9 @@ func NewService(cfg Config) (*ClientService, error) {
// StartListener builds the Argus listener client service from the given configuration.
// It allows adding watchers for the internal subscription state. Call the returned
// function when you are done watching for updates.
func (s *ClientService) StartListener(cfg chrysom.ListenerConfig, setLogger func(context.Context, *zap.Logger) context.Context, metrics chrysom.Measures, watches ...Watch) (func(), error) {
if cfg.Logger == nil {
cfg.Logger = sallust.Default()
}
func (s *ClientService) StartListener(cfg chrysom.ListenerConfig, metrics chrysom.Measures, watches ...Watch) (func(), error) {
prepArgusListenerConfig(&cfg, metrics, watches...)
listener, err := chrysom.NewListenerClient(cfg, setLogger, metrics, s.argus)
listener, err := chrysom.NewListenerClient(cfg, metrics, s.argus)
if err != nil {
return nil, fmt.Errorf("failed to create chrysom listener client: %v", err)
}
Expand Down Expand Up @@ -157,12 +142,10 @@ func prepArgusBasicClientConfig(cfg *Config) error {
}

func prepArgusListenerConfig(cfg *chrysom.ListenerConfig, metrics chrysom.Measures, watches ...Watch) {
logger := cfg.Logger
watches = append(watches, webhookListSizeWatch(metrics.WebhookListSizeGauge))
cfg.Listener = chrysom.ListenerFunc(func(items chrysom.Items) {
iws, err := ItemsToInternalWebhooks(items)
if err != nil {
logger.Error("Failed to convert items to webhooks", zap.Error(err))
return
}
for _, watch := range watches {
Expand Down Expand Up @@ -195,7 +178,6 @@ type ListenerIn struct {
fx.In

Measures chrysom.Measures
Logger *zap.Logger
Svc *ClientService
listenerConfig chrysom.ListenerConfig
Watcher Watch
Expand All @@ -206,7 +188,7 @@ func ProvideListener() fx.Option {
return fx.Options(
fx.Provide(
func(in ListenerIn) (err error) {
stopWatches, err := in.Svc.StartListener(in.listenerConfig, sallust.With, in.Measures, in.Watcher)
stopWatches, err := in.Svc.StartListener(in.listenerConfig, in.Measures, in.Watcher)
if err != nil {
return fmt.Errorf("webhook service start listener error: %v", err)
}
Expand Down
5 changes: 1 addition & 4 deletions service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/xmidt-org/ancla/chrysom"
"github.com/xmidt-org/ancla/model"
"github.com/xmidt-org/sallust"
)

func TestNewService(t *testing.T) {
Expand Down Expand Up @@ -77,7 +76,7 @@ func TestStartListener(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.desc, func(t *testing.T) {
assert := assert.New(t)
_, err := tc.svc.StartListener(tc.listenerConfig, sallust.With, chrysom.Measures{})
_, err := tc.svc.StartListener(tc.listenerConfig, chrysom.Measures{})
if tc.expectedErr {
assert.NotNil(err)
return
Expand Down Expand Up @@ -135,7 +134,6 @@ func TestAdd(t *testing.T) {
assert := assert.New(t)
m := new(mockPushReader)
svc := ClientService{
logger: sallust.Default(),
config: Config{},
argus: m,
now: time.Now,
Expand Down Expand Up @@ -181,7 +179,6 @@ func TestAllInternalWebhooks(t *testing.T) {

svc := ClientService{
argus: m,
logger: sallust.Default(),
config: Config{},
}
// nolint:typecheck
Expand Down
Loading