Skip to content

Commit a8e8ac7

Browse files
authored
feat(flow): add support for allowed lateness in sliding window (#177)
1 parent 062262b commit a8e8ac7

File tree

2 files changed

+235
-99
lines changed

2 files changed

+235
-99
lines changed

flow/sliding_window.go

Lines changed: 125 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ type SlidingWindowOpts[T any] struct {
2222
// full window duration has elapsed. If false, the first window will only be
2323
// emitted after the full window duration.
2424
EmitPartialWindow bool
25+
// AllowedLateness provides a grace period after the window closes, during which
26+
// late data is still processed. This prevents data loss and improves the
27+
// completeness of results. If AllowedLateness is not specified, records belonging
28+
// to a closed window that arrive late will be discarded.
29+
//
30+
// The specified value must be no larger than the window sliding interval.
31+
AllowedLateness time.Duration
2532
}
2633

2734
// timedElement stores an incoming element along with its event time.
@@ -40,7 +47,9 @@ type SlidingWindow[T any] struct {
4047
mu sync.Mutex
4148
windowSize time.Duration
4249
slidingInterval time.Duration
43-
queue []timedElement[T]
50+
51+
lowerBoundary time.Time
52+
queue []timedElement[T]
4453

4554
in chan any
4655
out chan any
@@ -57,8 +66,8 @@ var _ streams.Flow = (*SlidingWindow[any])(nil)
5766
// respective operation.
5867
// T specifies the incoming element type, and the outgoing element type is []T.
5968
//
60-
// windowSize is the Duration of generated windows.
61-
// slidingInterval is the sliding interval of generated windows.
69+
// windowSize is the duration of each full window.
70+
// slidingInterval is the interval at which new windows are created and emitted.
6271
//
6372
// NewSlidingWindow panics if slidingInterval is larger than windowSize.
6473
func NewSlidingWindow[T any](windowSize, slidingInterval time.Duration) *SlidingWindow[T] {
@@ -69,17 +78,20 @@ func NewSlidingWindow[T any](windowSize, slidingInterval time.Duration) *Sliding
6978
// provided configuration options.
7079
// T specifies the incoming element type, and the outgoing element type is []T.
7180
//
72-
// windowSize is the Duration of generated windows.
73-
// slidingInterval is the sliding interval of generated windows.
81+
// windowSize is the duration of each full window.
82+
// slidingInterval is the interval at which new windows are created and emitted.
7483
// opts are the sliding window configuration options.
7584
//
76-
// NewSlidingWindowWithOpts panics if slidingInterval is larger than windowSize.
85+
// NewSlidingWindowWithOpts panics if slidingInterval is larger than windowSize,
86+
// or the allowed lateness is larger than slidingInterval.
7787
func NewSlidingWindowWithOpts[T any](
7888
windowSize, slidingInterval time.Duration, opts SlidingWindowOpts[T]) *SlidingWindow[T] {
79-
80-
if windowSize < slidingInterval {
89+
if slidingInterval > windowSize {
8190
panic("sliding interval is larger than window size")
8291
}
92+
if opts.AllowedLateness > slidingInterval {
93+
panic("allowed lateness is larger than sliding interval")
94+
}
8395

8496
slidingWindow := &SlidingWindow[T]{
8597
windowSize: windowSize,
@@ -90,10 +102,8 @@ func NewSlidingWindowWithOpts[T any](
90102
opts: opts,
91103
}
92104

93-
// start buffering incoming stream elements
94-
go slidingWindow.receive()
95-
// capture and emit a new window every sliding interval
96-
go slidingWindow.emit()
105+
// start processing stream elements
106+
go slidingWindow.stream()
97107

98108
return slidingWindow
99109
}
@@ -138,56 +148,106 @@ func (sw *SlidingWindow[T]) eventTime(element T) time.Time {
138148
return sw.opts.EventTimeExtractor(element)
139149
}
140150

141-
// receive buffers the incoming elements by pushing them into the queue,
151+
// stream buffers the incoming elements by pushing them into the internal queue,
142152
// wrapping the original item into a timedElement along with its event time.
143-
func (sw *SlidingWindow[T]) receive() {
144-
for element := range sw.in {
145-
eventTime := sw.eventTime(element.(T))
153+
// It starts a goroutine to capture and emit a new window every sliding interval
154+
// after receiving the first element.
155+
func (sw *SlidingWindow[T]) stream() {
156+
processElement := func(element T) {
157+
eventTime := sw.eventTime(element)
146158

147159
sw.mu.Lock()
160+
defer sw.mu.Unlock()
161+
162+
// skip events older than the window lower boundary
163+
if eventTime.Before(sw.lowerBoundary) {
164+
return
165+
}
166+
167+
// add the element to the internal queue
148168
timed := timedElement[T]{
149-
element: element.(T),
169+
element: element,
150170
eventTime: eventTime,
151171
}
152172
sw.queue = append(sw.queue, timed)
153-
sw.mu.Unlock()
154173
}
174+
175+
// Read the first element from the input channel. Its event time will determine
176+
// the lower boundary for the first sliding window.
177+
element, ok := <-sw.in
178+
if !ok {
179+
// The input channel has been closed by the upstream operator, indicating
180+
// that no more data will be received. Signal the completion of the sliding
181+
// window by closing the output channel and return.
182+
close(sw.out)
183+
return
184+
}
185+
186+
// calculate the window start time and process the element
187+
eventTime := sw.eventTime(element.(T))
188+
var delta time.Duration
189+
sw.lowerBoundary, delta = sw.calculateWindowStart(eventTime)
190+
processElement(element.(T))
191+
192+
// start a goroutine to capture and emit a new window every
193+
// sliding interval
194+
go sw.emit(delta)
195+
196+
// process incoming stream elements
197+
for element := range sw.in {
198+
processElement(element.(T))
199+
}
200+
201+
// signal upstream completion
155202
close(sw.done)
156203
}
157204

158-
// emit captures and emits a new window every sw.slidingInterval.
159-
func (sw *SlidingWindow[T]) emit() {
205+
// emit periodically captures and emits completed sliding windows every
206+
// sw.slidingInterval.
207+
// The emission process begins after an initial delay calculated based on
208+
// AllowedLateness, EmitPartialWindow, and the time difference between the
209+
// start of the first window and the current time (delta).
210+
func (sw *SlidingWindow[T]) emit(delta time.Duration) {
160211
defer close(sw.out)
161212

213+
// calculate the initial delay
214+
initialDelay := sw.opts.AllowedLateness
162215
if !sw.opts.EmitPartialWindow {
163-
timer := time.NewTimer(sw.windowSize - sw.slidingInterval)
164-
select {
165-
case <-timer.C:
166-
case <-sw.done:
167-
timer.Stop()
168-
return
169-
}
216+
initialDelay += sw.windowSize - sw.slidingInterval - delta
217+
}
218+
219+
// Wait for the first window iteration, using a timer.
220+
// If sw.done is signaled before the timer expires, the function returns.
221+
timer := time.NewTimer(initialDelay)
222+
select {
223+
case <-timer.C:
224+
case <-sw.done:
225+
timer.Stop()
226+
return
170227
}
171228

172-
lastTick := time.Now()
229+
// create a ticker for periodic emission of sliding windows
173230
ticker := time.NewTicker(sw.slidingInterval)
174231
defer ticker.Stop()
175232

176233
for {
177234
select {
178-
case lastTick = <-ticker.C:
179-
sw.dispatchWindow(lastTick)
235+
case <-ticker.C:
236+
// dispatch the current window
237+
sw.dispatchWindow()
180238

181239
case <-sw.done:
182-
sw.dispatchWindow(lastTick.Add(sw.slidingInterval))
240+
// on shutdown, dispatch one final window to ensure all remaining
241+
// data is processed and return
242+
sw.dispatchWindow()
183243
return
184244
}
185245
}
186246
}
187247

188248
// dispatchWindow is responsible for sending the elements in the current
189249
// window to the output channel and moving the window to the next position.
190-
func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
250+
func (sw *SlidingWindow[T]) dispatchWindow() {
191251
sw.mu.Lock()
192252
defer sw.mu.Unlock()
193253

@@ -197,7 +257,7 @@ func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
197257
})
198258

199259
// extract current window elements
200-
windowElements := sw.extractWindowElements(tick)
260+
windowElements := sw.extractWindowElements()
201261

202262
// send elements downstream if the current window is not empty
203263
if len(windowElements) > 0 {
@@ -208,21 +268,26 @@ func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
208268
// extractWindowElements extracts and returns elements from the sliding window that
209269
// fall within the current window. Elements with an event time after the window's upper boundary will not be included.
210270
// The sliding window queue is updated to remove previous interval elements.
211-
func (sw *SlidingWindow[T]) extractWindowElements(tick time.Time) []T {
212-
// calculate the next window start time
213-
nextWindowStartTime := tick.Add(-sw.windowSize).Add(sw.slidingInterval)
271+
func (sw *SlidingWindow[T]) extractWindowElements() []T {
272+
// Calculate the upper boundary of the current sliding window.
273+
// Elements with the event time less than or equal to this boundary will be
274+
// included.
275+
upperBoundary := sw.lowerBoundary.Add(sw.windowSize)
276+
// Advance the lower boundary of the sliding window by the sliding interval to
277+
// define the start of the next window.
278+
sw.lowerBoundary = sw.lowerBoundary.Add(sw.slidingInterval)
214279

215280
elements := make([]T, 0, len(sw.queue))
216281
var remainingElements []timedElement[T]
217282
queueLoop:
218283
for i, element := range sw.queue {
219-
if remainingElements == nil && element.eventTime.After(nextWindowStartTime) {
284+
if remainingElements == nil && element.eventTime.After(sw.lowerBoundary) {
220285
// copy remaining elements
221286
remainingElements = make([]timedElement[T], len(sw.queue)-i)
222287
_ = copy(remainingElements, sw.queue[i:])
223288
}
224289
switch {
225-
case element.eventTime.Before(tick):
290+
case !element.eventTime.After(upperBoundary):
226291
elements = append(elements, element.element)
227292
default:
228293
break queueLoop // we can break since the queue is ordered
@@ -234,3 +299,25 @@ queueLoop:
234299

235300
return elements
236301
}
302+
303+
// calculateWindowStart calculates the start time of the sliding window to
304+
// which the event belongs, and the duration between the event time and the
305+
// start time of that window.
306+
func (sw *SlidingWindow[T]) calculateWindowStart(
307+
eventTime time.Time,
308+
) (time.Time, time.Duration) {
309+
if eventTime.IsZero() {
310+
return eventTime, 0
311+
}
312+
313+
// convert the event time to a Unix Nano timestamp
314+
// (nanoseconds since epoch)
315+
eventTimeNanos := eventTime.UnixNano()
316+
317+
// calculate the window start in nanoseconds
318+
delta := eventTimeNanos % sw.slidingInterval.Nanoseconds()
319+
windowStartNanos := eventTimeNanos - delta
320+
321+
return time.Unix(0, windowStartNanos).In(eventTime.Location()),
322+
time.Duration(delta)
323+
}

0 commit comments

Comments
 (0)