Skip to content

Commit d9c72e7

Browse files
authored
Merge branch 'main' into custom-msg-jsonpb-marshal
2 parents fe41c40 + 17bfd8d commit d9c72e7

File tree

9 files changed

+250
-134
lines changed

9 files changed

+250
-134
lines changed

pkg/capabilities/consensus/ocr3/datafeeds/feeds_aggregator.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const (
4343
)
4444

4545
type aggregatorConfig struct {
46-
Feeds map[datastreams.FeedID]feedConfig
46+
Feeds map[datastreams.FeedID]FeedConfig
4747
// AllowedPartialStaleness is an optional optimization that tries to maximize batching.
4848
// Once any deviation or heartbeat threshold hits, we will include all other feeds that are
4949
// within the AllowedPartialStaleness range of their own heartbeat.
@@ -52,18 +52,30 @@ type aggregatorConfig struct {
5252
AllowedPartialStalenessStr string `mapstructure:"allowedPartialStaleness"`
5353
}
5454

55-
type feedConfig struct {
56-
Deviation decimal.Decimal `mapstructure:"-"`
57-
Heartbeat int // seconds
58-
DeviationString string `mapstructure:"deviation"`
59-
RemappedIDHex string `mapstructure:"remappedId"`
60-
RemappedID []byte `mapstructure:"-"`
55+
// FeedConfig defines the configuration for each individual feed used by the aggregator.
56+
// It's map representation is used directly in user-defined workflows to specify the configuration for each feed.
57+
type FeedConfig struct {
58+
Heartbeat int // seconds
59+
Deviation string `mapstructure:"deviation"`
60+
RemappedIDHex string `mapstructure:"remappedId"` // DO NOT CHANGE THIS. It's user facing in existing DataFeeds configurations and should be kept consistent for backward compatibility.
61+
// internal fields set by [ParseConfig] after parsing the config
62+
// work around mapstructure limitations to allow for decimal.Decimal and byte slices
63+
parsedDeviation decimal.Decimal
64+
remappedID []byte
6165
}
6266

63-
func (c feedConfig) HeartbeatNanos() int64 {
67+
func (c FeedConfig) HeartbeatNanos() int64 {
6468
return int64(c.Heartbeat) * time.Second.Nanoseconds()
6569
}
6670

71+
func (c FeedConfig) DeviationAsDecimal() decimal.Decimal {
72+
return c.parsedDeviation
73+
}
74+
75+
func (c FeedConfig) RemappedID() []byte {
76+
return c.remappedID
77+
}
78+
6779
type dataFeedsAggregator struct {
6880
config aggregatorConfig
6981
reportCodec datastreams.ReportCodec
@@ -146,10 +158,10 @@ func (a *dataFeedsAggregator) Aggregate(lggr logger.Logger, previousOutcome *typ
146158
"oldPrice", oldPrice,
147159
"newPrice", newPrice,
148160
"currDeviation", currDeviation,
149-
"deviation", config.Deviation.InexactFloat64(),
161+
"deviation", config.DeviationAsDecimal().InexactFloat64(),
150162
)
151163
if currStaleness > int64(config.Heartbeat) ||
152-
currDeviation > config.Deviation.InexactFloat64() {
164+
currDeviation > config.DeviationAsDecimal().InexactFloat64() {
153165
previousReportInfo.ObservationTimestamp = latestReport.ObservationTimestamp
154166
previousReportInfo.BenchmarkPrice = latestReport.BenchmarkPrice
155167
reportsNeedingUpdate = append(reportsNeedingUpdate, latestReport)
@@ -177,7 +189,7 @@ func (a *dataFeedsAggregator) Aggregate(lggr logger.Logger, previousOutcome *typ
177189
toWrap := make([]any, 0, len(reportsNeedingUpdate))
178190
for _, report := range reportsNeedingUpdate {
179191
feedID := datastreams.FeedID(report.FeedID).Bytes()
180-
remappedID := a.config.Feeds[datastreams.FeedID(report.FeedID)].RemappedID
192+
remappedID := a.config.Feeds[datastreams.FeedID(report.FeedID)].RemappedID()
181193
if len(remappedID) == 0 { // fall back to original ID
182194
remappedID = feedID[:]
183195
}
@@ -348,15 +360,15 @@ func ParseConfig(config values.Map) (aggregatorConfig, error) {
348360
}
349361

350362
for feedID, feedCfg := range parsedConfig.Feeds {
351-
if feedCfg.DeviationString != "" {
363+
if feedCfg.Deviation != "" {
352364
if _, err := datastreams.NewFeedID(feedID.String()); err != nil {
353365
return aggregatorConfig{}, fmt.Errorf("cannot parse feedID config for feed %s: %w", feedID, err)
354366
}
355-
dec, err := decimal.NewFromString(feedCfg.DeviationString)
367+
dec, err := decimal.NewFromString(feedCfg.Deviation)
356368
if err != nil {
357369
return aggregatorConfig{}, fmt.Errorf("cannot parse deviation config for feed %s: %w", feedID, err)
358370
}
359-
feedCfg.Deviation = dec
371+
feedCfg.parsedDeviation = dec
360372
parsedConfig.Feeds[feedID] = feedCfg
361373
}
362374
trimmed, nonEmpty := strings.CutPrefix(feedCfg.RemappedIDHex, "0x")
@@ -365,7 +377,7 @@ func ParseConfig(config values.Map) (aggregatorConfig, error) {
365377
if err != nil {
366378
return aggregatorConfig{}, fmt.Errorf("cannot parse remappedId config for feed %s: %w", feedID, err)
367379
}
368-
feedCfg.RemappedID = rawRemappedID
380+
feedCfg.remappedID = rawRemappedID
369381
parsedConfig.Feeds[feedID] = feedCfg
370382
}
371383
}

pkg/capabilities/consensus/ocr3/datafeeds/feeds_aggregator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,9 @@ func TestDataFeedsAggregator_ParseConfig(t *testing.T) {
206206
config := getConfig(t, feedIDA.String(), "0.1", heartbeatA)
207207
parsedConfig, err := datafeeds.ParseConfig(*config)
208208
require.NoError(t, err)
209-
require.Equal(t, deviationA, parsedConfig.Feeds[feedIDA].Deviation)
209+
require.Equal(t, deviationA, parsedConfig.Feeds[feedIDA].DeviationAsDecimal())
210210
require.Equal(t, heartbeatA, parsedConfig.Feeds[feedIDA].Heartbeat)
211-
require.Equal(t, deviationB, parsedConfig.Feeds[feedIDB].Deviation)
211+
require.Equal(t, deviationB, parsedConfig.Feeds[feedIDB].DeviationAsDecimal())
212212
require.Equal(t, heartbeatB, parsedConfig.Feeds[feedIDB].Heartbeat)
213213
require.InEpsilon(t, allowedPartialStaleness, parsedConfig.AllowedPartialStaleness, math.SmallestNonzeroFloat64)
214214
})
Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,7 @@
11
package datafeeds
22

3-
import (
4-
"strconv"
5-
"testing"
6-
7-
"github.com/stretchr/testify/require"
8-
9-
"github.com/smartcontractkit/chainlink-common/pkg/values"
10-
)
11-
123
// exported for testing only
134
var LLOStreamPrices = lloStreamPrices
145

156
var DecimalToBigInt = decimalToBigInt
167
var BigIntToDecimal = bigIntToDecimal
17-
18-
type FeedConfig = feedConfig
19-
20-
// helper function to create a map of feed configs
21-
func NewLLOconfig(t *testing.T, m map[uint32]FeedConfig, opts ...lloConfigOpt) values.Map {
22-
unwrappedConfig := map[string]any{
23-
"streams": map[string]any{},
24-
}
25-
26-
for feedID, cfg := range m {
27-
unwrappedConfig["streams"].(map[string]any)[strconv.FormatUint(uint64(feedID), 10)] = map[string]any{
28-
"deviation": cfg.Deviation.String(),
29-
"heartbeat": cfg.Heartbeat,
30-
"remappedID": cfg.RemappedIDHex,
31-
}
32-
}
33-
for _, opt := range opts {
34-
opt(t, unwrappedConfig)
35-
}
36-
config, err := values.NewMap(unwrappedConfig)
37-
require.NoError(t, err)
38-
return *config
39-
}
40-
41-
type lloConfigOpt = func(t *testing.T, m map[string]any)
42-
43-
func LLOConfigAllowStaleness(staleness float64) lloConfigOpt {
44-
return func(t *testing.T, m map[string]any) {
45-
m["allowedPartialStaleness"] = strconv.FormatFloat(staleness, 'f', -1, 64)
46-
}
47-
}

pkg/capabilities/consensus/ocr3/datafeeds/llo_aggregator.go

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,48 @@ var (
4545
type LLOAggregatorConfig struct {
4646
// workaround for the fact that mapstructure doesn't support uint32 keys
4747
//streams map[uint32]feedConfig `mapstructure:"-"`
48-
StreamsStr map[string]feedConfig `mapstructure:"streams"`
48+
Streams map[string]FeedConfig `mapstructure:"streams"`
4949
// allowedPartialStaleness is an optional optimization that tries to maximize batching.
5050
// Once any deviation or heartbeat threshold hits, we will include all other feeds that are
5151
// within the allowedPartialStaleness range of their own heartbeat.
5252
// For example, setting 0.2 will include all feeds that are within 20% of their heartbeat.
5353
//allowedPartialStaleness float64 `mapstructure:"-"`
5454
// workaround for the fact that mapstructure doesn't support float64 keys
55-
AllowedPartialStalenessStr string `mapstructure:"allowedPartialStaleness"`
55+
AllowedPartialStaleness string `mapstructure:"allowedPartialStaleness"`
56+
}
57+
58+
// ToMap converts the LLOAggregatorConfig to a values.Map, which is suitable for the
59+
// [NewAggegator] function in the OCR3 Aggregator interface.
60+
func (c LLOAggregatorConfig) ToMap() (*values.Map, error) {
61+
v, err := values.WrapMap(c)
62+
if err != nil {
63+
// this should never happen since we are wrapping a struct
64+
return &values.Map{}, fmt.Errorf("failed to wrap LLOAggregatorConfig: %w", err)
65+
}
66+
return v, nil
67+
}
68+
69+
func NewLLOConfig(m values.Map) (LLOAggregatorConfig, error) {
70+
// Create a default LLOAggregatorConfig
71+
config := LLOAggregatorConfig{
72+
Streams: make(map[string]FeedConfig),
73+
}
74+
if err := m.UnwrapTo(&config); err != nil {
75+
return LLOAggregatorConfig{}, fmt.Errorf("failed to unwrap values.Map to LLOAggregatorConfig: %w", err)
76+
}
77+
78+
return config, nil
5679
}
5780

5881
func (c LLOAggregatorConfig) convertToInternal() (parsedLLOAggregatorConfig, error) {
5982
parsedConfig := parsedLLOAggregatorConfig{
60-
streams: make(map[uint32]feedConfig),
83+
streams: make(map[uint32]FeedConfig),
6184
}
6285
cfgErr := func(err error) error {
6386
cfgErr := fmt.Errorf("llo aggregator config: %w", ErrInvalidConfig)
6487
return errors.Join(cfgErr, err)
6588
}
66-
for s, cfg := range c.StreamsStr {
89+
for s, cfg := range c.Streams {
6790
id, err := strconv.ParseUint(s, 10, 32)
6891
if err != nil {
6992
// this should never happen since we are using a mapstructure-compatible config
@@ -77,25 +100,25 @@ func (c LLOAggregatorConfig) convertToInternal() (parsedLLOAggregatorConfig, err
77100
if cfg.RemappedIDHex == "" {
78101
return parsedConfig, cfgErr(fmt.Errorf("remappedID is required for stream %d", streamID))
79102
}
80-
if cfg.DeviationString != "" {
81-
dec, err := decimal.NewFromString(cfg.DeviationString)
103+
if cfg.Deviation != "" {
104+
dec, err := decimal.NewFromString(cfg.Deviation)
82105
if err != nil {
83106
return parsedConfig, cfgErr(fmt.Errorf("cannot parse deviation config for feed %d: %w", streamID, err))
84107
}
85-
cfg.Deviation = dec
108+
cfg.parsedDeviation = dec
86109
parsedConfig.streams[streamID] = cfg
87110
}
88111
trimmed := strings.TrimPrefix(cfg.RemappedIDHex, "0x")
89112
rawRemappedID, err := hex.DecodeString(trimmed)
90113
if err != nil {
91114
return parsedConfig, cfgErr(fmt.Errorf("cannot parse remappedId config for feed %d: %w", streamID, err))
92115
}
93-
cfg.RemappedID = rawRemappedID
116+
cfg.remappedID = rawRemappedID
94117
parsedConfig.streams[streamID] = cfg
95118
}
96119
// convert allowedPartialStaleness from string to float64
97-
if c.AllowedPartialStalenessStr != "" {
98-
allowedPartialStaleness, err := decimal.NewFromString(c.AllowedPartialStalenessStr)
120+
if c.AllowedPartialStaleness != "" {
121+
allowedPartialStaleness, err := decimal.NewFromString(c.AllowedPartialStaleness)
99122
if err != nil {
100123
return parsedConfig, cfgErr(fmt.Errorf("cannot parse allowedPartialStaleness: %w", err))
101124
}
@@ -108,7 +131,7 @@ func (c LLOAggregatorConfig) convertToInternal() (parsedLLOAggregatorConfig, err
108131
// the separation is because mapstructure only supports string keys.
109132
// the are exposed in LLOAggregatorConfig for the config which is then processed into this internal representation.
110133
type parsedLLOAggregatorConfig struct {
111-
streams map[uint32]feedConfig
134+
streams map[uint32]FeedConfig
112135
allowedPartialStaleness float64
113136
}
114137

@@ -118,6 +141,8 @@ type LLOAggregator struct {
118141
config parsedLLOAggregatorConfig
119142
}
120143

144+
// NewLLOAggregator creates a new LLOAggregator instance based on the provided configuration.
145+
// The config should be a [values.Map] that has represents from the [LLOAggregatorConfig]. See [LLOAggreagatorConfig.ToMap]
121146
func NewLLOAggregator(config values.Map) (types.Aggregator, error) {
122147
parsedConfig, err := parseLLOConfig(config)
123148
if err != nil {
@@ -180,10 +205,10 @@ func (a *LLOAggregator) Aggregate(lggr logger.Logger, previousOutcome *types.Agg
180205
"oldPrice", oldPrice,
181206
"newPrice", newPrice,
182207
"currDeviation", priceDeviation,
183-
"deviation", config.Deviation.InexactFloat64(),
208+
"deviation", config.DeviationAsDecimal().InexactFloat64(),
184209
)
185210
if timeDiffNs > config.HeartbeatNanos() ||
186-
priceDeviation > config.Deviation.InexactFloat64() {
211+
priceDeviation > config.DeviationAsDecimal().InexactFloat64() {
187212
// this stream needs an update
188213
previousStreamInfo.Timestamp = observationTimestamp.UnixNano()
189214
var err2 error
@@ -214,7 +239,7 @@ func (a *LLOAggregator) Aggregate(lggr logger.Logger, previousOutcome *types.Agg
214239

215240
toWrap := make([]*EVMEncodableStreamUpdate, 0, len(mustUpdateIDs))
216241
for _, streamID := range mustUpdateIDs {
217-
remappedID := a.config.streams[streamID].RemappedID
242+
remappedID := a.config.streams[streamID].RemappedID()
218243
newPrice := prices[streamID]
219244
w := &EVMEncodableStreamUpdate{
220245
StreamID: streamID,
@@ -413,7 +438,7 @@ func lloStreamPrices(lggr logger.Logger, wantStreamIDs []uint32, lloEvents map[o
413438
// parseLLOConfig parses the user-facing, type-less, LLO aggregator in the internal typed config.
414439
func parseLLOConfig(config values.Map) (parsedLLOAggregatorConfig, error) {
415440
converter := LLOAggregatorConfig{
416-
StreamsStr: make(map[string]feedConfig),
441+
Streams: make(map[string]FeedConfig),
417442
}
418443
if err := config.UnwrapTo(&converter); err != nil {
419444
return parsedLLOAggregatorConfig{}, err

0 commit comments

Comments
 (0)