Class StreamingBody
maxCaptureBytes) for the event log / dashboard while forwarding
every chunk to the subscribed consumer.
Thread-safety: addCompletionListener(java.lang.Runnable) may be called from a scheduler thread
(e.g. HttpActionHandler.writeStreamingForwardActionResponse) while
complete() and error(Throwable) are called from the Netty event loop.
All mutation of the completed flag and completionListeners list is
guarded by synchronized(lock) to ensure a listener is always fired exactly once —
immediately if already completed, otherwise when completion/error occurs.
The chunk path (subscribe(java.util.function.Consumer<io.netty.buffer.ByteBuf>, java.lang.Runnable, java.util.function.Consumer<java.lang.Throwable>), addChunk(io.netty.buffer.ByteBuf), complete(), error)
is serialised on the upstream channel's event loop to avoid races between chunk arrival
and subscription. Any chunks that arrive before subscribe() are buffered as
byte[] arrays in a pending list and drained in order when the subscriber connects.
-
Constructor Summary
ConstructorsConstructorDescriptionStreamingBody(int maxCaptureBytes) StreamingBody(int maxCaptureBytes, boolean captureChunkTimestamps) -
Method Summary
Modifier and TypeMethodDescriptionvoidaddChunk(io.netty.buffer.ByteBuf chunk) Feed a chunk into the sink.voidaddCompletionListener(Runnable listener) Add a listener that is called when the stream completes (either successfully or with an error).byte[]voidcomplete()Signal that the stream is complete (last chunk received).voidSignal that the stream was interrupted by an error.getError()Compute inter-chunk delays in milliseconds from the captured monotonic timestamps.booleanbooleanvoidRequest the next chunk from the upstream channel.voidsetEventLoop(io.netty.channel.EventLoop eventLoop) Set the upstream channel's event loop.voidsetRequestMoreCallback(Runnable callback) Set a callback that is invoked to request the next chunk from the upstream channel.voidsubscribe(Consumer<io.netty.buffer.ByteBuf> onChunk, Runnable onComplete, Consumer<Throwable> onError) Subscribe to receive streaming chunks.
-
Constructor Details
-
StreamingBody
public StreamingBody(int maxCaptureBytes) -
StreamingBody
public StreamingBody(int maxCaptureBytes, boolean captureChunkTimestamps) - Parameters:
maxCaptureBytes- maximum bytes to capture for the event logcaptureChunkTimestamps- when true, records aSystem.nanoTime()per chunk for later per-chunk replay timing
-
-
Method Details
-
setEventLoop
public void setEventLoop(io.netty.channel.EventLoop eventLoop) Set the upstream channel's event loop. Must be called before any chunks arrive (typically by the handler that creates the body). All chunk-path operations (subscribe(java.util.function.Consumer<io.netty.buffer.ByteBuf>, java.lang.Runnable, java.util.function.Consumer<java.lang.Throwable>),addChunk(io.netty.buffer.ByteBuf),complete(),error) are serialised on this event loop.- Parameters:
eventLoop- the upstream channel's event loop
-
subscribe
public void subscribe(Consumer<io.netty.buffer.ByteBuf> onChunk, Runnable onComplete, Consumer<Throwable> onError) Subscribe to receive streaming chunks. If an event loop has been set viasetEventLoop(EventLoop)and the caller is not on that event loop, the subscribe body is marshalled onto the event loop to serialise with addChunk/complete/error. Any chunks that arrived before subscription are drained in order and then the first upstream read is triggered viarequestMore().- Parameters:
onChunk- called for eachByteBufchunk; the consumer must NOT release the bufferonComplete- called when the last chunk has been receivedonError- called if the stream is interrupted (channel close, timeout, etc.)
-
addChunk
public void addChunk(io.netty.buffer.ByteBuf chunk) Feed a chunk into the sink. The chunk bytes are appended to the bounded capture buffer and forwarded to the subscriber. If no subscriber has connected yet, the full chunk bytes are copied into a pending list so they can be drained whensubscribe(java.util.function.Consumer<io.netty.buffer.ByteBuf>, java.lang.Runnable, java.util.function.Consumer<java.lang.Throwable>)runs.- Parameters:
chunk- the chunk content (caller retains ownership of the buffer)
-
complete
public void complete()Signal that the stream is complete (last chunk received). -
error
Signal that the stream was interrupted by an error.- Parameters:
cause- the cause of the interruption
-
addCompletionListener
Add a listener that is called when the stream completes (either successfully or with an error). If the stream has already completed, the listener is called immediately. This method is safe to call from any thread.- Parameters:
listener- the completion listener
-
setRequestMoreCallback
Set a callback that is invoked to request the next chunk from the upstream channel. Used for backpressure: the downstream writer callsrequestMore()after each chunk write completes, which triggers an upstreamctx.read().- Parameters:
callback- the callback to request the next upstream chunk
-
requestMore
public void requestMore()Request the next chunk from the upstream channel. Called by the downstream writer after a chunk write completes to implement backpressure. -
isTruncated
public boolean isTruncated()- Returns:
- true if the capture buffer was capped and does not contain the full body
-
isCompleted
public boolean isCompleted()- Returns:
- true if the stream has completed (successfully or with an error)
-
getError
- Returns:
- the error that interrupted the stream, or null if it completed normally
-
capturedBytes
public byte[] capturedBytes()- Returns:
- the captured bytes (may be fewer than the full body if truncated)
-
interChunkDelaysMillis
Compute inter-chunk delays in milliseconds from the captured monotonic timestamps. The first element is always 0 (no delay before the first chunk). Subsequent elements represent the wall-clock gap between consecutive chunk arrivals.- Returns:
- a list of inter-chunk delays in milliseconds, or
nullif timestamps were not captured (i.e.captureChunkTimestampswas false)
-