From 0b19a8045f937aa8dc679d1ac1c59facc1c1b223 Mon Sep 17 00:00:00 2001 From: Ankur Shrivastava Date: Mon, 18 May 2026 11:11:51 +0800 Subject: [PATCH] docs: Server-Sent Events how-to + built-in SSE marshaler references MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds howto/server-sent-events.md as a dedicated page for browser-consumable streaming over the gateway — covers wire format, EventSource and curl usage, handler patterns for AI/LLM workloads (context cancellation, TTFT), opt-out via DISABLE_SSE_MARSHALER, custom marshaler registration, and common pitfalls (compression, proxy buffering, EventSource GET-only, gateway message wrapping). Updates cross-cutting pages: - config-reference.md gains a DISABLE_SSE_MARSHALER row in HTTP Gateway. - streaming-rpcs.md gateway section now lists SSE as a default behaviour alongside NDJSON, with a Related link to the new page. - gateway-extensions.md built-ins list calls out the auto-registered text/event-stream marshaler and how to override it. Nav order shifts: server-sent-events at 22, database/cache/messaging bumped one each. Playwright pages.ts updated to match. --- config-reference.md | 1 + howto/cache.md | 2 +- howto/database.md | 2 +- howto/gateway-extensions.md | 1 + howto/messaging.md | 2 +- howto/server-sent-events.md | 156 ++++++++++++++++++++++++++++++++++++ howto/streaming-rpcs.md | 6 +- tests/pages.ts | 7 +- 8 files changed, 169 insertions(+), 8 deletions(-) create mode 100644 howto/server-sent-events.md diff --git a/config-reference.md b/config-reference.md index 7d9fcd5..1f3ef8a 100644 --- a/config-reference.md +++ b/config-reference.md @@ -89,6 +89,7 @@ To allow connections to remain open indefinitely, set both `GRPC_SERVER_MAX_CONN | `DISABLE_DEBUG` | bool | `false` | Disable pprof debug endpoints at `/debug/` | | `USE_JSON_BUILTIN_MARSHALLER` | bool | `false` | Use `encoding/json` instead of the default protojson marshaller for `application/json` | | `JSON_BUILTIN_MARSHALLER_MIME` | string | `application/json` | Content-Type for the JSON builtin marshaller | +| `DISABLE_SSE_MARSHALER` | bool | `false` | Disable the auto-registered `text/event-stream` marshaler. When `false` (default), server-streaming RPCs are consumable as Server-Sent Events by clients that send `Accept: text/event-stream` — see [Server-Sent Events](/howto/server-sent-events/) | | `HTTP_HEADER_PREFIXES` | []string | `""` | HTTP header prefixes to forward as gRPC metadata (comma-separated) | | `TRACE_HEADER_NAME` | string | `x-trace-id` | HTTP header name for trace ID propagation to log/trace contexts | | `DISABLE_HTTP_COMPRESSION` | bool | `false` | Disable gzip/zstd compression for HTTP gateway responses | diff --git a/howto/cache.md b/howto/cache.md index 3e36759..2454c15 100644 --- a/howto/cache.md +++ b/howto/cache.md @@ -2,7 +2,7 @@ layout: default title: "Cache" parent: "How To" -nav_order: 23 +nav_order: 24 description: "How to wire a Redis or Valkey cache into a ColdBrew service: client init in PreStart, drain in Stop, cache-aside, and tracing via NewDatastoreSpan" --- # Cache diff --git a/howto/database.md b/howto/database.md index dc0b203..1d0b2d3 100644 --- a/howto/database.md +++ b/howto/database.md @@ -2,7 +2,7 @@ layout: default title: "Database" parent: "How To" -nav_order: 22 +nav_order: 23 description: "How to wire a database connection pool into a ColdBrew service: pool init in PreStart, drain in Stop, and tracing via NewDatastoreSpan. Library-agnostic with a pgx Postgres example" --- # Database diff --git a/howto/gateway-extensions.md b/howto/gateway-extensions.md index 933c6fb..dfddced 100644 --- a/howto/gateway-extensions.md +++ b/howto/gateway-extensions.md @@ -38,6 +38,7 @@ Registered options are applied **after** ColdBrew's built-ins. Built-ins include - The incoming-header matcher derived from `HTTP_HEADER_PREFIXES` - Marshalers for `application/proto` and `application/protobuf` - The internal `spanRouteMiddleware` (sets the OTEL span name + `http.route` attribute) +- A `text/event-stream` marshaler (`core.SSEMarshaler`) so server-streaming RPCs are browser `EventSource`-consumable out of the box. Set `DISABLE_SSE_MARSHALER=true` to suppress, or register your own marshaler for `text/event-stream` to override the default. See [Server-Sent Events](/howto/server-sent-events/) for the framing details - Optionally the JSON builtin marshaler when `USE_JSON_BUILTIN_MARSHALLER=true` Because grpc-gateway's option model is last-write-wins for some options and additive for others, the practical effect is: diff --git a/howto/messaging.md b/howto/messaging.md index 7474c93..b2aa1eb 100644 --- a/howto/messaging.md +++ b/howto/messaging.md @@ -2,7 +2,7 @@ layout: default title: "Messaging" parent: "How To" -nav_order: 24 +nav_order: 25 description: "How to run Kafka or NATS consumers in a ColdBrew service via workers.Worker, with graceful drain on shutdown and built-in tracing" --- # Messaging diff --git a/howto/server-sent-events.md b/howto/server-sent-events.md new file mode 100644 index 0000000..034e218 --- /dev/null +++ b/howto/server-sent-events.md @@ -0,0 +1,156 @@ +--- +layout: default +title: "Server-Sent Events (SSE)" +parent: "How To" +nav_order: 22 +description: "Server-Sent Events over the ColdBrew HTTP gateway — browser-consumable streaming for AI/LLM tokens, progress feeds, and live updates with built-in EventSource support and per-token cancellation" +--- +# Server-Sent Events (SSE) + +## Table of contents +{: .no_toc .text-delta } + +1. TOC +{:toc} + +--- + +ColdBrew exposes every server-streaming gRPC method as Server-Sent Events for free. A browser `EventSource(...)` can consume any `rpc Foo(Req) returns (stream Resp)` endpoint directly — no per-service wiring, no proto changes, no custom HTTP handler. This is the path of least resistance for AI/LLM token streams, progress feeds, change notifications, and any other server → client push that benefits from staying on plain HTTP. + +The marshaler is registered by default. There is nothing to import in your service code. Clients pick SSE by sending `Accept: text/event-stream`; everything else continues to receive newline-delimited JSON as before. + +## When to use SSE + +| Use SSE for | Use something else for | +|---|---| +| AI/LLM token-by-token streaming | One-shot responses (unary RPC) | +| Server → client push (notifications, live counters, progress) | Bidirectional or high-frequency client → server messaging (WebSocket) | +| Browser clients you don't want to ship a gRPC-web library for | Server-to-server streams (use native gRPC) | +| Anything where a `curl -N` or `EventSource(...)` consumer is good enough | Binary streams (use the proto/protobuf gateway marshaler) | + +If you need true bidi over HTTP, SSE is the wrong primitive — register a WebSocket handler via [HTTP Gateway Extensions](/howto/gateway-extensions). For server-only push, prefer SSE: it reuses your existing gRPC stream + gateway plumbing instead of doubling the surface area. + +## Wire format + +Each streamed gRPC message becomes one SSE frame: + +``` +data: {"result":{"token":"hello","index":0}} + +data: {"result":{"token":"world","index":1}} + +``` + +Two newlines (`\n\n`) terminate a frame, matching the SSE spec. The JSON payload uses protojson (same field naming and well-known-type handling as the gateway's default `application/json` responses). + +{: .note } +grpc-gateway wraps server-streaming responses in `{"result": }` over HTTP — this is the documented gateway convention, not an SSE artifact. Native gRPC clients still see the unwrapped message. If you need full control over the wire bytes (no `"result"` wrapper, custom `event:`/`id:` fields), use `google.api.HttpBody` as the stream's response type and marshal the SSE frame yourself in the handler. + +## Defining a streaming endpoint + +A streaming method is just a `stream` response in your `.proto`. Nothing changes for SSE specifically: + +```protobuf +rpc StreamTokens(StreamTokensRequest) returns (stream Token) { + option (google.api.http) = { + post: "/api/v1/stream/tokens" + body: "*" + }; +} + +message Token { + string text = 1; + int32 index = 2; +} +``` + +Implement using `grpc.ServerStreamingServer[Token]`: + +```go +func (s *svc) StreamTokens(req *proto.StreamTokensRequest, stream grpc.ServerStreamingServer[proto.Token]) error { + ctx := stream.Context() + start := time.Now() + + for i, tok := range produce(ctx, req) { + // Stop generating (and stop paying for) tokens when the client disconnects. + // ctx.Err() goes non-nil when the HTTP connection drops — this is the + // load-bearing safety property for AI/LLM workloads. + if err := ctx.Err(); err != nil { + return errors.Wrap(err, "stream canceled") + } + if err := stream.Send(&proto.Token{Text: tok, Index: int32(i)}); err != nil { + return errors.Wrap(err, "stream send") + } + if i == 0 { + metrics.ObserveTTFT(time.Since(start)) + } + } + return nil +} +``` + +Two things matter for production: + +1. **Check `stream.Context().Err()` before every `Send`.** A browser tab closing cancels the HTTP context, which cancels the gRPC stream context. Pass the same context into your LLM SDK call so cancellation propagates to the upstream provider — otherwise the model keeps generating (and billing) after the user is gone. +2. **Record time-to-first-token (TTFT) as a distinct metric.** Total stream duration mixes upstream latency with generation throughput. Separating TTFT surfaces which one is degrading. Record it once per stream, on the first successful `Send`. + +## Calling a streaming endpoint + +### Browser (`EventSource`) + +```javascript +const events = new EventSource("/api/v1/stream/tokens"); +events.onmessage = (e) => { + const frame = JSON.parse(e.data); + console.log(frame.result.text); // unwrap the gateway envelope +}; +events.onerror = () => events.close(); +``` + +`EventSource` is the standard browser API — auto-reconnects on transient network failures, available in every modern browser, no dependencies. Note: it always sends `GET`. For `POST` streams (most non-trivial endpoints), use `fetch(..., { method: "POST" })` and a streaming response reader, or use a small library like [@microsoft/fetch-event-source](https://github.com/Azure/fetch-event-source) that handles POST + SSE parsing. + +### curl + +```console +$ # Default — newline-delimited JSON: +$ curl -N -X POST -H 'Content-Type: application/json' \ + -d '{"msg":"hello world"}' \ + http://localhost:9091/api/v1/stream/tokens + +$ # SSE — request text/event-stream: +$ curl -N -X POST -H 'Content-Type: application/json' -H 'Accept: text/event-stream' \ + -d '{"msg":"hello world"}' \ + http://localhost:9091/api/v1/stream/tokens +``` + +`-N` (no-buffer) is required — without it, curl will hold the response until the stream completes. + +### Native gRPC + +The same method is reachable as a native gRPC server-streaming call. SSE is purely a gateway concern; gRPC clients see plain proto-encoded `Token` messages with no `{"result": ...}` wrapping. + +## Disabling or replacing the default marshaler + +Two opt-out paths: + +| Goal | How | +|---|---| +| Turn off SSE entirely (force JSON for all streams) | Set `DISABLE_SSE_MARSHALER=true` | +| Keep SSE but customize the framing | Register your own `text/event-stream` marshaler from `PreStart` — see [HTTP Gateway Extensions](/howto/gateway-extensions). Service-registered marshalers win over ColdBrew's defaults on the same MIME | + +A custom marshaler is the right answer when you need richer SSE features (named events via `event:`, IDs for client-side dedup via `id:`, multi-line `data:` fields). Embed `core.SSEMarshaler` and override `Marshal` to add the extra fields. + +## Common pitfalls + +- **Compression buffers SSE.** gzip/zstd over an event stream stalls frame delivery because compressors hold bytes until they have enough to flush. ColdBrew's HTTP compression wrapper automatically excludes `text/event-stream`, so this Just Works — but if you put a reverse proxy in front (Nginx, Cloudflare, an in-house CDN) and it re-applies compression, you'll see stalls. Send `Content-Encoding: identity` or configure the proxy to skip SSE. +- **Reverse proxies also buffer responses.** Nginx in particular holds chunks until ~4KB by default. Set `X-Accel-Buffering: no` on the response, or `proxy_buffering off;` for the upstream block. Cloudflare typically passes SSE through but check your zone settings. +- **`EventSource` can only `GET`.** If your RPC's HTTP annotation is `post`, use `fetch` with a streaming reader, or [microsoft/fetch-event-source](https://github.com/Azure/fetch-event-source). The streaming format on the wire is identical. +- **Mid-stream errors render in the trailers.** gRPC stream errors arrive after the last `Send`, encoded as a final SSE frame (or as HTTP trailers, depending on gateway config). For nicer client behavior — explicit error events the JS side can handle — use `runtime.WithStreamErrorHandler` to control the format. +- **The `{"result": ...}` wrapper is gateway-imposed.** It applies to every streaming RPC over HTTP, not just SSE. Either unwrap on the client (`JSON.parse(e.data).result`) or use `google.api.HttpBody` as the response type for full control. + +## Related + +- [Streaming RPCs](/howto/streaming-rpcs/) — Proto definitions, handler patterns, deadline propagation, and the gateway's behavior for every gRPC method shape. +- [HTTP Gateway Extensions](/howto/gateway-extensions/) — Registering custom marshalers, error handlers, middleware, and additional routes on the gateway. +- [Configuration Reference](/config-reference/) — `DISABLE_SSE_MARSHALER` and related HTTP gateway options. +- [Metrics](/howto/Metrics/) — Where to surface TTFT and per-stream counters alongside ColdBrew's default Prometheus metrics. diff --git a/howto/streaming-rpcs.md b/howto/streaming-rpcs.md index 871b982..9fc6fd3 100644 --- a/howto/streaming-rpcs.md +++ b/howto/streaming-rpcs.md @@ -184,15 +184,16 @@ grpc-gateway translates streaming RPCs to HTTP, but the four shapes don't map cl | gRPC shape | HTTP behaviour through the gateway | |---|---| | Unary | Standard request/response. | -| Server-streaming | Newline-delimited JSON (NDJSON) over a chunked HTTP response. Clients read line-by-line. | +| Server-streaming | Newline-delimited JSON (NDJSON) over a chunked HTTP response by default, or Server-Sent Events when the client sends `Accept: text/event-stream`. | | Client-streaming | Limited — the gateway buffers and forwards as a unary gRPC stream. Don't rely on this for large or long-lived uploads. | | Bidirectional | Limited — no true concurrent interleaving over HTTP/1.1. Avoid for HTTP clients. | Practical things to know: +- **Server-Sent Events work out of the box.** ColdBrew registers a `text/event-stream` marshaler by default so any server-streaming RPC is consumable by a browser `EventSource(...)` without extra code. See [Server-Sent Events](/howto/server-sent-events/) for the framing, opt-out, and AI/LLM patterns; set `DISABLE_SSE_MARSHALER=true` to suppress. - **Reverse proxies buffer streamed responses.** Nginx, Cloudflare, and similar will hold chunks until they have "enough." Set `X-Accel-Buffering: no` on the response (or the upstream config) to disable buffering when you actually need server-streaming over HTTP. - **Stream errors need a handler.** Use `runtime.WithStreamErrorHandler` when registering the gateway to control how mid-stream errors render in the HTTP response. The default trailers-only behaviour is awkward to consume from a JSON client. -- **Native HTTP alternatives.** If your HTTP clients need true bidirectional or high-frequency push, consider a separate WebSocket or Server-Sent Events endpoint registered via [HTTP Gateway Extensions](/howto/gateway-extensions). Keep the gRPC stream as the canonical implementation and have the WebSocket handler delegate to it. +- **Bidirectional or push-style HTTP.** True bidi over HTTP/1.1 isn't viable through the gateway. For server → client push, prefer Server-Sent Events (above) over a separate WebSocket endpoint — it reuses the existing gRPC stream + gateway plumbing. WebSockets are still the right answer when you need client → server messages on the same connection; see [HTTP Gateway Extensions](/howto/gateway-extensions) for the registration recipe. See the [grpc-gateway streaming examples](https://github.com/grpc-ecosystem/grpc-gateway/tree/main/examples/internal/proto/examplepb) for the full HTTP semantics. @@ -200,5 +201,6 @@ See the [grpc-gateway streaming examples](https://github.com/grpc-ecosystem/grpc - [APIs how-to](/howto/APIs) — Defining gRPC and HTTP endpoints from proto. - [Interceptors](/howto/interceptors) — The stream interceptor chain and how to add your own. +- [Server-Sent Events](/howto/server-sent-events/) — Browser-consumable streams over the gateway, ideal for AI/LLM token streaming. - [HTTP Gateway Extensions](/howto/gateway-extensions) — Custom marshalers and routes when the gateway alone isn't enough. - [Tracing](/howto/Tracing) — Trace IDs propagate through stream contexts the same way they do for unary. diff --git a/tests/pages.ts b/tests/pages.ts index d7c33e9..053cf7d 100644 --- a/tests/pages.ts +++ b/tests/pages.ts @@ -22,7 +22,8 @@ export const allHowtoPages = [ "/howto/auth/", // 19 "/howto/gateway-extensions/", // 20 "/howto/streaming-rpcs/", // 21 - "/howto/database/", // 22 - "/howto/cache/", // 23 - "/howto/messaging/", // 24 + "/howto/server-sent-events/", // 22 + "/howto/database/", // 23 + "/howto/cache/", // 24 + "/howto/messaging/", // 25 ];