-
Notifications
You must be signed in to change notification settings - Fork 463
Add Replay Transaction Capture Support for Pass-through HTTP Transport #2430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
WalkthroughThis PR adds a replay transaction feature to the HTTP transport: it captures HTTP request payloads during writes, buffers ReplayRecord entries in a bounded queue, and asynchronously persists them via a pluggable ReplayDataWriter implementation; feature flags and sizing are configurable via passthru-http.properties. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant LoggingNHttpClientConnection
participant ReplayIOSession
participant ReplayByteChannel
participant ReplayDispatcher
participant ReplayDataWriter
Client->>LoggingNHttpClientConnection: send I/O request
alt REPLAY_TXN_ENABLED = true
LoggingNHttpClientConnection->>ReplayIOSession: bind session
end
Client->>ReplayIOSession: write(ByteBuffer)
ReplayIOSession->>ReplayByteChannel: write(ByteBuffer)
rect rgb(230,240,255)
Note over ReplayByteChannel: Capture Phase\n(extract MessageContext, check method/flag, duplicate buffer, update chunk order, build metadata)
end
ReplayByteChannel->>ReplayDispatcher: addReplayRecord(messageId, metadata, data)
rect rgb(230,255,230)
Note over ReplayDispatcher: Async Persistence Phase\n(enqueue, worker threads poll and call ReplayDataWriter.write)
end
ReplayDataWriter->>ReplayDataWriter: persist record
ReplayDispatcher-->>ReplayByteChannel: ack (async)
ReplayByteChannel-->>Client: return write result
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes
Poem
Pre-merge checks and finishing touches❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (6)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDataWriter.java (1)
36-58: Document thread‑safety expectations for ReplayDataWriter implementationsGiven how
ReplayDispatcheruses this interface, a singleReplayDataWriterinstance is shared across multiple worker threads. Each worker callswrite(...)concurrently.It would be good to make this explicit in the contract to prevent subtle bugs in custom implementations (e.g., using non‑thread‑safe writers or mutable buffers):
- Clarify in the class‑level Javadoc that implementations must be thread‑safe, as
writemay be invoked concurrently by multiple threads.- Optionally, mention that any internal buffering or resource management should be designed for concurrent access.
This small doc update will save implementers from accidental data races when plugging in file/DB/telemetry writers.
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java (1)
19-23: Replay constants and allowed methods look consistent (optional reuse of existing HTTP_ constants)*The new replay configuration keys and
ALLOWED_WRITE_METHODS/CHUNK_ORDER_PROPERTYare consistent with the rest ofPassThroughConstantsand match how replay components are wired.If you want to avoid literal duplication, you could optionally reuse the existing HTTP method constants defined earlier in this class:
- public static final Set<String> ALLOWED_WRITE_METHODS = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList("POST", "PUT", "PATCH", "DELETE"))); + public static final Set<String> ALLOWED_WRITE_METHODS = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(HTTP_POST, HTTP_PUT, "PATCH", HTTP_DELETE)));But functionally the current form is fine.
Also applies to: 286-313
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java (1)
161-173: Current code is safe; the ByteBuffer assumptions are always satisfiedThe calling pattern in
ReplayIOSession.java:257guarantees that thebyteWrittenassumption is always met. The buffer is positioned such thatremaining() == byteWrittenexactly (not just ≤), and the position advancement is harmless sincecopyBufferis a duplicate that's not reused.The review's suggestion to document these constraints remains valuable for maintainability and clarity, but the code is not currently broken. The optional defensive check is a reasonable guard against future misuse if the calling pattern changes.
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayIOSession.java (3)
41-182: Wrapper and delegation logic looks consistent with IOSession expectationsThe constructor, ID generation, and the various delegating IOSession methods (status, addresses, event mask, timeout, attributes, close/shutdown, buffer status) are straightforward and mirror the underlying
IOSessionbehavior while adding debug logging. Nothing blocking here.If you want to tighten things slightly, you could:
- Declare
COUNTasprivate static final AtomicLong COUNTto signal immutability.- Consider using a logger for
ReplayIOSession.classinstead of the wrapped session’s class if you prefer a clear category for replay‑related logs.Both are minor style tweaks, not required changes.
228-257: Buffer slicing logic is correct but metadata collection may be expensive per chunkTwo focused points on this block:
- Buffer duplication and windowing
ByteBuffer copyBuffer = src.duplicate(); int currentPos = copyBuffer.position(); copyBuffer.limit(currentPos); copyBuffer.position(currentPos - byteWritten);Given that
duplicate()is called after the write,currentPosisoldPosition + byteWritten, so settingposition(currentPos - byteWritten)andlimit(currentPos)correctly creates a window over exactly the bytes just written. AssumingReplayDispatcher.addReplayRecordcopies out the bytes synchronously (which is implied by the immutableReplayRecorddesign), this approach is correct and safe.No change needed here.
- Per‑chunk full metadata map construction
Map<String, Object> metadata = new HashMap<>(); for (Iterator<String> it = mc.getPropertyNames(); it.hasNext(); ) { String key = it.next(); Object value = mc.getProperty(key); if (value != null) { metadata.put(key, value); } } ReplayDispatcher.getInstance().addReplayRecord(copyBuffer, byteWritten, messageID, metadata);Building a fresh
HashMapcontaining allMessageContextproperties on everywritecall can be costly for:
- Large messages split into many small chunks.
- MessageContexts with many properties (synapse / transport / mediation metadata).
- Custom replay writers that only need a subset of metadata.
If replay is enabled on high‑throughput APIs, this can turn into a noticeable CPU and allocation hot path.
Depending on your requirements, consider one of these approaches:
- Capture full metadata only on the first chunk (e.g.,
chunkOrder == 1), and for subsequent chunks either:
- Reuse the same metadata map, or
- Only include chunk‑specific fields (like
CHUNK_ORDER_PROPERTY, content length, etc.).- Whitelist essential keys (e.g., message ID, transport headers, API/resource identifiers, replay enable flag, HTTP method) instead of copying every property.
This is a performance/footprint optimization rather than a correctness issue, but it will matter if replay capture is widely enabled.
267-270: Minor helper method tweaks for clarity and consistency
isAllowedWriteMethodis clear and does the right thing:private boolean isAllowedWriteMethod(String method) { if (method == null) return false; return PassThroughConstants.ALLOWED_WRITE_METHODS.contains(method.toUpperCase()); }Two optional improvements:
- Use
method.toUpperCase(Locale.ROOT)to avoid locale‑dependent behavior.- Consider inlining the null check into a single return for slightly tighter code:
return method != null && PassThroughConstants.ALLOWED_WRITE_METHODS.contains(method.toUpperCase(Locale.ROOT));Purely cosmetic; current implementation is correct.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/LoggingNHttpClientConnection.java(4 hunks)modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDataWriter.java(1 hunks)modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java(1 hunks)modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayIOSession.java(1 hunks)modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayRecord.java(1 hunks)modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayIOSession.java (3)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java (1)
PassThroughConstants(24-314)modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetContext.java (1)
TargetContext(34-258)modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java (1)
ReplayDispatcher(45-230)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java (1)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java (1)
PassThroughConstants(24-314)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/LoggingNHttpClientConnection.java (2)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java (1)
PassThroughConstants(24-314)modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/TargetHandler.java (1)
TargetHandler(68-1307)
🔇 Additional comments (1)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayIOSession.java (1)
184-193: Read path interception is minimal and correctly delegated
ReplayByteChannel.readcleanly delegates tosession.channel().read(dst)and only adds debug logging of bytes read. There is no replay side‑effect on the inbound path, which matches the stated goal of capturing outbound request content only.No issues from a correctness or performance standpoint here.
...nhttp/src/main/java/org/apache/synapse/transport/http/conn/LoggingNHttpClientConnection.java
Show resolved
Hide resolved
...sports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java
Outdated
Show resolved
Hide resolved
...nsports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayIOSession.java
Show resolved
Hide resolved
...transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayRecord.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java (3)
38-44: Javadoc promises shutdown behavior that is not implemented.The Javadoc states the class "supports graceful shutdown by closing the writer and stopping the background thread," but no
shutdown()method exists and there is no mechanism to signal workers to stop or retain the executor for lifecycle management.
179-228: Critical: Worker loop exits prematurely, breaking replay functionality.The worker loop condition at line 195 (
while (!replayQueue.isEmpty())) will exit as soon as the queue is empty, which will typically happen immediately at startup or whenever the queue drains. Once workers exit:
- New replay records added to the queue will never be processed
replayDataWriter.close()at line 221 will be called prematurely (when queue first becomes empty, not at shutdown)- No mechanism exists to restart workers or keep them alive
Additionally:
- The
ExecutorServiceis stored only in a local variable (line 181), so there's no way to shut it down properly during transport lifecycle- Lines 193-227 manually submit
CORE_POOL_SIZEtasks, which is not the correct pattern forThreadPoolExecutor(the executor creates threads on demand)- Lines 222, 224: Log messages reference "BufferDataWriter" instead of "ReplayDataWriter"
Apply this comprehensive fix:
public class ReplayDispatcher { private static final Log log = LogFactory.getLog(ReplayDispatcher.class); private static volatile ReplayDispatcher instance; + private volatile boolean running = true; private final AtomicLong droppedCount = new AtomicLong(0); private final ReplayDataWriter replayDataWriter; + private final ExecutorService replayWorkerThreadPool; private ReplayDispatcher(ReplayDataWriter replayDataWriter) { this.replayDataWriter = replayDataWriter; - log.info("Initializing OverflowBufferManager with buffer size: " + REPLAY_MAX_BUFFER_SIZE); - startReplayWorker(); + log.info("Initializing ReplayDispatcher with buffer size: " + REPLAY_MAX_BUFFER_SIZE); + this.replayWorkerThreadPool = startReplayWorker(); } + /** + * Initiates graceful shutdown: stops accepting new records, drains the queue, + * and closes the underlying ReplayDataWriter. + */ + public void shutdown() { + running = false; + replayWorkerThreadPool.shutdown(); + try { + if (!replayWorkerThreadPool.awaitTermination(30, TimeUnit.SECONDS)) { + replayWorkerThreadPool.shutdownNow(); + } + } catch (InterruptedException e) { + replayWorkerThreadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } - private void startReplayWorker() { + private ExecutorService startReplayWorker() { ExecutorService replayWorkerThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME_MILLIS, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() { private final AtomicLong threadIndex = new AtomicLong(1); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "ReplayTransaction-Writer-" + threadIndex.getAndIncrement()); thread.setDaemon(true); return thread; } }); - for (int i = 0; i < ReplayDispatcher.CORE_POOL_SIZE; i++) { - replayWorkerThreadPool.submit(() -> { - while (!replayQueue.isEmpty()) { - ReplayRecord replayRecord = replayQueue.poll(); - if (replayRecord != null) { + // Submit a single long-running task per core thread; executor will scale up to MAX_POOL_SIZE under load + for (int i = 0; i < CORE_POOL_SIZE; i++) { + replayWorkerThreadPool.execute(() -> { + while (running) { + try { + // Blocking poll with timeout to allow periodic running flag check + ReplayRecord replayRecord = replayQueue.poll(REPLAY_BUFFER_POLL_INTERVAL, TimeUnit.MILLISECONDS); + if (replayRecord != null) { - try { - replayDataWriter.write(replayRecord); - if (log.isDebugEnabled()) { - log.debug("Successfully wrote buffer data for messageID: " + replayRecord.getMessageId()); - } - } catch (IOException e) { - log.error("Failed to write buffered data for messageID: " + replayRecord.getMessageId(), e); - } - } else { - // No data, prevent tight spinning - try { - Thread.sleep(REPLAY_BUFFER_POLL_INTERVAL); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - if (log.isDebugEnabled()) { - log.debug("ReplayTransaction-BufferDataWriter thread interrupted during sleep"); + try { + replayDataWriter.write(replayRecord); + if (log.isDebugEnabled()) { + log.debug("Successfully wrote replay data for messageID: " + replayRecord.getMessageId()); + } + } catch (IOException e) { + log.error("Failed to write replay data for messageID: " + replayRecord.getMessageId(), e); } - // Exit loop if interrupted while stopping - break; } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (log.isDebugEnabled()) { + log.debug("ReplayTransaction-Writer thread interrupted"); + } + break; } } + + // Close writer after worker exits (when running becomes false) try { replayDataWriter.close(); - log.info("ReplayTransaction-BufferDataWriter closed BufferDataWriter cleanly"); + log.info("ReplayTransaction-Writer closed ReplayDataWriter cleanly"); } catch (IOException e) { - log.error("Failed to close BufferDataWriter during thread termination", e); + log.error("Failed to close ReplayDataWriter during shutdown", e); } }); } + return replayWorkerThreadPool; } }Also ensure that
ReplayDispatcher.shutdown()is called from the transport lifecycle (e.g., in the HTTP transport'sdestroy()orstop()method).
81-85: Fix log message to use correct class name.The log message references "OverflowBufferManager," but the class is named
ReplayDispatcher.Apply this diff:
- log.info("Initializing OverflowBufferManager with buffer size: " + REPLAY_MAX_BUFFER_SIZE); + log.info("Initializing ReplayDispatcher with buffer size: " + REPLAY_MAX_BUFFER_SIZE);
🧹 Nitpick comments (2)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java (2)
56-72: Consider adding error handling for configuration property parsing.The configuration properties are parsed using
Integer.parseInt()andLong.parseLong()without try-catch blocks. Malformed property values will causeNumberFormatExceptionto propagate uncaught, resulting in unclear failure modes during transport initialization.Wrap parsing in try-catch blocks and provide meaningful error messages with property names and values, or use helper methods that supply defaults on parse failure.
168-172: ByteBuffer mutation side effect.Line 170 advances the buffer's position via
dataBuffer.get(data). If callers expect to reuse the buffer afterward, this side effect could cause issues.Consider using
dataBuffer.duplicate().get(data)to avoid mutating the original buffer position, or document the side effect in the method Javadoc.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java (1)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/passthru/PassThroughConstants.java (1)
PassThroughConstants(24-314)
🔇 Additional comments (1)
modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/http/conn/ReplayDispatcher.java (1)
140-158: LGTM!The drop handling logic is well-designed: non-blocking
offer()for bounded queue semantics, atomic drop counter, and frequency-based logging to avoid log spam during overflow conditions.
Description
This PR adds the capability to capture and replay HTTP transactions in the WSO2 Pass-through HTTP transport layer. The feature enables buffering HTTP request data along with metadata and asynchronously writing the replay records via a pluggable writer component.
Key Functionality:
Replay Transaction Integration
Major Code Additions:
ReplayIOSession.javaWraps existing I/O sessions and intercepts write operations to extract message data for replay recording.
ReplayDispatcher.javaBuffers replay records in a configurable queue and writes them asynchronously with overflow handling and thread‑safe shutdown support.
ReplayDataWriter.javaInterface defining a generic persistence contract for replay records, enabling custom outputs such as files or external systems.
ReplayRecord.javaImmutable holder for the message ID, metadata, and payload of each replay transaction.
Key Highlights:
deployment.tomlconfiguration to allow on/off toggling.Deployment Configuration
Add replay configuration under
[transport.http]indeployment.toml:Replace
<FULLY_QUALIFIED_CLASS_NAME>with a custom writer class such as"org.wso2.carbon.test.CustomFileReplayWriter"for own implementation.Maven Bundle Sample (pom.xml)
Lightweight template for custom writer bundle:
Usage - Enable/Disable Per API or Resource
Add the following mediation policy to selectively enable the replay feature on APIs or resources as needed:
Attach this as a policy to API/resource flows where replay recording is required.
Additional File Changes
Default property values and toml property mappings will be added to the
default.jsonandkey-mapping.jsonfiles.Combined, these changes introduce a replay transaction framework inside the HTTP transport layer, offering:
This feature enhances observability and troubleshooting within the API Gateway runtime, enabling detailed traffic replay without impacting request latency or throughput.
Summary by CodeRabbit