Class StreamingBody


  • public class StreamingBody
    extends Object
    A chunk sink for streaming response bodies. When MockServer proxies a streaming response (Server-Sent Events, or a chunked response without Content-Length), the response head is delivered immediately while the chunks flow through this sink. The sink captures bytes into a bounded buffer (up to maxCaptureBytes) for the event log / dashboard and forwards every chunk to the subscribed consumer.

    Thread-safety: addCompletionListener(java.lang.Runnable) may be called from a scheduler thread (e.g. HttpActionHandler.writeStreamingForwardActionResponse) while complete() and error(java.lang.Throwable) are called from the Netty event loop. All mutation of the completed flag and the completionListeners list is guarded by synchronized(lock), so each listener fires exactly once: immediately if the stream has already completed, otherwise when completion or an error occurs.
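
    The exactly-once guarantee can be illustrated with a minimal sketch. CompletionSketch is a hypothetical stand-in, not MockServer's source; it only assumes the lock, completed flag and completionListeners list named above, and it runs listeners outside the lock, which is a choice of this sketch rather than a documented behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the completion bookkeeping; not the real StreamingBody.
class CompletionSketch {
    private final Object lock = new Object();
    private final List<Runnable> completionListeners = new ArrayList<>();
    private boolean completed;

    // Safe from any thread: stores the listener, or runs it now if already completed.
    void addCompletionListener(Runnable listener) {
        synchronized (lock) {
            if (!completed) {
                completionListeners.add(listener);
                return;
            }
        }
        listener.run(); // already completed: fire immediately, outside the lock
    }

    // Only the first call flips the flag and fires the stored listeners.
    void complete() {
        List<Runnable> toFire;
        synchronized (lock) {
            if (completed) {
                return;
            }
            completed = true;
            toFire = new ArrayList<>(completionListeners);
            completionListeners.clear();
        }
        for (Runnable listener : toFire) {
            listener.run();
        }
    }
}
```

    Because the flag check and the list update happen under the same lock, a listener registered concurrently with completion is either stored before the flag flips (and fired by complete()) or sees the flag already set (and fires itself), never both and never neither.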

    The chunk path (subscribe(java.util.function.Consumer<io.netty.buffer.ByteBuf>, java.lang.Runnable, java.util.function.Consumer<java.lang.Throwable>), addChunk(io.netty.buffer.ByteBuf), complete(), error(java.lang.Throwable)) is serialised on the upstream channel's event loop to avoid races between chunk arrival and subscription. Any chunks that arrive before subscribe(...) is called are buffered as byte[] arrays in a pending list and drained in order when the subscriber connects.
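
    The buffering of early chunks might look roughly like the following sketch. PendingChunkSketch is a hypothetical class that uses byte[] in place of Netty's ByteBuf and assumes, as described above, that addChunk and subscribe are only ever called from a single thread, so no locking is shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of pre-subscription buffering; byte[] stands in for ByteBuf.
class PendingChunkSketch {
    private final List<byte[]> pending = new ArrayList<>();
    private Consumer<byte[]> subscriber;

    // Assumed to run on one thread only (the event loop in the real class).
    void addChunk(byte[] chunk) {
        if (subscriber == null) {
            pending.add(chunk.clone()); // copy so the caller may reuse its buffer
        } else {
            subscriber.accept(chunk);
        }
    }

    // Replays buffered chunks in arrival order, then delivers later chunks directly.
    void subscribe(Consumer<byte[]> onChunk) {
        subscriber = onChunk;
        for (byte[] chunk : pending) {
            onChunk.accept(chunk);
        }
        pending.clear();
    }
}
```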

    • Constructor Detail

      • StreamingBody

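        public StreamingBody(int maxCaptureBytes)
        Parameters:
        maxCaptureBytes - the maximum number of bytes captured into the bounded buffer for the event log / dashboard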
    • Method Detail

      • subscribe

        public void subscribe(Consumer<io.netty.buffer.ByteBuf> onChunk,
                              Runnable onComplete,
                              Consumer<Throwable> onError)
        Subscribe to receive streaming chunks. If an event loop has been set via setEventLoop(EventLoop) and the caller is not on that event loop, the subscribe body is marshalled onto the event loop to serialise with addChunk/complete/error. Any chunks that arrived before subscription are drained in order and then the first upstream read is triggered via requestMore().
        Parameters:
        onChunk - called for each ByteBuf chunk; the consumer must NOT release the buffer
        onComplete - called when the last chunk has been received
        onError - called if the stream is interrupted (channel close, timeout, etc.)
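        The "marshal onto the event loop unless already on it" step can be sketched without Netty. LoopSketch below is a hypothetical stand-in that models the event loop as a single-thread executor; in Netty the equivalent checks are EventLoop.inEventLoop() and execute(Runnable). The names LoopSketch and runOnLoop are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical event-loop stand-in: a single-thread executor that knows its own thread.
class LoopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
        Thread t = new Thread(task, "sketch-event-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread is the loop
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise hand the task to the loop.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

        Under this model, a subscribe body passed to runOnLoop always executes on the same thread as chunk delivery, so it cannot interleave with a chunk arriving mid-subscription.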
      • complete

        public void complete()
        Signal that the stream is complete (last chunk received).
      • error

        public void error(Throwable cause)
        Signal that the stream was interrupted by an error.
        Parameters:
        cause - the cause of the interruption
      • addCompletionListener

        public void addCompletionListener(Runnable listener)
        Add a listener that is called when the stream completes (either successfully or with an error). If the stream has already completed, the listener is called immediately. This method is safe to call from any thread.
        Parameters:
        listener - the completion listener
      • setRequestMoreCallback

        public void setRequestMoreCallback(Runnable callback)
        Set a callback that is invoked to request the next chunk from the upstream channel. Used for backpressure: the downstream writer calls requestMore() after each chunk write completes, which triggers an upstream ctx.read().
        Parameters:
        callback - the callback to request the next upstream chunk
      • requestMore

        public void requestMore()
        Request the next chunk from the upstream channel. Called by the downstream writer after a chunk write completes to implement backpressure.
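        The pull-based backpressure described for setRequestMoreCallback and requestMore() can be modelled as follows. BackpressureSketch is a hypothetical class; the queue stands in for the upstream channel and poll() for ctx.read(). Ignoring requestMore() when no callback has been set is an assumption of this sketch, not documented behaviour.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model: the upstream releases one chunk per requestMore() call.
class BackpressureSketch {
    private Runnable requestMoreCallback;

    void setRequestMoreCallback(Runnable callback) {
        this.requestMoreCallback = callback;
    }

    void requestMore() {
        Runnable callback = requestMoreCallback;
        if (callback != null) { // assumption: a missing callback is silently ignored
            callback.run();
        }
    }

    // Demo wiring: three chunks are available upstream, but only two are requested.
    static List<String> demo() {
        Deque<String> upstream = new ArrayDeque<>(Arrays.asList("chunk-1", "chunk-2", "chunk-3"));
        List<String> delivered = new ArrayList<>();
        BackpressureSketch body = new BackpressureSketch();
        body.setRequestMoreCallback(() -> {
            String next = upstream.poll(); // stands in for an upstream ctx.read()
            if (next != null) {
                delivered.add(next);
            }
        });
        body.requestMore(); // first read, as triggered on subscription
        body.requestMore(); // issued after the first downstream write completes
        return delivered;   // chunk-3 stays upstream because it was never requested
    }
}
```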
      • isTruncated

        public boolean isTruncated()
        Returns:
        true if the capture buffer was capped and does not contain the full body
      • isCompleted

        public boolean isCompleted()
        Returns:
        true if the stream has completed (successfully or with an error)
      • getError

        public Throwable getError()
        Returns:
        the error that interrupted the stream, or null if it completed normally
      • capturedBytes

        public byte[] capturedBytes()
        Returns:
        the captured bytes (may be fewer than the full body if truncated)
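        How isTruncated() and capturedBytes() relate to maxCaptureBytes can be illustrated with a small model. CaptureSketch is hypothetical and uses byte[] rather than ByteBuf; whether the real class keeps the leading part of a chunk that crosses the cap, as done here, or drops the whole chunk is an assumption.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical model of a capped capture buffer; not MockServer's implementation.
class CaptureSketch {
    private final int maxCaptureBytes;
    private final ByteArrayOutputStream captured = new ByteArrayOutputStream();
    private boolean truncated;

    CaptureSketch(int maxCaptureBytes) {
        this.maxCaptureBytes = maxCaptureBytes;
    }

    // Copy as much of the chunk as still fits; flag truncation if anything is dropped.
    void capture(byte[] chunk) {
        int room = maxCaptureBytes - captured.size();
        int toCopy = Math.min(room, chunk.length);
        if (toCopy > 0) {
            captured.write(chunk, 0, toCopy);
        }
        if (toCopy < chunk.length) {
            truncated = true;
        }
    }

    boolean isTruncated() {
        return truncated;
    }

    byte[] capturedBytes() {
        return captured.toByteArray();
    }
}
```

        In this model the cap limits only what is captured; forwarding to the subscriber is a separate path, consistent with the class description stating that every chunk is forwarded.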