From d3bb33d69699e5b575280d23b42af4a063d374cd Mon Sep 17 00:00:00 2001 From: grigory Date: Wed, 13 May 2026 23:57:37 +0100 Subject: [PATCH 1/2] Add instrumentation tests for chunked/streaming HTTP response handling New JUnit 5 test class NettyChunkedResponseTest with a real Netty server that writes chunked responses manually (HttpResponse + HttpContent* + LastHttpContent), exercising the code path that HttpObjectAggregator-based tests never reach. Four test cases: - chunkedResponseSpanIncludesFullStreamDuration: span covers full stream time (~1s for 5 chunks x 200ms), not just header-send time (~0ms) - fullResponseStillFinishesSpanImmediately: FullHttpResponse regression - keepAliveSequentialChunkedRequestsGetCorrectSpans: STREAMING_CONTEXT_KEY lifecycle across back-to-back keep-alive requests - connectionDropDuringChunkedResponseFinishesSpan: span finished with error when client disconnects mid-stream --- .../server/NettyChunkedResponseTest.java | 372 ++++++++++++++++++ 1 file changed, 372 insertions(+) create mode 100644 dd-java-agent/instrumentation/netty/netty-4.1/src/test/java/datadog/trace/instrumentation/netty41/server/NettyChunkedResponseTest.java diff --git a/dd-java-agent/instrumentation/netty/netty-4.1/src/test/java/datadog/trace/instrumentation/netty41/server/NettyChunkedResponseTest.java b/dd-java-agent/instrumentation/netty/netty-4.1/src/test/java/datadog/trace/instrumentation/netty41/server/NettyChunkedResponseTest.java new file mode 100644 index 00000000000..2c36510a4ea --- /dev/null +++ b/dd-java-agent/instrumentation/netty/netty-4.1/src/test/java/datadog/trace/instrumentation/netty41/server/NettyChunkedResponseTest.java @@ -0,0 +1,372 @@ +package datadog.trace.instrumentation.netty41.server; + +import static datadog.trace.agent.test.assertions.Matchers.any; +import static datadog.trace.agent.test.assertions.Matchers.is; +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags; +import static datadog.trace.agent.test.assertions.TagsMatcher.tag; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.regex.Pattern; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +/** + * Tests that the Netty HTTP server instrumentation correctly handles chunked (streaming) responses. + * + *

The existing Netty41ServerTest uses HttpObjectAggregator which converts all responses to + * FullHttpResponse — so the chunked path (HttpResponse + HttpContent* + LastHttpContent) is never + * exercised. This test fills that gap. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class NettyChunkedResponseTest extends AbstractInstrumentationTest { + + private static final long CHUNK_DELAY_MS = 200; + private static final int CHUNK_COUNT = 5; + private static final Pattern NETTY_REQUEST = Pattern.compile("netty\\.request"); + private static final Pattern GET_CHUNKED = Pattern.compile("GET /chunked"); + private static final Pattern GET_FULL = Pattern.compile("GET /full"); + + private EventLoopGroup eventLoopGroup; + private int port; + + @BeforeAll + void startServer() throws Exception { + eventLoopGroup = new NioEventLoopGroup(); + ServerBootstrap bootstrap = + new ServerBootstrap() + .group(eventLoopGroup) + .channel(NioServerSocketChannel.class) + .childHandler( + new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new HttpServerCodec()); + ch.pipeline().addLast(new HttpObjectAggregator(65536)); + ch.pipeline().addLast(new ChunkedTestHandler()); + } + }); + Channel channel = bootstrap.bind(0).sync().channel(); + port = ((InetSocketAddress) channel.localAddress()).getPort(); + } + + @AfterAll + void stopServer() { + if (eventLoopGroup != null) { + eventLoopGroup.shutdownGracefully(); + } + } + + /** + * Verifies that the span for a chunked HTTP response covers the full streaming duration, not just + * the time to send headers. Without the fix in HttpServerResponseTracingHandler, the span would + * finish when HttpResponse (headers) is written (~0ms), ignoring the time spent writing + * HttpContent chunks and LastHttpContent. + */ + @Test + void chunkedResponseSpanIncludesFullStreamDuration() throws Exception { + String body = doGet("/chunked"); + assertEquals("chunk0chunk1chunk2chunk3chunk4", body); + + long expectedMinDurationMs = CHUNK_DELAY_MS * CHUNK_COUNT; + + assertTraces( + trace( + span() + .root() + .operationName(NETTY_REQUEST) + .resourceName(GET_CHUNKED) + .durationLongerThan(Duration.ofMillis(expectedMinDurationMs)) + .type("web") + .tags( + defaultTags(), + tag("http.status_code", is(200)), + tag("http.method", any()), + tag("http.url", any()), + tag("http.hostname", any()), + tag("http.useragent", any()), + tag("component", any()), + tag("span.kind", any()), + tag("peer.port", any()), + tag("peer.ipv4", any())))); + } + + /** + * Regression test: a non-chunked FullHttpResponse must still finish the span immediately. This + * ensures the instanceof ordering fix (FullHttpResponse checked before HttpResponse and + * LastHttpContent) does not break the standard single-message response path. + */ + @Test + void fullResponseStillFinishesSpanImmediately() throws Exception { + String body = doGet("/full"); + assertEquals("full-response", body); + + assertTraces( + trace( + span() + .root() + .operationName(NETTY_REQUEST) + .resourceName(GET_FULL) + .durationShorterThan(Duration.ofMillis(500)) + .type("web") + .tags( + defaultTags(), + tag("http.status_code", is(200)), + tag("http.method", any()), + tag("http.url", any()), + tag("http.hostname", any()), + tag("http.useragent", any()), + tag("component", any()), + tag("span.kind", any()), + tag("peer.port", any()), + tag("peer.ipv4", any())))); + } + + /** + * Verifies that two sequential chunked requests each produce a correctly-timed span. This + * exercises the STREAMING_CONTEXT_KEY lifecycle across multiple requests on the same connection: + * each request must set and clear the key independently. Note: HttpURLConnection sends requests + * sequentially (no pipelining), so this does not reproduce the concurrent race condition — it + * validates that the streaming context bookkeeping works correctly for back-to-back requests. + */ + @Test + void keepAliveSequentialChunkedRequestsGetCorrectSpans() throws Exception { + URL url = new URL("http://localhost:" + port + "/chunked"); + + HttpURLConnection conn1 = (HttpURLConnection) url.openConnection(); + conn1.setRequestProperty("Connection", "keep-alive"); + String body1 = readResponse(conn1); + assertEquals("chunk0chunk1chunk2chunk3chunk4", body1); + conn1.disconnect(); + + HttpURLConnection conn2 = (HttpURLConnection) url.openConnection(); + conn2.setRequestProperty("Connection", "keep-alive"); + String body2 = readResponse(conn2); + assertEquals("chunk0chunk1chunk2chunk3chunk4", body2); + conn2.disconnect(); + + long expectedMinDurationMs = CHUNK_DELAY_MS * CHUNK_COUNT; + + assertTraces( + trace( + span() + .root() + .operationName(NETTY_REQUEST) + .resourceName(GET_CHUNKED) + .durationLongerThan(Duration.ofMillis(expectedMinDurationMs)) + .type("web") + .tags( + defaultTags(), + tag("http.status_code", is(200)), + tag("http.method", any()), + tag("http.url", any()), + tag("http.hostname", any()), + tag("http.useragent", any()), + tag("component", any()), + tag("span.kind", any()), + tag("peer.port", any()), + tag("peer.ipv4", any()))), + trace( + span() + .root() + .operationName(NETTY_REQUEST) + .resourceName(GET_CHUNKED) + .durationLongerThan(Duration.ofMillis(expectedMinDurationMs)) + .type("web") + .tags( + defaultTags(), + tag("http.status_code", is(200)), + tag("http.method", any()), + tag("http.url", any()), + tag("http.hostname", any()), + tag("http.useragent", any()), + tag("component", any()), + tag("span.kind", any()), + tag("peer.port", any()), + tag("peer.ipv4", any())))); + } + + /** + * Verifies that a streaming span is properly finished (not leaked) when the client disconnects + * mid-stream. Without the channelInactive fix in HttpServerRequestTracingHandler, the span stored + * in STREAMING_CONTEXT_KEY would never be finished if LastHttpContent is never written because + * the connection dropped. + */ + @Test + void connectionDropDuringChunkedResponseFinishesSpan() throws Exception { + try (Socket socket = new Socket("localhost", port)) { + socket.setSoTimeout(5000); + socket + .getOutputStream() + .write("GET /slow-chunked HTTP/1.1\r\nHost: localhost\r\n\r\n".getBytes()); + socket.getOutputStream().flush(); + // Read until we get at least the first chunk — synchronization point before closing + byte[] buf = new byte[512]; + socket.getInputStream().read(buf); + } + // Socket closed — channelInactive should fire and finish the streaming span + + Thread.sleep(1500); + + // The span must be finished (not leaked) and marked as error since the channel + // closed before the response completed. Duration should be much shorter than + // the full 10s streaming time since we disconnected early. + assertTraces( + trace( + span() + .root() + .operationName(NETTY_REQUEST) + .resourceName(Pattern.compile("GET /slow-chunked")) + .type("web") + .error() + .durationShorterThan(Duration.ofMillis(5000)) + .tags( + defaultTags(), + tag("http.status_code", is(200)), + tag("http.method", any()), + tag("http.url", any()), + tag("http.hostname", any()), + tag("http.useragent", any()), + tag("component", any()), + tag("span.kind", any()), + tag("peer.port", any()), + tag("peer.ipv4", any()), + tag("error.type", any()), + tag("error.message", any()), + tag("error.stack", any())))); + } + + private String doGet(String path) throws Exception { + URL url = new URL("http://localhost:" + port + path); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + try { + return readResponse(conn); + } finally { + conn.disconnect(); + } + } + + private String readResponse(HttpURLConnection conn) throws Exception { + StringBuilder sb = new StringBuilder(); + try (BufferedReader reader = + new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + sb.append(line); + } + } + return sb.toString(); + } + + static class ChunkedTestHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { + String uri = request.uri(); + if ("/chunked".equals(uri)) { + handleChunked(ctx); + } else if ("/slow-chunked".equals(uri)) { + handleSlowChunked(ctx); + } else if ("/full".equals(uri)) { + handleFull(ctx); + } else { + DefaultFullHttpResponse resp = + new DefaultFullHttpResponse( + HTTP_1_1, + HttpResponseStatus.NOT_FOUND, + Unpooled.copiedBuffer("not found", StandardCharsets.UTF_8)); + resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, resp.content().readableBytes()); + ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE); + } + } + + private void handleChunked(ChannelHandlerContext ctx) { + DefaultHttpResponse headers = new DefaultHttpResponse(HTTP_1_1, OK); + headers.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.writeAndFlush(headers); + + ctx.executor() + .execute( + () -> { + try { + for (int i = 0; i < CHUNK_COUNT; i++) { + Thread.sleep(CHUNK_DELAY_MS); + byte[] data = ("chunk" + i).getBytes(StandardCharsets.UTF_8); + ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(data))); + } + ctx.writeAndFlush(new DefaultLastHttpContent()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + ctx.close(); + } + }); + } + + private void handleSlowChunked(ChannelHandlerContext ctx) { + DefaultHttpResponse headers = new DefaultHttpResponse(HTTP_1_1, OK); + headers.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.writeAndFlush(headers); + + // Long streaming — 20 chunks × 500ms = 10s. The test will close the client socket + // after the first chunk, triggering channelInactive before LastHttpContent is sent. + ctx.executor() + .execute( + () -> { + try { + for (int i = 0; i < 20; i++) { + if (!ctx.channel().isActive()) { + return; + } + Thread.sleep(500); + byte[] data = ("slow" + i).getBytes(StandardCharsets.UTF_8); + ctx.writeAndFlush(new DefaultHttpContent(Unpooled.wrappedBuffer(data))); + } + ctx.writeAndFlush(new DefaultLastHttpContent()); + } catch (Exception e) { + // Channel closed — expected + } + }); + } + + private void handleFull(ChannelHandlerContext ctx) { + byte[] body = "full-response".getBytes(StandardCharsets.UTF_8); + DefaultFullHttpResponse resp = + new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(body)); + resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, body.length); + ctx.writeAndFlush(resp); + } + } +} From 77cc414ea5747e842950dd68be65e3e82ced37bd Mon Sep 17 00:00:00 2001 From: grigory Date: Wed, 13 May 2026 23:57:46 +0100 Subject: [PATCH 2/2] Fix Netty HTTP span lifecycle for chunked/streaming responses HttpServerResponseTracingHandler: route by message type (FullHttpResponse, HttpResponse, LastHttpContent) instead of finishing every span on HttpResponse. FullHttpResponse finishes immediately; HttpResponse defers to LastHttpContent via STREAMING_CONTEXT_KEY to avoid keep-alive race. WebSocket upgrades and bodyless responses (204, 205, 304) finish immediately since they never produce LastHttpContent. HttpServerRequestTracingHandler: channelInactive now checks STREAMING_CONTEXT_KEY and finishes leaked spans when channel closes mid-stream. AttributeKeys: added STREAMING_CONTEXT_KEY for chunked response context. --- .../HttpServerRequestTracingHandler.java | 18 +- .../HttpServerResponseTracingHandler.java | 157 +++++++++++++++++- .../netty41/AttributeKeys.java | 10 ++ 3 files changed, 177 insertions(+), 8 deletions(-) diff --git a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerRequestTracingHandler.java b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerRequestTracingHandler.java index 251b157b22f..0a30a612213 100644 --- a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerRequestTracingHandler.java +++ b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerRequestTracingHandler.java @@ -6,6 +6,7 @@ import static datadog.trace.instrumentation.netty41.AttributeKeys.CONTEXT_ATTRIBUTE_KEY; import static datadog.trace.instrumentation.netty41.AttributeKeys.PARENT_CONTEXT_ATTRIBUTE_KEY; import static datadog.trace.instrumentation.netty41.AttributeKeys.REQUEST_HEADERS_ATTRIBUTE_KEY; +import static datadog.trace.instrumentation.netty41.AttributeKeys.STREAMING_CONTEXT_KEY; import static datadog.trace.instrumentation.netty41.server.NettyHttpServerDecorator.DECORATE; import datadog.context.Context; @@ -88,11 +89,26 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { super.channelInactive(ctx); } finally { + // Finish any in-flight streaming span first — during chunked responses the span context + // is stored in STREAMING_CONTEXT_KEY, and CONTEXT_ATTRIBUTE_KEY may already belong to + // the next keep-alive request. + try { + final Context streamingContext = ctx.channel().attr(STREAMING_CONTEXT_KEY).getAndRemove(); + if (streamingContext != null) { + final AgentSpan streamingSpan = spanFromContext(streamingContext); + if (streamingSpan != null) { + DECORATE.onError( + streamingSpan, new Exception("Channel closed before response completed")); + DECORATE.beforeFinish(streamingContext); + streamingSpan.finish(); + } + } + } catch (final Throwable ignored) { + } try { final Context storedContext = ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).getAndRemove(); final AgentSpan span = spanFromContext(storedContext); if (span != null && span.phasedFinish()) { - // at this point we can just publish this span to avoid loosing the rest of the trace span.publish(); } } catch (final Throwable ignored) { diff --git a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerResponseTracingHandler.java b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerResponseTracingHandler.java index 1cf31f91eae..7910f37d545 100644 --- a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerResponseTracingHandler.java +++ b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerResponseTracingHandler.java @@ -2,6 +2,7 @@ import static datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge.spanFromContext; import static datadog.trace.instrumentation.netty41.AttributeKeys.CONTEXT_ATTRIBUTE_KEY; +import static datadog.trace.instrumentation.netty41.AttributeKeys.STREAMING_CONTEXT_KEY; import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_SENDER_HANDLER_CONTEXT; import static datadog.trace.instrumentation.netty41.server.NettyHttpServerDecorator.DECORATE; @@ -13,9 +14,11 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; @ChannelHandler.Sharable public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdapter { @@ -26,36 +29,176 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann final Context storedContext = ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get(); final AgentSpan span = spanFromContext(storedContext); - if (span == null || !(msg instanceof HttpResponse)) { - ctx.write(msg, prm); + // FullHttpResponse must be checked BEFORE LastHttpContent and HttpResponse, + // because FullHttpResponse extends both LastHttpContent and HttpResponse. + if (msg instanceof FullHttpResponse) { + handleFullHttpResponse(ctx, storedContext, span, (FullHttpResponse) msg, prm); return; } - try (final ContextScope scope = storedContext.attach()) { - final HttpResponse response = (HttpResponse) msg; + // Handle HttpResponse (headers only — start of chunked/streaming response). + // Must be checked BEFORE LastHttpContent/HttpContent. + if (msg instanceof HttpResponse) { + handleHttpResponse(ctx, storedContext, span, (HttpResponse) msg, prm); + return; + } + + // Handle LastHttpContent (end of chunked/streaming response). + // Must be checked BEFORE HttpContent (LastHttpContent extends HttpContent). + // IMPORTANT: Use STREAMING_CONTEXT_KEY to avoid keep-alive race condition where + // channelRead for the next request may overwrite CONTEXT_ATTRIBUTE_KEY before + // this LastHttpContent write task runs on the EventLoop. + if (msg instanceof LastHttpContent) { + Context streamingContext = ctx.channel().attr(STREAMING_CONTEXT_KEY).getAndRemove(); + Context contextForLastContent = streamingContext != null ? streamingContext : storedContext; + AgentSpan spanForLastContent = + streamingContext != null ? spanFromContext(streamingContext) : span; + handleLastHttpContent( + ctx, contextForLastContent, spanForLastContent, (LastHttpContent) msg, prm); + return; + } + + // Intermediate HttpContent chunks — pass through without touching the span. + ctx.write(msg, prm); + } + /** Complete response in a single message (non-streaming). Finish span immediately. */ + private void handleFullHttpResponse( + final ChannelHandlerContext ctx, + final Context storedContext, + final AgentSpan span, + final FullHttpResponse response, + final ChannelPromise prm) { + + if (span == null) { + ctx.write(response, prm); + return; + } + + try (final ContextScope scope = storedContext.attach()) { try { - ctx.write(msg, prm); + ctx.write(response, prm); } catch (final Throwable throwable) { DECORATE.onError(span, throwable); span.setHttpStatusCode(500); - span.finish(); // Finish the span manually since finishSpanOnClose was false + span.finish(); ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); throw throwable; } + final boolean isWebsocketUpgrade = response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS && "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE)); + if (isWebsocketUpgrade) { ctx.channel() .attr(WEBSOCKET_SENDER_HANDLER_CONTEXT) .set(new HandlerContext.Sender(span, ctx.channel().id().asShortText())); } + if (response.status() != HttpResponseStatus.CONTINUE && (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) { DECORATE.onResponse(span, response); DECORATE.beforeFinish(scope.context()); - span.finish(); // Finish the span manually since finishSpanOnClose was false + span.finish(); + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + } + } + } + + /** + * Chunked response headers — record status but do NOT finish the span yet. The span will be + * finished when the corresponding LastHttpContent is written. Context is saved to + * STREAMING_CONTEXT_KEY so that a keep-alive channelRead for the next request cannot overwrite it + * before LastHttpContent arrives. + */ + private void handleHttpResponse( + final ChannelHandlerContext ctx, + final Context storedContext, + final AgentSpan span, + final HttpResponse response, + final ChannelPromise prm) { + + if (span == null) { + ctx.write(response, prm); + return; + } + + try (final ContextScope scope = storedContext.attach()) { + try { + ctx.write(response, prm); + } catch (final Throwable throwable) { + DECORATE.onError(span, throwable); + span.setHttpStatusCode(500); + span.finish(); + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + throw throwable; + } + + final boolean isWebsocketUpgrade = + response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS + && "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE)); + + if (isWebsocketUpgrade) { + ctx.channel() + .attr(WEBSOCKET_SENDER_HANDLER_CONTEXT) + .set(new HandlerContext.Sender(span, ctx.channel().id().asShortText())); + } + + if (response.status() != HttpResponseStatus.CONTINUE + && (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) { + DECORATE.onResponse(span, response); + + int statusCode = response.status().code(); + boolean isBodyless = statusCode == 204 || statusCode == 205 || statusCode == 304; + + if (isWebsocketUpgrade || isBodyless) { + // WebSocket upgrades and bodyless responses (204, 205, 304) don't produce + // LastHttpContent — finish span immediately. + DECORATE.beforeFinish(scope.context()); + span.finish(); + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + } else { + ctx.channel().attr(STREAMING_CONTEXT_KEY).set(storedContext); + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + // Span finish is deferred to handleLastHttpContent. + } + } + } + } + + /** End of chunked/streaming response — finish the span now that the full duration is known. */ + private void handleLastHttpContent( + final ChannelHandlerContext ctx, + final Context storedContext, + final AgentSpan span, + final LastHttpContent msg, + final ChannelPromise prm) { + + if (span == null) { + ctx.write(msg, prm); + return; + } + + try (final ContextScope scope = storedContext.attach()) { + try { + ctx.write(msg, prm); + } catch (final Throwable throwable) { + DECORATE.onError(span, throwable); + span.setHttpStatusCode(500); + span.finish(); + if (ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get() == storedContext) { + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + } + throw throwable; + } + + DECORATE.beforeFinish(scope.context()); + span.finish(); + // Only remove CONTEXT_ATTRIBUTE_KEY if it still holds our context. + // Under keep-alive a new request's channelRead may have already replaced it. + // All channel ops run on the same EventLoop thread so this check is race-free. + if (ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get() == storedContext) { ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); } } diff --git a/dd-java-agent/instrumentation/netty/netty-common/src/main/java/datadog/trace/instrumentation/netty41/AttributeKeys.java b/dd-java-agent/instrumentation/netty/netty-common/src/main/java/datadog/trace/instrumentation/netty41/AttributeKeys.java index 8640ae9557c..de5531f4dfb 100644 --- a/dd-java-agent/instrumentation/netty/netty-common/src/main/java/datadog/trace/instrumentation/netty41/AttributeKeys.java +++ b/dd-java-agent/instrumentation/netty/netty-common/src/main/java/datadog/trace/instrumentation/netty41/AttributeKeys.java @@ -20,6 +20,16 @@ public final class AttributeKeys { public static final AttributeKey CONTEXT_ATTRIBUTE_KEY = attributeKey(DD_CONTEXT_ATTRIBUTE); + /** + * Stores the context of the currently-streaming (chunked) response. Set when the HTTP response + * headers are sent, cleared when LastHttpContent is processed. Using a separate key (instead of + * CONTEXT_ATTRIBUTE_KEY) avoids a keep-alive race: Netty can process the next request's + * channelRead before the current response's LastHttpContent write task runs, overwriting + * CONTEXT_ATTRIBUTE_KEY with the new request's span. + */ + public static final AttributeKey STREAMING_CONTEXT_KEY = + attributeKey("datadog.server.streaming.context"); + public static final AttributeKey CLIENT_PARENT_ATTRIBUTE_KEY = attributeKey("datadog.client.parent.span");