Class StreamingBody
- java.lang.Object
  - org.mockserver.model.StreamingBody
-
public class StreamingBody extends Object
A chunk sink for streaming response bodies. When MockServer proxies a streaming response (Server-Sent Events, or chunked without Content-Length), the response head is delivered immediately while chunks flow through this sink. The sink captures bytes into a bounded buffer (up to maxCaptureBytes) for the event log / dashboard while forwarding every chunk to the subscribed consumer.
Thread-safety: addCompletionListener(java.lang.Runnable) may be called from a scheduler thread (e.g. HttpActionHandler.writeStreamingForwardActionResponse) while complete() and error(Throwable) are called from the Netty event loop. All mutation of the completed flag and the completionListeners list is guarded by synchronized(lock), so a listener is always fired exactly once: immediately if the stream has already completed, otherwise when completion or an error occurs.
The chunk path (subscribe(java.util.function.Consumer<io.netty.buffer.ByteBuf>, java.lang.Runnable, java.util.function.Consumer<java.lang.Throwable>), addChunk(io.netty.buffer.ByteBuf), complete(), error(Throwable)) is serialised on the upstream channel's event loop to avoid races between chunk arrival and subscription. Any chunks that arrive before subscribe() are buffered as byte[] arrays in a pending list and drained in order when the subscriber connects.
-
-
Constructor Summary
Constructor    Description
StreamingBody(int maxCaptureBytes)
-
Method Summary
Modifier and Type    Method    Description
void    addChunk(io.netty.buffer.ByteBuf chunk)    Feed a chunk into the sink.
void    addCompletionListener(Runnable listener)    Add a listener that is called when the stream completes (either successfully or with an error).
byte[]    capturedBytes()
void    complete()    Signal that the stream is complete (last chunk received).
void    error(Throwable cause)    Signal that the stream was interrupted by an error.
Throwable    getError()
boolean    isCompleted()
boolean    isTruncated()
void    requestMore()    Request the next chunk from the upstream channel.
void    setEventLoop(io.netty.channel.EventLoop eventLoop)    Set the upstream channel's event loop.
void    setRequestMoreCallback(Runnable callback)    Set a callback that is invoked to request the next chunk from the upstream channel.
void    subscribe(Consumer<io.netty.buffer.ByteBuf> onChunk, Runnable onComplete, Consumer<Throwable> onError)    Subscribe to receive streaming chunks.
-
-
-
Method Detail
-
setEventLoop
public void setEventLoop(io.netty.channel.EventLoop eventLoop)
Set the upstream channel's event loop. Must be called before any chunks arrive (typically by the handler that creates the body). All chunk-path operations (subscribe(java.util.function.Consumer<io.netty.buffer.ByteBuf>, java.lang.Runnable, java.util.function.Consumer<java.lang.Throwable>), addChunk(io.netty.buffer.ByteBuf), complete(), error(Throwable)) are serialised on this event loop.
- Parameters:
- eventLoop - the upstream channel's event loop
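The serialisation rule can be illustrated without Netty. The following is a minimal sketch, assuming a single-threaded ExecutorService as a stand-in for the upstream EventLoop; the class LoopMarshalSketch and its methods (inLoop, runOnLoop, shutdown) are hypothetical and are not part of MockServer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an event loop: one thread runs every task in submission order.
final class LoopMarshalSketch {
    private volatile Thread loopThread;
    private final ExecutorService loop;

    LoopMarshalSketch() {
        // Remember the thread that backs the executor so callers can be compared against it.
        loop = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-loop");
            loopThread = t;
            return t;
        });
    }

    boolean inLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run the task inline when already on the loop thread, otherwise hand it over to the loop.
    void runOnLoop(Runnable task) {
        if (inLoop()) {
            task.run();
        } else {
            loop.execute(task);
        }
    }

    // Let queued tasks finish, then stop the loop thread.
    void shutdown() {
        loop.shutdown();
        try {
            loop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because every chunk-path call goes through runOnLoop in this model, two calls can never interleave: a call made from another thread is queued behind whatever the loop is currently doing, and a call made on the loop thread runs inline.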
-
subscribe
public void subscribe(Consumer<io.netty.buffer.ByteBuf> onChunk, Runnable onComplete, Consumer<Throwable> onError)
Subscribe to receive streaming chunks. If an event loop has been set via setEventLoop(EventLoop) and the caller is not on that event loop, the subscribe body is marshalled onto the event loop to serialise with addChunk/complete/error. Any chunks that arrived before subscription are drained in order, and then the first upstream read is triggered via requestMore().
- Parameters:
- onChunk - called for each ByteBuf chunk; the consumer must NOT release the buffer
- onComplete - called when the last chunk has been received
- onError - called if the stream is interrupted (channel close, timeout, etc.)
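The drain-on-subscribe behaviour can be sketched in isolation. This is a simplified single-threaded model, assuming byte[] in place of ByteBuf and omitting the event-loop marshalling and the requestMore() trigger; PendingDrainSketch is a hypothetical class, not MockServer's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model: chunks that arrive before subscribe() are copied and replayed in order.
final class PendingDrainSketch {
    private final List<byte[]> pending = new ArrayList<>();
    private Consumer<byte[]> onChunk;

    void addChunk(byte[] chunk) {
        if (onChunk == null) {
            // No subscriber yet: keep a copy, because the caller still owns its buffer.
            pending.add(chunk.clone());
        } else {
            onChunk.accept(chunk);
        }
    }

    void subscribe(Consumer<byte[]> consumer) {
        onChunk = consumer;
        // Replay early chunks in arrival order before any later chunk is forwarded.
        for (byte[] early : pending) {
            consumer.accept(early);
        }
        pending.clear();
    }
}
```

In this model, chunks added before subscribe() reach the consumer first and in their original order, followed by any chunk added afterwards.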
-
addChunk
public void addChunk(io.netty.buffer.ByteBuf chunk)
Feed a chunk into the sink. The chunk bytes are appended to the bounded capture buffer and forwarded to the subscriber. If no subscriber has connected yet, the full chunk bytes are copied into a pending list so they can be drained when subscribe(java.util.function.Consumer<io.netty.buffer.ByteBuf>, java.lang.Runnable, java.util.function.Consumer<java.lang.Throwable>) runs.
- Parameters:
- chunk - the chunk content (caller retains ownership of the buffer)
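The bounded capture can be pictured with a small stand-alone sketch. It assumes that a chunk crossing the cap is captured up to the cap and the rest is dropped; the documentation does not say whether the real class keeps a partial chunk, so treat that as an assumption. BoundedCaptureSketch and its capture method are hypothetical names.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical model of a capture buffer that never grows beyond maxCaptureBytes.
final class BoundedCaptureSketch {
    private final int maxCaptureBytes;
    private final ByteArrayOutputStream captured = new ByteArrayOutputStream();
    private boolean truncated;

    BoundedCaptureSketch(int maxCaptureBytes) {
        this.maxCaptureBytes = maxCaptureBytes;
    }

    void capture(byte[] chunk) {
        int room = maxCaptureBytes - captured.size();
        int n = Math.min(room, chunk.length);
        if (n > 0) {
            captured.write(chunk, 0, n); // keep only what still fits (assumed behaviour)
        }
        if (n < chunk.length) {
            truncated = true; // some bytes of the body were not captured
        }
    }

    byte[] capturedBytes() {
        return captured.toByteArray();
    }

    boolean isTruncated() {
        return truncated;
    }
}
```

With a cap of 5 bytes, capturing "abc" and then "defg" leaves "abcde" in the buffer and sets the truncated flag. Forwarding to the subscriber is independent of the cap: every chunk is still forwarded in full.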
-
complete
public void complete()
Signal that the stream is complete (last chunk received).
-
error
public void error(Throwable cause)
Signal that the stream was interrupted by an error.
- Parameters:
- cause - the cause of the interruption
-
addCompletionListener
public void addCompletionListener(Runnable listener)
Add a listener that is called when the stream completes (either successfully or with an error). If the stream has already completed, the listener is called immediately. This method is safe to call from any thread.
- Parameters:
- listener - the completion listener
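The exactly-once guarantee can be sketched as follows. This is a minimal model, assuming a completed flag and a listener list guarded by one lock as the class description states; running the listeners outside the lock is a choice made for this sketch and is not confirmed by the documentation. CompletionListenerSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each listener fires exactly once, whether it is added before or after completion.
final class CompletionListenerSketch {
    private final Object lock = new Object();
    private final List<Runnable> completionListeners = new ArrayList<>();
    private boolean completed;

    void addCompletionListener(Runnable listener) {
        synchronized (lock) {
            if (!completed) {
                completionListeners.add(listener); // fired later by complete()
                return;
            }
        }
        listener.run(); // already completed: fire immediately
    }

    void complete() {
        List<Runnable> toFire;
        synchronized (lock) {
            if (completed) {
                return; // a second completion signal is ignored
            }
            completed = true;
            toFire = new ArrayList<>(completionListeners);
            completionListeners.clear();
        }
        for (Runnable listener : toFire) {
            listener.run();
        }
    }
}
```

Checking the flag and adding to the list under the same lock closes the window in which a listener could be registered just after completion has been signalled and therefore never fire.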
-
setRequestMoreCallback
public void setRequestMoreCallback(Runnable callback)
Set a callback that is invoked to request the next chunk from the upstream channel. Used for backpressure: the downstream writer calls requestMore() after each chunk write completes, which triggers an upstream ctx.read().
- Parameters:
- callback - the callback to request the next upstream chunk
-
requestMore
public void requestMore()
Request the next chunk from the upstream channel. Called by the downstream writer after a chunk write completes to implement backpressure.
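The pull-based flow can be sketched without Netty. In this simplified model, a Queue stands in for the upstream channel and a log entry stands in for the downstream write completing; BackpressureSketch and its pump helper are hypothetical and only mirror the method names documented here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the requestMore() handshake between a downstream writer and an upstream reader.
final class BackpressureSketch {
    private Runnable requestMoreCallback;

    void setRequestMoreCallback(Runnable callback) {
        this.requestMoreCallback = callback;
    }

    void requestMore() {
        if (requestMoreCallback != null) {
            requestMoreCallback.run();
        }
    }

    // Demo: chunks are pulled one at a time; the next read happens only after the previous write.
    static List<String> pump(Queue<String> upstream) {
        List<String> log = new ArrayList<>();
        BackpressureSketch body = new BackpressureSketch();
        body.setRequestMoreCallback(() -> {
            String chunk = upstream.poll(); // stands in for an upstream ctx.read()
            if (chunk == null) {
                return; // upstream exhausted
            }
            log.add("read " + chunk);
            log.add("write " + chunk); // stands in for the downstream write completing
            body.requestMore();        // only now ask for the next chunk
        });
        body.requestMore(); // first read
        return log;
    }
}
```

Calling pump on a queue holding "a" and "b" yields the log read a, write a, read b, write b: at most one chunk is in flight at any time, which is the property the backpressure callback is meant to provide.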
-
isTruncated
public boolean isTruncated()
- Returns:
- true if the capture buffer was capped and does not contain the full body
-
isCompleted
public boolean isCompleted()
- Returns:
- true if the stream has completed (successfully or with an error)
-
getError
public Throwable getError()
- Returns:
- the error that interrupted the stream, or null if it completed normally
-
capturedBytes
public byte[] capturedBytes()
- Returns:
- the captured bytes (may be fewer than the full body if truncated)
-
-