Class StreamFrameBreakpointRegistry
Thread-safe: uses a ConcurrentHashMap internally. Designed to be called
from Netty event-loop threads (the chunk callback) and from scheduler/control-plane
threads (the resolve methods). NEVER blocks the calling thread — all operations are
non-blocking.
Frame ordering: frames within a single stream (identified by streamId)
are assigned monotonically increasing sequence numbers. The registry enforces that
frames are resolved in order — attempting to resolve a frame whose predecessor is still
held returns false.
DoS rail: the registry enforces a hard cap on concurrently held frames
(shared with the request/response breakpoint cap via
Configuration.breakpointMaxHeld(), default 50). When the cap is reached,
new frame breakpoints are skipped (the frame is written immediately).
Timeout rail: each paused frame auto-continues if not resolved within
Configuration.breakpointTimeoutMillis() (default 30 000 ms).
Stream-close eviction: when a stream is evicted (channel closed, error, or explicit close), all held frames for that stream are auto-continued and released.
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionReturns all active stream IDs that have held frames.entries()Returns a snapshot of all currently held frames.voidevictStream(String streamId) Evict all held frames for a given stream.framesForStream(String streamId) Returns a snapshot of all currently held frames for a specific stream.intnextSequenceNumber(String streamId) Returns the next monotonic sequence number for the given stream without parking a frame.pauseFrame(String streamId, byte[] chunkBytes, String requestMethod, String requestPath, Configuration configuration) Park a streaming response frame at a breakpoint (OUTBOUND direction).pauseFrame(String streamId, byte[] chunkBytes, String requestMethod, String requestPath, Configuration configuration, PausedStreamFrame.Direction direction) Park a streaming frame at a breakpoint with an explicit direction.voidreset()Auto-continues all held frames so their async continuations fire.booleanresolveClose(String frameId) Resolve a paused frame as CLOSE (end the stream: drop frame, send LastHttpContent).booleanresolveContinue(String frameId) Resolve a paused frame as CONTINUE (write the original frame body).booleanresolveDrop(String frameId) Resolve a paused frame as DROP (discard without writing to client).booleanresolveInject(String frameId, byte[] injectedBody) Resolve a paused frame as INJECT (write original, then also write an extra frame).booleanresolveModify(String frameId, byte[] replacementBody) Resolve a paused frame as MODIFY (write a replacement body).intsize()Number of currently held frames across all streams.
-
Constructor Details
-
StreamFrameBreakpointRegistry
public StreamFrameBreakpointRegistry()
-
-
Method Details
-
getInstance
-
pauseFrame
public PausedStreamFrame pauseFrame(String streamId, byte[] chunkBytes, String requestMethod, String requestPath, Configuration configuration) Park a streaming response frame at a breakpoint (OUTBOUND direction).If the held-frame cap is already reached, returns
null(the caller should write the frame immediately).- Parameters:
streamId- the stream correlation id (unique per forwarded streaming response)chunkBytes- a COPY of the frame payload bytes (caller retains the ByteBuf)requestMethod- the request method (for context/logging)requestPath- the request path (for context/logging)configuration- the active server configuration- Returns:
- the registered
PausedStreamFrame, ornullif the cap is reached
-
pauseFrame
public PausedStreamFrame pauseFrame(String streamId, byte[] chunkBytes, String requestMethod, String requestPath, Configuration configuration, PausedStreamFrame.Direction direction) Park a streaming frame at a breakpoint with an explicit direction.If the held-frame cap is already reached, returns
null(the caller should write/process the frame immediately).- Parameters:
streamId- the stream correlation idchunkBytes- a COPY of the frame payload bytes (caller retains the ByteBuf)requestMethod- the request method (for context/logging)requestPath- the request path (for context/logging)configuration- the active server configurationdirection- OUTBOUND (server-to-client) or INBOUND (client-to-server)- Returns:
- the registered
PausedStreamFrame, ornullif the cap is reached
-
resolveContinue
Resolve a paused frame as CONTINUE (write the original frame body).- Returns:
- true if the frame was found and resolved
-
resolveModify
Resolve a paused frame as MODIFY (write a replacement body).- Returns:
- true if the frame was found and resolved
-
resolveDrop
Resolve a paused frame as DROP (discard without writing to client).- Returns:
- true if the frame was found and resolved
-
resolveInject
Resolve a paused frame as INJECT (write original, then also write an extra frame).- Returns:
- true if the frame was found and resolved
-
resolveClose
Resolve a paused frame as CLOSE (end the stream: drop frame, send LastHttpContent).- Returns:
- true if the frame was found and resolved
-
evictStream
Evict all held frames for a given stream. Called when the channel closes, the stream ends, or on an explicit CLOSE action. Each held frame is resolved with DROP so any awaiting callbacks fire but do NOT attempt to write to the (possibly closing) channel. This prevents resource leaks, hanging futures, and out-of-order writes after LastHttpContent.- Parameters:
streamId- the stream to evict
-
nextSequenceNumber
Returns the next monotonic sequence number for the given stream without parking a frame. Used by the WS-callback dispatch path which manages its own futures but needs to share the per-stream sequence counter for consistency with the REST-park path.- Parameters:
streamId- the stream to get the next sequence number for- Returns:
- the next sequence number (0-based, monotonically increasing)
-
entries
Returns a snapshot of all currently held frames. -
framesForStream
Returns a snapshot of all currently held frames for a specific stream. -
activeStreamIds
Returns all active stream IDs that have held frames. -
size
public int size()Number of currently held frames across all streams. -
reset
public void reset()Auto-continues all held frames so their async continuations fire. Called on server reset.
-