Class StreamingBody

java.lang.Object
org.mockserver.model.StreamingBody

public class StreamingBody extends Object
A chunk sink for streaming response bodies. When MockServer proxies a streaming response (Server-Sent Events or chunked without Content-Length), the response head is delivered immediately while chunks flow through this sink. The sink captures bytes into a bounded buffer (up to maxCaptureBytes) for the event log / dashboard while forwarding every chunk to the subscribed consumer.

Thread-safety: addCompletionListener(Runnable) may be called from a scheduler thread (e.g. HttpActionHandler.writeStreamingForwardActionResponse) while complete() and error(Throwable) are called from the Netty event loop. All mutation of the completed flag and the completionListeners list is guarded by synchronized(lock), so each listener fires exactly once: immediately if the stream has already completed, otherwise when completion or an error occurs.
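The exactly-once guarantee described above can be sketched with plain Java. This is a minimal illustration, not MockServer's source: the class name CompletionSignal is hypothetical, and firing listeners outside the lock is an assumption made here to keep listener code from running while the lock is held.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the completion bookkeeping; not the real StreamingBody. */
final class CompletionSignal {
    private final Object lock = new Object();
    private final List<Runnable> completionListeners = new ArrayList<>();
    private boolean completed;

    /** Safe from any thread: runs the listener now if already completed, otherwise queues it. */
    void addCompletionListener(Runnable listener) {
        synchronized (lock) {
            if (!completed) {
                completionListeners.add(listener);
                return;
            }
        }
        // Already completed: fire immediately, outside the lock (an assumption of this sketch).
        listener.run();
    }

    /** Marks completion once and fires every queued listener exactly once. */
    void complete() {
        List<Runnable> toFire;
        synchronized (lock) {
            if (completed) {
                return; // a repeated completion signal is a no-op
            }
            completed = true;
            toFire = new ArrayList<>(completionListeners);
            completionListeners.clear();
        }
        for (Runnable listener : toFire) {
            listener.run();
        }
    }
}
```

Because the completed check and the list mutation happen under the same lock, a listener registered concurrently with completion is either queued and fired by complete(), or sees completed == true and fires itself; it can never be lost or fired twice.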

The chunk path (subscribe(Consumer<ByteBuf>, Runnable, Consumer<Throwable>), addChunk(ByteBuf), complete(), error(Throwable)) is serialised on the upstream channel's event loop to avoid races between chunk arrival and subscription. Any chunks that arrive before subscribe() are buffered as byte[] arrays in a pending list and drained in order when the subscriber connects.

  • Constructor Details

    • StreamingBody

      public StreamingBody(int maxCaptureBytes)
    • StreamingBody

      public StreamingBody(int maxCaptureBytes, boolean captureChunkTimestamps)
      Parameters:
      maxCaptureBytes - maximum bytes to capture for the event log
      captureChunkTimestamps - when true, records a System.nanoTime() per chunk for later per-chunk replay timing
  • Method Details

    • setEventLoop

      public void setEventLoop(io.netty.channel.EventLoop eventLoop)
      Set the upstream channel's event loop. Must be called before any chunks arrive (typically by the handler that creates the body). All chunk-path operations (subscribe(Consumer<ByteBuf>, Runnable, Consumer<Throwable>), addChunk(ByteBuf), complete(), error(Throwable)) are serialised on this event loop.
      Parameters:
      eventLoop - the upstream channel's event loop
    • subscribe

      public void subscribe(Consumer<io.netty.buffer.ByteBuf> onChunk, Runnable onComplete, Consumer<Throwable> onError)
      Subscribe to receive streaming chunks. If an event loop has been set via setEventLoop(EventLoop) and the caller is not on that event loop, the subscribe body is marshalled onto the event loop to serialise with addChunk/complete/error. Any chunks that arrived before subscription are drained in order and then the first upstream read is triggered via requestMore().
      Parameters:
      onChunk - called for each ByteBuf chunk; the consumer must NOT release the buffer
      onComplete - called when the last chunk has been received
      onError - called if the stream is interrupted (channel close, timeout, etc.)
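The buffer-then-drain behaviour described for subscribe can be sketched as follows. This is a simplified, single-threaded model: PendingChunkSink is a hypothetical name, byte[] stands in for ByteBuf, and the event-loop marshalling and the requestMore() call are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of early-chunk buffering; not the real StreamingBody. */
final class PendingChunkSink {
    private final List<byte[]> pending = new ArrayList<>();
    private Consumer<byte[]> subscriber;

    /** Forwards to the subscriber, or copies into the pending list if nobody has subscribed yet. */
    void addChunk(byte[] chunk) {
        if (subscriber == null) {
            // Copy the bytes: the caller keeps ownership of its buffer and may reuse it.
            pending.add(chunk.clone());
        } else {
            subscriber.accept(chunk);
        }
    }

    /** Drains buffered chunks in arrival order, then delivers later chunks directly. */
    void subscribe(Consumer<byte[]> onChunk) {
        subscriber = onChunk;
        for (byte[] chunk : pending) {
            onChunk.accept(chunk);
        }
        pending.clear();
    }
}
```

In this sketch both methods are assumed to run on one thread, which mirrors the role the event loop plays in the real class: with a single serialising thread, no chunk can slip between the drain and the switch to direct delivery.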
    • addChunk

      public void addChunk(io.netty.buffer.ByteBuf chunk)
      Feed a chunk into the sink. The chunk bytes are appended to the bounded capture buffer and forwarded to the subscriber. If no subscriber has connected yet, the full chunk bytes are copied into a pending list so they can be drained when subscribe(Consumer<ByteBuf>, Runnable, Consumer<Throwable>) runs.
      Parameters:
      chunk - the chunk content (caller retains ownership of the buffer)
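A bounded capture buffer of the kind addChunk appends to might look like the sketch below. BoundedCapture is a hypothetical class; in particular, keeping the leading part of a chunk that straddles the cap (rather than dropping the whole chunk) is an assumption of this sketch, not documented behaviour.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical bounded capture buffer; not the real StreamingBody. */
final class BoundedCapture {
    private final int maxCaptureBytes;
    private final ByteArrayOutputStream captured = new ByteArrayOutputStream();
    private boolean truncated;

    BoundedCapture(int maxCaptureBytes) {
        this.maxCaptureBytes = Math.max(0, maxCaptureBytes);
    }

    /** Appends as many bytes as still fit under the cap and records whether any were dropped. */
    void append(byte[] chunk) {
        int remaining = maxCaptureBytes - captured.size();
        int toCopy = Math.min(remaining, chunk.length);
        if (toCopy > 0) {
            captured.write(chunk, 0, toCopy);
        }
        if (toCopy < chunk.length) {
            truncated = true;
        }
    }

    boolean isTruncated() {
        return truncated;
    }

    byte[] capturedBytes() {
        return captured.toByteArray();
    }
}
```

The cap only limits what is kept for the event log; in the real class every chunk is still forwarded to the subscriber in full.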
    • complete

      public void complete()
      Signal that the stream is complete (last chunk received).
    • error

      public void error(Throwable cause)
      Signal that the stream was interrupted by an error.
      Parameters:
      cause - the cause of the interruption
    • addCompletionListener

      public void addCompletionListener(Runnable listener)
      Add a listener that is called when the stream completes (either successfully or with an error). If the stream has already completed, the listener is called immediately. This method is safe to call from any thread.
      Parameters:
      listener - the completion listener
    • setRequestMoreCallback

      public void setRequestMoreCallback(Runnable callback)
      Set a callback that is invoked to request the next chunk from the upstream channel. Used for backpressure: the downstream writer calls requestMore() after each chunk write completes, which triggers an upstream ctx.read().
      Parameters:
      callback - the callback to request the next upstream chunk
    • requestMore

      public void requestMore()
      Request the next chunk from the upstream channel. Called by the downstream writer after a chunk write completes to implement backpressure.
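The pull-based backpressure loop formed by setRequestMoreCallback and requestMore can be modelled without Netty. In this sketch PullSink and SlowWriter are hypothetical classes, String stands in for ByteBuf, and the callback passed to setRequestMoreCallback plays the role of the upstream ctx.read().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sink that only receives a chunk after one has been requested. */
final class PullSink {
    private Runnable requestMoreCallback;
    private Consumer<String> onChunk;

    void setRequestMoreCallback(Runnable callback) {
        this.requestMoreCallback = callback;
    }

    /** Registers the consumer and triggers the first upstream read. */
    void subscribe(Consumer<String> onChunk) {
        this.onChunk = onChunk;
        requestMore();
    }

    void addChunk(String chunk) {
        onChunk.accept(chunk);
    }

    void requestMore() {
        if (requestMoreCallback != null) {
            requestMoreCallback.run();
        }
    }
}

/** Hypothetical downstream writer whose writes complete some time after they start. */
final class SlowWriter {
    final List<String> written = new ArrayList<>();
    private final PullSink sink;
    private String inFlight;

    SlowWriter(PullSink sink) {
        this.sink = sink;
    }

    /** Starts a write; nothing more is requested until it completes. */
    void write(String chunk) {
        inFlight = chunk;
    }

    /** Called when the pending write has been flushed: only now ask upstream for the next chunk. */
    void completeWrite() {
        if (inFlight == null) {
            return;
        }
        written.add(inFlight);
        inFlight = null;
        sink.requestMore();
    }
}
```

Wiring sink.subscribe(writer::write) to a callback that hands over one chunk per invocation keeps at most one chunk in flight: the upstream is read again only after completeWrite() runs, so a slow client cannot cause unbounded buffering.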
    • isTruncated

      public boolean isTruncated()
      Returns:
      true if the capture buffer was capped and does not contain the full body
    • isCompleted

      public boolean isCompleted()
      Returns:
      true if the stream has completed (successfully or with an error)
    • getError

      public Throwable getError()
      Returns:
      the error that interrupted the stream, or null if it completed normally
    • capturedBytes

      public byte[] capturedBytes()
      Returns:
      the captured bytes (may be fewer than the full body if truncated)
    • interChunkDelaysMillis

      public List<Long> interChunkDelaysMillis()
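      Compute inter-chunk delays in milliseconds from the captured monotonic timestamps (System.nanoTime() values recorded per chunk). The first element is always 0 (no delay before the first chunk). Each subsequent element is the elapsed time between two consecutive chunk arrivals; because the timestamps are monotonic, the values are unaffected by wall-clock adjustments.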
      Returns:
      a list of inter-chunk delays in milliseconds, or null if timestamps were not captured (i.e. captureChunkTimestamps was false)
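The delay computation can be sketched as a pure function over the recorded timestamps. ChunkTimings and its parameter are hypothetical names, and truncating sub-millisecond gaps toward zero (via TimeUnit.NANOSECONDS.toMillis) is an assumption; the real method may round differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper mirroring the documented contract of interChunkDelaysMillis(). */
final class ChunkTimings {
    /**
     * @param timestampsNanos per-chunk System.nanoTime() readings, or null if none were captured
     * @return delays in milliseconds (first element 0), or null if timestamps were not captured
     */
    static List<Long> interChunkDelaysMillis(List<Long> timestampsNanos) {
        if (timestampsNanos == null) {
            return null;
        }
        List<Long> delays = new ArrayList<>(timestampsNanos.size());
        for (int i = 0; i < timestampsNanos.size(); i++) {
            if (i == 0) {
                delays.add(0L); // no delay before the first chunk
            } else {
                long gapNanos = timestampsNanos.get(i) - timestampsNanos.get(i - 1);
                delays.add(TimeUnit.NANOSECONDS.toMillis(gapNanos));
            }
        }
        return delays;
    }
}
```

A replay component could then wait delays.get(i) milliseconds before emitting chunk i to approximate the original pacing.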