
feat: HTTP streaming serialization for large responses #39

@prestwich

[Claude Code]

Problem

Large JSON-RPC responses (e.g., eth_getLogs returning thousands of entries) currently materialize the entire serialized result as a Box<RawValue>, a complete in-memory string, before any bytes are sent to the client. For a 500MB response, that's 500MB of heap for the serialized string alone, on top of the unserialized result it was built from.

Proposal

Add streaming serialization — writing the JSON-RPC response incrementally to the HTTP body via chunked transfer encoding. The response is still a complete, spec-compliant JSON-RPC 2.0 response. The chunks are a transport-level concern, invisible at the protocol level.
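
As a std-only illustration of why the chunks are invisible to JSON-RPC, the sketch below frames a response as an HTTP/1.1 chunked body and then strips the framing again. Each chunk is its size in hex, CRLF, the bytes, and CRLF, and a zero-length chunk ends the body. The helpers encode_chunked and decode_chunked are hypothetical. In the proposal, hyper does this framing, and neither the handler nor the client's JSON parser ever sees it.

```rust
use std::io::{self, Write};

/// Hypothetical helper: frame `pieces` as an HTTP/1.1 chunked body.
/// In the proposal hyper performs this framing; handlers never see it.
pub fn encode_chunked<W: Write>(out: &mut W, pieces: &[&[u8]]) -> io::Result<()> {
    for piece in pieces {
        // A zero-length chunk means "end of body", so never emit one early.
        if piece.is_empty() {
            continue;
        }
        // Chunk size in hex, CRLF, the bytes, CRLF.
        write!(out, "{:x}\r\n", piece.len())?;
        out.write_all(piece)?;
        out.write_all(b"\r\n")?;
    }
    // Terminating zero-length chunk (no trailers): the body is complete.
    out.write_all(b"0\r\n\r\n")
}

/// Hypothetical helper: strip the framing and return the reassembled bytes,
/// or `None` if the body is malformed or lacks the terminating chunk.
pub fn decode_chunked(mut body: &[u8]) -> Option<Vec<u8>> {
    let mut payload: Vec<u8> = Vec::new();
    loop {
        let line_end = body.windows(2).position(|w| w == b"\r\n")?;
        let size_hex = std::str::from_utf8(&body[..line_end]).ok()?;
        let size = usize::from_str_radix(size_hex, 16).ok()?;
        body = &body[line_end + 2..];
        if size == 0 {
            // Complete only if the final CRLF after the zero chunk is present.
            return if body == b"\r\n" { Some(payload) } else { None };
        }
        if body.len() < size + 2 || &body[size..size + 2] != b"\r\n" {
            return None; // truncated or malformed chunk
        }
        payload.extend_from_slice(&body[..size]);
        body = &body[size + 2..];
    }
}
```

However the serializer happens to split its writes, reassembling the chunk payloads yields the original response byte-for-byte.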

User-facing change: route_streaming() instead of route() for methods that may produce large responses. Same handler signatures. The framework handles everything else.

use ajj::{Router, StreamArray};

async fn get_logs(
    params: GetLogsParams,
    state: DbHandle,
) -> Result<StreamArray<impl Iterator<Item = Log>>, MyError> {
    let logs = state.query_logs(&params)?;
    Ok(StreamArray(logs.into_iter()))
}

let router = Router::<DbHandle>::new()
    .route("eth_blockNumber", get_block_number)      // normal
    .route_streaming("eth_getLogs", get_logs)         // streaming
    .with_state(db);

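Over HTTP: eth_getLogs writes the response incrementally via chunked transfer encoding. Over WebSocket/IPC: it materializes normally, since these transports deliver each response as one complete message. That framing requirement comes from the transport, not from the JSON-RPC 2.0 spec, which is transport-agnostic.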

Design

Core idea

  1. Add write_to(&mut dyn Write) to RpcSend — write-based serialization with a default impl that round-trips through into_raw_value. The blanket impl for Serialize types overrides this to use serde_json::to_writer (no intermediate String).
  2. Add StreamArray<I> helper — wraps an iterator, serializes elements incrementally to the writer.
  3. Add StreamingHandler<T, S> trait — parallel to Handler, returns a deferred (unserialized) payload instead of Box<RawValue>.
  4. Extend Method<S> and Router: route_streaming() registration, streaming-aware dispatch.
  5. Axum handler — detects streaming methods on single requests, writes JSON-RPC envelope + payload incrementally to chunked HTTP body via DuplexStream + spawn_blocking + SyncIoBridge.
  6. Non-HTTP transports — streaming methods fall back to materialization automatically.

Why a separate StreamingHandler trait

The existing Handler<T, S> trait constrains Future::Output = Option<Box<RawValue>>. Serialization happens inside the handler's future (in impl_handler_call!(@finish) which calls Response::build_response()), so by the time the transport sees the result, it's fully materialized. To defer serialization, we need a different future output type, which requires a separate trait.

The user's handler closures are identical — same signatures, same return types. Only the registration call changes (route_streaming vs route).

What does NOT change

  • The Handler<T, S> trait
  • The Route / RouteFuture / BatchFuture types
  • All existing handler blanket impls
  • All existing transport code paths
  • The ResponsePayload / ErrorPayload types
  • Cargo.toml dependencies (tokio-util with ["io", "rt"] is already non-optional)

Implementation plan

Phase 1: Core types (no behavior change)

src/primitives.rs: write_to on RpcSend + StreamArray<I>

Add provided method to RpcSend:

fn write_to(self, writer: &mut dyn std::io::Write) -> serde_json::Result<()>
where Self: Sized
{
    let rv = self.into_raw_value()?;
    writer.write_all(rv.get().as_bytes()).map_err(serde_json::Error::io)
}

Override in blanket impl to use serde_json::to_writer. Add StreamArray<I> with custom write_to that iterates and serializes one element at a time.
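
Here is a minimal std-only sketch of the default-versus-override split and of StreamArray. To avoid a serde dependency, it makes several substitutions. A hypothetical ToJson trait stands in for Serialize, a String stands in for Box<RawValue>, and a simplified RpcSendSketch trait stands in for RpcSend. Prebuilt is a hypothetical type that relies on the default write_to.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for `serde::Serialize` so the sketch needs only std.
pub trait ToJson {
    fn write_json(&self, w: &mut dyn Write) -> io::Result<()>;
}

impl ToJson for u64 {
    fn write_json(&self, w: &mut dyn Write) -> io::Result<()> {
        write!(w, "{}", self)
    }
}

/// Simplified analogue of `RpcSend`: `into_raw` stands in for
/// `into_raw_value`, and `write_to` mirrors the proposed provided method.
pub trait RpcSendSketch: Sized {
    fn into_raw(self) -> io::Result<String>;

    /// Default: materialize, then copy the bytes (one full buffer in memory).
    fn write_to(self, w: &mut dyn Write) -> io::Result<()> {
        let raw = self.into_raw()?;
        w.write_all(raw.as_bytes())
    }
}

/// Hypothetical type that only implements `into_raw`, so it gets the default.
pub struct Prebuilt(pub String);

impl RpcSendSketch for Prebuilt {
    fn into_raw(self) -> io::Result<String> {
        Ok(self.0)
    }
}

/// Mirrors the proposed `StreamArray<I>`: a JSON array produced lazily.
pub struct StreamArray<I>(pub I);

impl<I> RpcSendSketch for StreamArray<I>
where
    I: Iterator,
    I::Item: ToJson,
{
    /// Materializing fallback (e.g. WS/IPC): stream into a Vec, then convert.
    fn into_raw(self) -> io::Result<String> {
        let mut buf: Vec<u8> = Vec::new();
        self.write_to(&mut buf)?;
        String::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }

    /// Override: elements are serialized one at a time, never all at once.
    fn write_to(self, w: &mut dyn Write) -> io::Result<()> {
        w.write_all(b"[")?;
        for (i, item) in self.0.enumerate() {
            if i > 0 {
                w.write_all(b",")?;
            }
            item.write_json(w)?;
        }
        w.write_all(b"]")
    }
}
```

The default path still holds one complete buffer. StreamArray never holds more than one element's serialization at a time, and its into_raw fallback simply streams into a Vec.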

src/types/resp/streaming.rs (new) — DeferredPayload + StreamingOutput

pub(crate) trait DeferredPayload: Send {
    fn write_response(self: Box<Self>, id: &RawValue, writer: &mut dyn Write) -> serde_json::Result<()>;
    fn materialize(self: Box<Self>, id: &RawValue) -> Box<RawValue>;
}

pub(crate) enum StreamingOutput {
    Deferred { id: Box<RawValue>, payload: Box<dyn DeferredPayload> },
    Materialized(Box<RawValue>),
}

DeferredPayload impl for ResponsePayload<T, E>:

  • write_response: writes {"jsonrpc":"2.0","id":ID,"result": then calls T::write_to(writer), then }. For errors, serializes error payload normally.
  • materialize: delegates to existing Response::build_response.
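
The envelope logic in write_response can be sketched using only std. DeferredPayloadSketch and PayloadSketch are hypothetical stand-ins. The id is raw JSON text rather than &RawValue, the success value is an iterator of numbers rather than a generic T: RpcSend, and errors arrive pre-rendered.

```rust
use std::io::{self, Write};

/// Simplified `DeferredPayload`; `id` is raw JSON text, like `&RawValue`.
pub trait DeferredPayloadSketch: Send {
    fn write_response(self: Box<Self>, id: &str, w: &mut dyn Write) -> io::Result<()>;

    /// Materialized form. For brevity this reuses `write_response`; the
    /// proposal delegates to `Response::build_response` instead.
    fn materialize(self: Box<Self>, id: &str) -> io::Result<String> {
        let mut buf: Vec<u8> = Vec::new();
        self.write_response(id, &mut buf)?;
        String::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }
}

/// Stand-in for `ResponsePayload<T, E>` with a streamed array result.
pub enum PayloadSketch<I> {
    Success(I),      // written lazily as a JSON array of numbers
    Failure(String), // pre-rendered JSON error object
}

impl<I: Iterator<Item = u64> + Send> DeferredPayloadSketch for PayloadSketch<I> {
    fn write_response(self: Box<Self>, id: &str, w: &mut dyn Write) -> io::Result<()> {
        // Envelope prefix: {"jsonrpc":"2.0","id":<id>
        write!(w, r#"{{"jsonrpc":"2.0","id":{}"#, id)?;
        match *self {
            PayloadSketch::Success(items) => {
                w.write_all(br#","result":["#)?;
                for (i, n) in items.enumerate() {
                    if i > 0 {
                        w.write_all(b",")?;
                    }
                    write!(w, "{}", n)?;
                }
                w.write_all(b"]")?;
            }
            PayloadSketch::Failure(err) => write!(w, r#","error":{}"#, err)?,
        }
        // Close the envelope object.
        w.write_all(b"}")
    }
}
```

WS/IPC and batches rely on materialize, while HTTP uses write_response. The Phase 5 equivalence test between the two is what keeps the transports byte-compatible.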

Phase 2: Streaming handler infrastructure

src/macros.rs: impl_streaming_handler_call!

New macro parallel to impl_handler_call!. Identical structure but @finish wraps ResponsePayload<T, E> in Box<dyn DeferredPayload> and returns Option<StreamingOutput> instead of calling Response::build_response().

src/routes/streaming.rs (new) — StreamingHandler trait + type erasure

pub(crate) trait StreamingHandler<T, S>: Clone + Send + Sync + Sized + 'static {
    type Future: Future<Output = Option<StreamingOutput>> + Send + 'static;
    fn call_with_state(self, args: HandlerArgs, state: S) -> Self::Future;
}

8 blanket impls (4 argument patterns × 2 return types):

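| Arguments          | Result | ResponsePayload |
|--------------------|--------|-----------------|
| (P)                | ✓      | ✓               |
| (HandlerCtx, P)    | ✓      | ✓               |
| (P, S)             | ✓      | ✓               |
| (HandlerCtx, P, S) | ✓      | ✓               |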

Type erasure following existing erased.rs pattern: ErasedIntoStreamingRoute<S>, BoxedIntoStreamingRoute<S>, MakeErasedStreamingHandler<H, S>.

StreamingRoute is NOT a tower Service — no middleware benefit for an HTTP-only streaming path. Simple cloneable boxed async fn.
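
One way to read "simple cloneable boxed async fn" is an Arc around a closure that returns a pinned, boxed future. The sketch below assumes that shape, but the proposal does not specify StreamingRoute's real fields. RouteOutput stands in for Option<StreamingOutput>. The block_on/ThreadWaker executor exists only so the example runs without tokio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Stand-in for `Option<StreamingOutput>` in this sketch.
pub type RouteOutput = Option<String>;

type BoxFut = Pin<Box<dyn Future<Output = RouteOutput> + Send>>;

/// Cloneable, type-erased async fn: cloning only bumps an Arc refcount.
/// Unlike a tower `Service`, there is no `poll_ready` and no layering.
#[derive(Clone)]
pub struct StreamingRoute(Arc<dyn Fn(String) -> BoxFut + Send + Sync>);

impl StreamingRoute {
    /// Erase a concrete handler (with any state already captured).
    pub fn new<F, Fut>(f: F) -> Self
    where
        F: Fn(String) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = RouteOutput> + Send + 'static,
    {
        StreamingRoute(Arc::new(move |args: String| Box::pin(f(args)) as BoxFut))
    }

    pub fn call(&self, args: String) -> BoxFut {
        (self.0)(args)
    }
}

/// Wakes the parked thread; only needed for the tiny executor below.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor for the demo (the server uses tokio).
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

State is captured when the route is built, which is roughly the job the proposed BoxedIntoStreamingRoute<S> / with_state() step would perform.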

src/routes/method.rs — Extend Method<S>

pub(crate) enum Method<S> {
    Needs(BoxedIntoRoute<S>),
    Ready(Route),
    StreamingNeeds(BoxedIntoStreamingRoute<S>),
    StreamingReady(StreamingRoute),
}
  • is_streaming(), with_state() update, call_streaming_with_state()
  • Existing call_with_state() for streaming variants materializes the deferred output (ensures WS/IPC work automatically)
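
Here is a synchronous sketch of how the extra Method<S> variants can keep WS/IPC working. call_with_state materializes a streaming variant, while call_streaming_with_state hands the deferred output to the HTTP path. MethodSketch, StreamingOutput, and DeferredBody are simplified, hypothetical stand-ins: state is assumed already applied, and handlers are plain functions rather than futures.

```rust
use std::io::{self, Write};

/// Hypothetical deferred payload: a boxed callback that writes the body.
pub type DeferredBody = Box<dyn FnOnce(&mut dyn Write) -> io::Result<()> + Send>;

/// Simplified `StreamingOutput` (the `id` is folded into the callback here).
pub enum StreamingOutput {
    Deferred(DeferredBody),
    Materialized(String),
}

/// Simplified `Method`: state already applied, handlers are plain sync fns.
pub enum MethodSketch {
    Ready(fn(&str) -> String),
    StreamingReady(fn(&str) -> StreamingOutput),
}

impl MethodSketch {
    pub fn is_streaming(&self) -> bool {
        matches!(self, MethodSketch::StreamingReady(_))
    }

    /// HTTP single-request path: hand back the deferred output untouched.
    pub fn call_streaming_with_state(&self, params: &str) -> Option<StreamingOutput> {
        match self {
            MethodSketch::StreamingReady(f) => Some(f(params)),
            MethodSketch::Ready(_) => None,
        }
    }

    /// WS/IPC and batch path: streaming variants are materialized here, so
    /// those transports keep working without changes.
    pub fn call_with_state(&self, params: &str) -> io::Result<String> {
        match self {
            MethodSketch::Ready(f) => Ok(f(params)),
            MethodSketch::StreamingReady(f) => match f(params) {
                StreamingOutput::Materialized(s) => Ok(s),
                StreamingOutput::Deferred(write_body) => {
                    let mut buf: Vec<u8> = Vec::new();
                    write_body(&mut buf)?;
                    String::from_utf8(buf)
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
                }
            },
        }
    }
}
```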

Phase 3: Router integration

src/router.rs: route_streaming() + streaming dispatch

pub fn route_streaming<H, T>(self, method: impl Into<Cow<'static, str>>, handler: H) -> Self
where
    H: StreamingHandler<T, S>,
    T: Send + 'static,
    S: Clone + Send + Sync + 'static,
  • is_streaming_method() and call_streaming_with_state() on RouterInner
  • Update nest(), merge(), with_state() to handle new Method variants

Phase 4: Axum HTTP streaming

src/axum.rs — Streaming response path

After parsing InboundData:

  1. Peek: check req.single(), iterate to read the method name, and look it up with is_streaming_method(). Because InboundData::iter() only borrows, req remains available for the non-streaming fallback.
  2. If streaming:
    • Re-parse single request, create HandlerArgs, call call_streaming_with_state
    • For StreamingOutput::Deferred { id, payload }:
      • tokio::io::duplex(65536) creates a bounded in-memory pipe with a 64 KiB buffer
      • spawn_blocking: wrap the write half in SyncIoBridge, then call payload.write_response(&id, &mut bridge)
      • ReaderStream wraps read half → Body::from_stream() → chunked HTTP response
    • For StreamingOutput::Materialized(rv): existing Box<str> response path
  3. If not streaming: existing call_batch_with_state path unchanged
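
The DuplexStream + spawn_blocking + SyncIoBridge pipeline can be modeled using only std. A bounded std::sync::mpsc::sync_channel plays the pipe, a writer thread plays spawn_blocking, and a Write adapter plays SyncIoBridge. ChannelWriter, stream_body, and collect_body are hypothetical names. Unlike the proposed 64 KiB duplex buffer, the capacity here counts chunks, not bytes. The sketch also surfaces a serialization failure as an explicit error item rather than relying on the channel simply closing.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Stand-in for `SyncIoBridge` over the write half of a bounded pipe.
/// `send` blocks while the channel is full: that is the backpressure.
struct ChannelWriter(SyncSender<Result<Vec<u8>, String>>);

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0
            .send(Ok(buf.to_vec()))
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "client went away"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Stand-in for `spawn_blocking` + `ReaderStream`: run the serializer on its
/// own thread and return the receiving end as the "body stream".
pub fn stream_body<F>(capacity: usize, write_response: F) -> Receiver<Result<Vec<u8>, String>>
where
    F: FnOnce(&mut dyn Write) -> io::Result<()> + Send + 'static,
{
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        let mut writer = ChannelWriter(tx.clone());
        if let Err(e) = write_response(&mut writer) {
            // Forward the failure as an error item; merely dropping the
            // sender would look like a clean end of body to the reader.
            let _ = tx.send(Err(e.to_string()));
        }
    });
    rx
}

/// Consumer side: the channel closing without an error item means "complete".
pub fn collect_body(rx: Receiver<Result<Vec<u8>, String>>) -> Result<Vec<u8>, String> {
    let mut body: Vec<u8> = Vec::new();
    for item in rx {
        body.extend_from_slice(&item?);
    }
    Ok(body)
}
```

A full channel blocks the writer thread inside send. This is the same backpressure the notes describe for a slow client.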

Phase 5: Tests and documentation

  • Unit tests: StreamArray::write_to, StreamArray::into_raw_value, DeferredPayload::write_response vs materialize equivalence
  • Integration test (axum feature): streaming handler end-to-end over HTTP
  • Rustdoc for StreamArray, route_streaming(), RpcSend::write_to

Notes

  • Batch requests: streaming methods in batch requests materialize (batch responses require a complete JSON array). Only single HTTP requests get streaming.
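  • Error mid-stream: if write_response fails, the response body ends with incomplete JSON. Dropping the DuplexStream write half by itself only signals EOF to the reader. ReaderStream would then end cleanly, and hyper would still send the final zero-length chunk. To make the client see a failed transfer (no terminating chunk), the serialization error needs to be forwarded into the body stream as an Err item, which makes hyper abort the response instead of completing it. In both cases the client receives invalid JSON, the same outcome as a server crash mid-response.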
  • Backpressure: DuplexStream buffer (64KB) backpressures the serializer via SyncIoBridge. If the client reads slowly, the spawn_blocking thread blocks. This is expected and acceptable.
  • OTEL metrics: @sent byte count for streaming responses is deferred/omitted initially. Can be added via a CountingWriter wrapper in a follow-up.
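
The CountingWriter follow-up could be as small as a Write wrapper that tallies accepted bytes. This is a hypothetical sketch; the field names and the bytes_written accessor are assumptions, not part of the proposal. In the streaming path, it would wrap the SyncIoBridge inside the spawn_blocking closure, and the total would be recorded as @sent after write_response returns.

```rust
use std::io::{self, Write};

/// Hypothetical `CountingWriter`: forwards to `inner` and tallies bytes so a
/// `@sent`-style count can be recorded once streaming finishes.
pub struct CountingWriter<W> {
    inner: W,
    count: u64,
}

impl<W: Write> CountingWriter<W> {
    pub fn new(inner: W) -> Self {
        Self { inner, count: 0 }
    }

    pub fn bytes_written(&self) -> u64 {
        self.count
    }

    pub fn into_inner(self) -> W {
        self.inner
    }
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Count only what the inner writer accepted (writes may be short).
        let n = self.inner.write(buf)?;
        self.count += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
```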
