Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions aggregator/pkg/common/time_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package common

import (
"testing"
"time"
)

func TestNewRealTimeProvider_NowReturnsCurrentTime(t *testing.T) {
tp := NewRealTimeProvider()
before := time.Now().Add(-1 * time.Second)
now := tp.Now()
after := time.Now().Add(1 * time.Second)

if now.Before(before) || now.After(after) {
t.Fatalf("expected Now() to be close to current time, got %v", now)
}
}

func TestMockTimeProvider_SetAndAdvance(t *testing.T) {
initial := time.Date(2024, 10, 1, 12, 0, 0, 0, time.UTC)
tp := NewMockTimeProvider(initial)

if got := tp.Now(); !got.Equal(initial) {
t.Fatalf("expected initial time %v, got %v", initial, got)
}

// SetTime
next := initial.Add(5 * time.Minute)
tp.SetTime(next)
if got := tp.Now(); !got.Equal(next) {
t.Fatalf("expected set time %v, got %v", next, got)
}

// AdvanceTime
tp.AdvanceTime(30 * time.Second)
want := next.Add(30 * time.Second)
if got := tp.Now(); !got.Equal(want) {
t.Fatalf("expected advanced time %v, got %v", want, got)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package configuration

import (
"os"
"path/filepath"
"testing"
)

func TestLoadConfig_Success_MinimalMemory(t *testing.T) {
tmpDir := t.TempDir()
cfgPath := filepath.Join(tmpDir, "agg.toml")
content := `
[server]
address = ":50051"

[storage]
type = "memory"

[chainStatuses]
maxChainStatusesPerRequest = 10

[rateLimiting]
enabled = false

[committees]
[committees.default]
[committees.default.quorumConfigs]
`
if err := os.WriteFile(cfgPath, []byte(content), 0o600); err != nil {
t.Fatalf("failed to write temp config: %v", err)
}

cfg, err := LoadConfig(cfgPath)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if cfg == nil || cfg.Server.Address != ":50051" {
t.Fatalf("unexpected config: %+v", cfg)
}
if cfg.Storage == nil || string(cfg.Storage.StorageType) != "memory" {
t.Fatalf("expected memory storage, got %+v", cfg.Storage)
}
}

func TestLoadConfig_Error_FileMissing(t *testing.T) {
if _, err := LoadConfig("/non/existent/file.toml"); err == nil {
t.Fatalf("expected error for missing file")
}
}
100 changes: 100 additions & 0 deletions aggregator/pkg/health/http_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package health

import (
"context"
"encoding/json"
"net/http/httptest"
"testing"

"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/common"
"github.com/smartcontractkit/chainlink-ccv/protocol/common/logging"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

type stubHealthyComponent struct{}

func (s *stubHealthyComponent) HealthCheck(_ context.Context) *common.ComponentHealth {
return &common.ComponentHealth{Name: "stub", Status: common.HealthStatusHealthy}
}

type stubDegradedComponent struct{}

func (s *stubDegradedComponent) HealthCheck(_ context.Context) *common.ComponentHealth {
return &common.ComponentHealth{Name: "stub", Status: common.HealthStatusDegraded}
}

type stubUnhealthyComponent struct{}

func (s *stubUnhealthyComponent) HealthCheck(_ context.Context) *common.ComponentHealth {
return &common.ComponentHealth{Name: "stub", Status: common.HealthStatusUnhealthy}
}

func newTestLogger(t *testing.T) logger.SugaredLogger {
t.Helper()
lggr, err := logger.NewWith(logging.DevelopmentConfig(zapcore.WarnLevel))
if err != nil {
t.Fatalf("failed to create logger: %v", err)
}
return logger.Sugared(lggr)
}

func TestHTTPHealthServer_Liveness(t *testing.T) {
m := NewManager()
h := NewHTTPHealthServer(m, "0", newTestLogger(t))

req := httptest.NewRequest("GET", "/health/live", nil)
rr := httptest.NewRecorder()
h.handleLiveness(rr, req)

if rr.Code != 200 {
t.Fatalf("expected 200, got %d", rr.Code)
}
var payload common.ComponentHealth
if err := json.Unmarshal(rr.Body.Bytes(), &payload); err != nil {
t.Fatalf("invalid json: %v", err)
}
if payload.Status != common.HealthStatusHealthy {
t.Fatalf("expected healthy, got %s", payload.Status)
}
}

func TestHTTPHealthServer_Readiness_StatusCodes(t *testing.T) {
// Healthy
{
m := NewManager()
m.Register(&stubHealthyComponent{})
h := NewHTTPHealthServer(m, "0", newTestLogger(t))
req := httptest.NewRequest("GET", "/health/ready", nil)
rr := httptest.NewRecorder()
h.handleReadiness(rr, req)
if rr.Code != 200 {
t.Fatalf("expected 200 for healthy, got %d", rr.Code)
}
}
// Degraded
{
m := NewManager()
m.Register(&stubDegradedComponent{})
h := NewHTTPHealthServer(m, "0", newTestLogger(t))
req := httptest.NewRequest("GET", "/health/ready", nil)
rr := httptest.NewRecorder()
h.handleReadiness(rr, req)
if rr.Code != 200 {
t.Fatalf("expected 200 for degraded, got %d", rr.Code)
}
}
// Unhealthy
{
m := NewManager()
m.Register(&stubUnhealthyComponent{})
h := NewHTTPHealthServer(m, "0", newTestLogger(t))
req := httptest.NewRequest("GET", "/health/ready", nil)
rr := httptest.NewRecorder()
h.handleReadiness(rr, req)
if rr.Code != 503 {
t.Fatalf("expected 503 for unhealthy, got %d", rr.Code)
}
}
}
47 changes: 47 additions & 0 deletions aggregator/pkg/middlewares/metric_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package middlewares

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/mock"
"google.golang.org/grpc"

aggregation_mocks "github.com/smartcontractkit/chainlink-ccv/aggregator/internal/aggregation_mocks"
)

func TestMetricMiddleware_RecordsSuccessAndDuration(t *testing.T) {
metric := aggregation_mocks.NewMockAggregatorMetricLabeler(t)
monitoring := aggregation_mocks.NewMockAggregatorMonitoring(t)

monitoring.EXPECT().Metrics().Return(metric)
metric.EXPECT().With("apiName", "/svc/Method").Return(metric).Maybe()
metric.EXPECT().IncrementActiveRequestsCounter(context.Background())
metric.EXPECT().DecrementActiveRequestsCounter(context.Background())
metric.EXPECT().RecordAPIRequestDuration(context.Background(), mock.Anything)

mm := NewMetricMiddleware(monitoring)
info := &grpc.UnaryServerInfo{FullMethod: "/svc/Method"}
handler := func(ctx context.Context, req any) (any, error) { return "ok", nil }

_, _ = mm.Intercept(context.Background(), nil, info, handler)
}

func TestMetricMiddleware_RecordsError(t *testing.T) {
metric := aggregation_mocks.NewMockAggregatorMetricLabeler(t)
monitoring := aggregation_mocks.NewMockAggregatorMonitoring(t)

monitoring.EXPECT().Metrics().Return(metric)
metric.EXPECT().With("apiName", "/svc/Err").Return(metric).Maybe()
metric.EXPECT().IncrementActiveRequestsCounter(context.Background())
metric.EXPECT().DecrementActiveRequestsCounter(context.Background())
metric.EXPECT().RecordAPIRequestDuration(context.Background(), mock.Anything)
metric.EXPECT().IncrementAPIRequestErrors(context.Background())

mm := NewMetricMiddleware(monitoring)
info := &grpc.UnaryServerInfo{FullMethod: "/svc/Err"}
handler := func(ctx context.Context, req any) (any, error) { return nil, errors.New("boom") }

_, _ = mm.Intercept(context.Background(), nil, info, handler)
}
32 changes: 32 additions & 0 deletions aggregator/pkg/middlewares/scoping_middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package middlewares

import (
"context"
"testing"

"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/scope"

aggregation_mocks "github.com/smartcontractkit/chainlink-ccv/aggregator/internal/aggregation_mocks"
)

func TestScopingMiddleware_SetsAPINameInContext(t *testing.T) {
m := NewScopingMiddleware()

// Prepare a mock labeler to assert AugmentMetrics adds apiName label
labeler := aggregation_mocks.NewMockAggregatorMetricLabeler(t)
// Expect apiName to be added by middleware
fullMethod := "/chainlink_ccv.v1.VerifierResultAPI/GetMessagesSince"
labeler.EXPECT().With("apiName", fullMethod).Return(labeler)

info := &grpc.UnaryServerInfo{FullMethod: fullMethod}

handler := func(ctx context.Context, req any) (any, error) {
// Scoping middleware should have placed apiName in context
_ = scope.AugmentMetrics(ctx, labeler)
return nil, nil
}

_, _ = m.Intercept(context.Background(), nil, info, handler)
}
81 changes: 81 additions & 0 deletions aggregator/pkg/model/config_validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package model

import (
"testing"
)

func TestAggregatorConfig_Validate_ErrorScenarios(t *testing.T) {
tests := []struct {
name string
mutate func(c *AggregatorConfig)
wantErrContains string
}{
{
name: "chainStatuses must be >0",
mutate: func(c *AggregatorConfig) { c.ChainStatuses.MaxChainStatusesPerRequest = -1 },
wantErrContains: "chain status configuration error",
},
{
name: "batch size must be >0",
mutate: func(c *AggregatorConfig) { c.MaxMessageIDsPerBatch = -1 },
wantErrContains: "batch configuration error",
},
{
name: "batch size cannot exceed 1000",
mutate: func(c *AggregatorConfig) { c.MaxMessageIDsPerBatch = 2000 },
wantErrContains: "batch configuration error",
},
{
name: "aggregation.channelBufferSize must be >0",
mutate: func(c *AggregatorConfig) { c.Aggregation.ChannelBufferSize = -1 },
wantErrContains: "aggregation configuration error",
},
{
name: "aggregation.backgroundWorkerCount must be >0",
mutate: func(c *AggregatorConfig) { c.Aggregation.BackgroundWorkerCount = -1 },
wantErrContains: "aggregation configuration error",
},
{
name: "storage.pageSize must be >0",
mutate: func(c *AggregatorConfig) { c.Storage.PageSize = -1 },
wantErrContains: "storage configuration error",
},
{
name: "invalid API key empty id",
mutate: func(c *AggregatorConfig) {
c.APIKeys.Clients["abc"] = &APIClient{ClientID: ""}
},
wantErrContains: "api key configuration error",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
cfg := &AggregatorConfig{
Storage: &StorageConfig{PageSize: 10},
APIKeys: APIKeyConfig{Clients: map[string]*APIClient{}},
ChainStatuses: ChainStatusConfig{MaxChainStatusesPerRequest: 1},
Aggregation: AggregationConfig{ChannelBufferSize: 1, BackgroundWorkerCount: 1},
MaxMessageIDsPerBatch: 1,
}
tc.mutate(cfg)
if err := cfg.Validate(); err == nil {
t.Fatalf("expected error containing %q", tc.wantErrContains)
}
})
}
}

func TestAggregatorConfig_Validate_Success(t *testing.T) {
cfg := &AggregatorConfig{
Storage: &StorageConfig{PageSize: 10},
APIKeys: APIKeyConfig{Clients: map[string]*APIClient{"key1": {ClientID: "client1"}}},
ChainStatuses: ChainStatusConfig{MaxChainStatusesPerRequest: 1},
Aggregation: AggregationConfig{ChannelBufferSize: 10, BackgroundWorkerCount: 2},
MaxMessageIDsPerBatch: 10,
RateLimiting: RateLimitingConfig{GroupLimits: map[string]map[string]RateLimitConfig{}},
}
if err := cfg.Validate(); err != nil {
t.Fatalf("expected no error, got %v", err)
}
}
26 changes: 26 additions & 0 deletions aggregator/pkg/monitoring/noop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package monitoring

import (
"context"
"testing"
"time"
)

func TestNoopAggregatorMonitoring_DoesNotPanic(t *testing.T) {
m := NewNoopAggregatorMonitoring()
lbl := m.Metrics()

ctx := context.Background()
_ = lbl.With("key", "value")
lbl.IncrementActiveRequestsCounter(ctx)
lbl.DecrementActiveRequestsCounter(ctx)
lbl.IncrementCompletedAggregations(ctx)
lbl.RecordAPIRequestDuration(ctx, 10*time.Millisecond)
lbl.IncrementAPIRequestErrors(ctx)
lbl.RecordMessageSinceNumberOfRecordsReturned(ctx, 5)
lbl.IncrementPendingAggregationsChannelBuffer(ctx, 2)
lbl.DecrementPendingAggregationsChannelBuffer(ctx, 1)
lbl.RecordStorageLatency(ctx, 5*time.Millisecond)
lbl.IncrementStorageError(ctx)
lbl.RecordTimeToAggregation(ctx, time.Second)
}
Loading
Loading