Skip to content

Commit ec0e375

Browse files
committed
add deadlock timer
1 parent f6ee4a5 commit ec0e375

File tree

1 file changed

+16
-1
lines changed

1 file changed

+16
-1
lines changed

x/batcher/batcher.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func NewDestination[T any](f Flusher[T], e ErrorHandler[T], opts ...OptFunc) *De
125125
cfg.StopTimeout = 0
126126
}
127127

128-
return &Destination[T]{
128+
d := &Destination[T]{
129129
flushlen: cfg.FlushLength,
130130
flushq: make(chan struct{}, cfg.FlushParallelism),
131131
flusher: f,
@@ -139,6 +139,9 @@ func NewDestination[T any](f Flusher[T], e ErrorHandler[T], opts ...OptFunc) *De
139139
messages: make(chan msgAck[T]),
140140
}
141141

142+
slog.Info(fmt.Sprintf("batcher init, dest: %p, msgs: %p", d, d.messages))
143+
144+
return d
142145
}
143146

144147
type msgAck[T any] struct {
@@ -188,10 +191,16 @@ func (d *Destination[T]) Run(ctx context.Context) error {
188191
}
189192
d.syncMu.Unlock()
190193

194+
deadlockTimer := time.NewTimer(5 * time.Minute)
195+
191196
var err error
192197
loop:
193198
for {
194199
select {
200+
case <-deadlockTimer.C:
201+
slog.Info(fmt.Sprintf("batcher deadlock, dest: %p, msgs: %p", d, d.messages))
202+
return errDeadlock
203+
195204
case msg := <-d.messages: // Here
196205
d.count++
197206
if setTimer {
@@ -200,6 +209,12 @@ loop:
200209
time.AfterFunc(d.flushfreq, func() {
201210
epochC <- epc // Here
202211
})
212+
213+
if !deadlockTimer.Stop() {
214+
<-deadlockTimer.C
215+
}
216+
deadlockTimer.Reset(5 * time.Minute)
217+
203218
setTimer = false
204219
}
205220
d.buf = append(d.buf, msg)

0 commit comments

Comments
 (0)