Support custom timeout strategies in StreamMessage.timeout(), with built-in support for a token-bucket timeout-limiting strategy #6237

@sjy982

Currently, StreamMessage.timeout() only supports fixed timeouts.
I’d like to propose adding support for custom timeout strategies via a StreamTimeoutStrategy interface.

Users would implement their own strategy and pass it to timeout(...). The proposed types would look roughly like this:

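public interface StreamTimeoutStrategy {
    /**
     * Compares the current time ({@code currentTimeNanos}) with the last event time
     * ({@code lastEventTimeNanos}) and evaluates whether a timeout has occurred and
     * when the next check should be scheduled.
     */
    StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos);
}

public final class StreamTimeoutDecision {
    // Presumably a sentinel meaning "no further check needs to be scheduled".
    private static final long NO_NEXT_TIMEOUT = -1L;
    private final boolean timedOut;
    private final long nextScheduleTimeNanos;

    // Illustrative constructor so the final fields are initialized; the exact shape is open.
    StreamTimeoutDecision(boolean timedOut, long nextScheduleTimeNanos) {
        this.timedOut = timedOut;
        this.nextScheduleTimeNanos = nextScheduleTimeNanos;
    }
}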
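
To make the contract concrete, here is a minimal sketch of how a fixed-timeout strategy might implement the interface. Everything in it is an assumption for illustration: the accessor names, the `FixedTimeoutStrategy` class, and in particular the reading of `nextScheduleTimeNanos` as a delay relative to the current time (the proposal does not say whether it is a delay or an absolute timestamp). The types are local stand-ins, not a released API.

```java
// Stand-ins for the proposed types. Names mirror the proposal, but the
// accessors and package-private visibility are assumptions of this sketch.
interface StreamTimeoutStrategy {
    StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos);
}

final class StreamTimeoutDecision {
    static final long NO_NEXT_TIMEOUT = -1L;
    private final boolean timedOut;
    private final long nextScheduleTimeNanos;

    StreamTimeoutDecision(boolean timedOut, long nextScheduleTimeNanos) {
        this.timedOut = timedOut;
        this.nextScheduleTimeNanos = nextScheduleTimeNanos;
    }

    boolean timedOut() {
        return timedOut;
    }

    long nextScheduleTimeNanos() {
        return nextScheduleTimeNanos;
    }
}

// Hypothetical fixed-timeout strategy: times out once the gap since the
// last event reaches timeoutNanos.
final class FixedTimeoutStrategy implements StreamTimeoutStrategy {
    private final long timeoutNanos;

    FixedTimeoutStrategy(long timeoutNanos) {
        if (timeoutNanos <= 0) {
            throw new IllegalArgumentException("timeoutNanos must be positive");
        }
        this.timeoutNanos = timeoutNanos;
    }

    @Override
    public StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos) {
        // Subtracting first keeps the comparison safe for System.nanoTime()-style values.
        long elapsedNanos = currentTimeNanos - lastEventTimeNanos;
        if (elapsedNanos >= timeoutNanos) {
            return new StreamTimeoutDecision(true, StreamTimeoutDecision.NO_NEXT_TIMEOUT);
        }
        // Assumption: the second field is the delay until the next check.
        return new StreamTimeoutDecision(false, timeoutNanos - elapsedNanos);
    }
}
```

With a 10,000 ns timeout, a check 4,000 ns after the last event would not time out and would ask to be re-evaluated after the remaining 6,000 ns; a check at or past 10,000 ns would report a timeout with no further scheduling.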

I would like to suggest the following two built-in strategies.

  1. Default mode (StreamTimeoutMode + duration)

    StreamMessage.timeout(StreamTimeoutMode.UNTIL_NEXT, Duration.ofSeconds(10));
    // Internally:
    new DefaultTimeoutStrategy(StreamTimeoutMode.UNTIL_NEXT, timeoutNanos);

  2. Token-bucket strategy (a timeout-limiting strategy based on the token bucket rate-limiting algorithm)
    Tolerates occasional network jitter without terminating the stream, but triggers a timeout when delays occur consecutively.
    This prevents unnecessary stream termination due to transient network hiccups.

    StreamMessage.timeout(new TokenBucketTimeoutStrategy(...));
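
One way the token-bucket strategy could work is sketched below. This is an illustration under stated assumptions, not the proposed implementation: the constructor parameters (`timeoutNanos`, `capacity`, `refillIntervalNanos`) are hypothetical, each check that finds the stream overdue spends one token, tokens are regained at one per refill interval, and `nextScheduleTimeNanos` is treated as a delay. The sketch is not thread-safe and assumes all calls come from a single event loop.

```java
// Stand-ins for the proposed types; accessors and visibility are assumptions.
interface StreamTimeoutStrategy {
    StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos);
}

final class StreamTimeoutDecision {
    static final long NO_NEXT_TIMEOUT = -1L;
    private final boolean timedOut;
    private final long nextScheduleTimeNanos;

    StreamTimeoutDecision(boolean timedOut, long nextScheduleTimeNanos) {
        this.timedOut = timedOut;
        this.nextScheduleTimeNanos = nextScheduleTimeNanos;
    }

    boolean timedOut() {
        return timedOut;
    }

    long nextScheduleTimeNanos() {
        return nextScheduleTimeNanos;
    }
}

// Hypothetical token-bucket strategy: an overdue check costs one token, and
// the stream times out only when a check is overdue and the bucket is empty.
final class TokenBucketTimeoutStrategy implements StreamTimeoutStrategy {
    private final long timeoutNanos;
    private final long capacity;
    private final long refillIntervalNanos;
    private long tokens;
    private long lastRefillNanos;
    private boolean initialized;

    TokenBucketTimeoutStrategy(long timeoutNanos, long capacity, long refillIntervalNanos) {
        if (timeoutNanos <= 0 || capacity <= 0 || refillIntervalNanos <= 0) {
            throw new IllegalArgumentException("all parameters must be positive");
        }
        this.timeoutNanos = timeoutNanos;
        this.capacity = capacity;
        this.refillIntervalNanos = refillIntervalNanos;
        this.tokens = capacity; // start with a full bucket
    }

    @Override
    public StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos) {
        refill(currentTimeNanos);
        long elapsedNanos = currentTimeNanos - lastEventTimeNanos;
        if (elapsedNanos < timeoutNanos) {
            // On time: check again when the current deadline would expire.
            return new StreamTimeoutDecision(false, timeoutNanos - elapsedNanos);
        }
        if (tokens > 0) {
            // Overdue, but tolerated: spend a token and grant one more timeout period.
            tokens--;
            return new StreamTimeoutDecision(false, timeoutNanos);
        }
        // Overdue with an empty bucket: delays were too frequent.
        return new StreamTimeoutDecision(true, StreamTimeoutDecision.NO_NEXT_TIMEOUT);
    }

    private void refill(long nowNanos) {
        if (!initialized) {
            // The first call only establishes the refill baseline.
            initialized = true;
            lastRefillNanos = nowNanos;
            return;
        }
        long earned = (nowNanos - lastRefillNanos) / refillIntervalNanos;
        if (earned > 0) {
            tokens = Math.min(capacity, tokens + earned);
            // Advance by whole intervals only, so partial progress is not lost.
            lastRefillNanos += earned * refillIntervalNanos;
        }
    }
}
```

Under these assumptions, a stream that stalls completely is terminated after roughly `(capacity + 1) * timeoutNanos`, while a stream that is late only occasionally regains tokens between incidents and keeps running. Choosing `refillIntervalNanos` well above `timeoutNanos` is what makes consecutive delays drain the bucket faster than it refills.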
