Skip to content

Commit c251fb6

Browse files
authored
llo: calculated streams should skip observations and aggregations (#183)
1 parent 9b8efa6 commit c251fb6

File tree

3 files changed

+14
-0
lines changed

3 files changed

+14
-0
lines changed

llo/plugin_observation.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ func (p *Plugin) observation(ctx context.Context, outctx ocr3types.OutcomeContex
134134
obs.StreamValues = make(StreamValues)
135135
for _, channelDefinition := range previousOutcome.ChannelDefinitions {
136136
for _, strm := range channelDefinition.Streams {
137+
// Calculated streams have no values to observe
138+
if strm.Aggregator == llotypes.AggregatorCalculated {
139+
continue
140+
}
137141
obs.StreamValues[strm.StreamID] = nil
138142
}
139143
}

llo/plugin_outcome.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,12 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos
204204
for cid, cd := range outcome.ChannelDefinitions {
205205
for _, strm := range cd.Streams {
206206
sid, agg := strm.StreamID, strm.Aggregator
207+
208+
// Calculated streams are handled after all streams are aggregated
209+
if strm.Aggregator == llotypes.AggregatorCalculated {
210+
continue
211+
}
212+
207213
if _, exists := outcome.StreamAggregates[sid][agg]; exists {
208214
// Should only happen in the case of duplicate
209215
// streams, no need to aggregate twice.

llo/stream_calculated.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,10 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) {
436436
var err error
437437
env := NewEnv(outcome)
438438
for _, stream := range cd.Streams {
439+
if stream.Aggregator == llotypes.AggregatorCalculated {
440+
continue
441+
}
442+
439443
p.Logger.Debugw("setting stream value", "channelID", cid, "streamID", stream.StreamID, "aggregator", stream.Aggregator)
440444

441445
if err = env.SetStreamValue(stream.StreamID, outcome.StreamAggregates[stream.StreamID][stream.Aggregator]); err != nil {

0 commit comments

Comments
 (0)