diff --git a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerRequestTracingHandler.java b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerRequestTracingHandler.java index 251b157b22f..0a30a612213 100644 --- a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerRequestTracingHandler.java +++ b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerRequestTracingHandler.java @@ -6,6 +6,7 @@ import static datadog.trace.instrumentation.netty41.AttributeKeys.CONTEXT_ATTRIBUTE_KEY; import static datadog.trace.instrumentation.netty41.AttributeKeys.PARENT_CONTEXT_ATTRIBUTE_KEY; import static datadog.trace.instrumentation.netty41.AttributeKeys.REQUEST_HEADERS_ATTRIBUTE_KEY; +import static datadog.trace.instrumentation.netty41.AttributeKeys.STREAMING_CONTEXT_KEY; import static datadog.trace.instrumentation.netty41.server.NettyHttpServerDecorator.DECORATE; import datadog.context.Context; @@ -88,11 +89,26 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { super.channelInactive(ctx); } finally { + // Finish any in-flight streaming span first — during chunked responses the span context + // is stored in STREAMING_CONTEXT_KEY, and CONTEXT_ATTRIBUTE_KEY may already belong to + // the next keep-alive request. 
+ try { + final Context streamingContext = ctx.channel().attr(STREAMING_CONTEXT_KEY).getAndRemove(); + if (streamingContext != null) { + final AgentSpan streamingSpan = spanFromContext(streamingContext); + if (streamingSpan != null) { + DECORATE.onError( + streamingSpan, new Exception("Channel closed before response completed")); + DECORATE.beforeFinish(streamingContext); + streamingSpan.finish(); + } + } + } catch (final Throwable ignored) { + } try { final Context storedContext = ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).getAndRemove(); final AgentSpan span = spanFromContext(storedContext); if (span != null && span.phasedFinish()) { - // at this point we can just publish this span to avoid loosing the rest of the trace span.publish(); } } catch (final Throwable ignored) { diff --git a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerResponseTracingHandler.java b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerResponseTracingHandler.java index 1cf31f91eae..7910f37d545 100644 --- a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerResponseTracingHandler.java +++ b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerResponseTracingHandler.java @@ -2,6 +2,7 @@ import static datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge.spanFromContext; import static datadog.trace.instrumentation.netty41.AttributeKeys.CONTEXT_ATTRIBUTE_KEY; +import static datadog.trace.instrumentation.netty41.AttributeKeys.STREAMING_CONTEXT_KEY; import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_SENDER_HANDLER_CONTEXT; import static datadog.trace.instrumentation.netty41.server.NettyHttpServerDecorator.DECORATE; @@ -13,9 +14,11 @@ import io.netty.channel.ChannelHandlerContext; import 
io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; @ChannelHandler.Sharable public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdapter { @@ -26,36 +29,176 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann final Context storedContext = ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get(); final AgentSpan span = spanFromContext(storedContext); - if (span == null || !(msg instanceof HttpResponse)) { - ctx.write(msg, prm); + // FullHttpResponse must be checked BEFORE LastHttpContent and HttpResponse, + // because FullHttpResponse extends both LastHttpContent and HttpResponse. + if (msg instanceof FullHttpResponse) { + handleFullHttpResponse(ctx, storedContext, span, (FullHttpResponse) msg, prm); return; } - try (final ContextScope scope = storedContext.attach()) { - final HttpResponse response = (HttpResponse) msg; + // Handle HttpResponse (headers only — start of chunked/streaming response). + // Must be checked BEFORE LastHttpContent/HttpContent. + if (msg instanceof HttpResponse) { + handleHttpResponse(ctx, storedContext, span, (HttpResponse) msg, prm); + return; + } + + // Handle LastHttpContent (end of chunked/streaming response). + // Must be checked BEFORE HttpContent (LastHttpContent extends HttpContent). + // IMPORTANT: Use STREAMING_CONTEXT_KEY to avoid keep-alive race condition where + // channelRead for the next request may overwrite CONTEXT_ATTRIBUTE_KEY before + // this LastHttpContent write task runs on the EventLoop. + if (msg instanceof LastHttpContent) { + Context streamingContext = ctx.channel().attr(STREAMING_CONTEXT_KEY).getAndRemove(); + Context contextForLastContent = streamingContext != null ? 
streamingContext : storedContext; + AgentSpan spanForLastContent = + streamingContext != null ? spanFromContext(streamingContext) : span; + handleLastHttpContent( + ctx, contextForLastContent, spanForLastContent, (LastHttpContent) msg, prm); + return; + } + + // Intermediate HttpContent chunks — pass through without touching the span. + ctx.write(msg, prm); + } + /** Complete response in a single message (non-streaming). Finish span immediately. */ + private void handleFullHttpResponse( + final ChannelHandlerContext ctx, + final Context storedContext, + final AgentSpan span, + final FullHttpResponse response, + final ChannelPromise prm) { + + if (span == null) { + ctx.write(response, prm); + return; + } + + try (final ContextScope scope = storedContext.attach()) { try { - ctx.write(msg, prm); + ctx.write(response, prm); } catch (final Throwable throwable) { DECORATE.onError(span, throwable); span.setHttpStatusCode(500); - span.finish(); // Finish the span manually since finishSpanOnClose was false + span.finish(); ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); throw throwable; } + final boolean isWebsocketUpgrade = response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS && "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE)); + if (isWebsocketUpgrade) { ctx.channel() .attr(WEBSOCKET_SENDER_HANDLER_CONTEXT) .set(new HandlerContext.Sender(span, ctx.channel().id().asShortText())); } + if (response.status() != HttpResponseStatus.CONTINUE && (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) { DECORATE.onResponse(span, response); DECORATE.beforeFinish(scope.context()); - span.finish(); // Finish the span manually since finishSpanOnClose was false + span.finish(); + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + } + } + } + + /** + * Chunked response headers — record status but do NOT finish the span yet. The span will be + * finished when the corresponding LastHttpContent is written. 
Context is saved to + * STREAMING_CONTEXT_KEY so that a keep-alive channelRead for the next request cannot overwrite it + * before LastHttpContent arrives. + */ + private void handleHttpResponse( + final ChannelHandlerContext ctx, + final Context storedContext, + final AgentSpan span, + final HttpResponse response, + final ChannelPromise prm) { + + if (span == null) { + ctx.write(response, prm); + return; + } + + try (final ContextScope scope = storedContext.attach()) { + try { + ctx.write(response, prm); + } catch (final Throwable throwable) { + DECORATE.onError(span, throwable); + span.setHttpStatusCode(500); + span.finish(); + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + throw throwable; + } + + final boolean isWebsocketUpgrade = + response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS + && "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE)); + + if (isWebsocketUpgrade) { + ctx.channel() + .attr(WEBSOCKET_SENDER_HANDLER_CONTEXT) + .set(new HandlerContext.Sender(span, ctx.channel().id().asShortText())); + } + + if (response.status() != HttpResponseStatus.CONTINUE + && (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) { + DECORATE.onResponse(span, response); + + int statusCode = response.status().code(); + boolean isBodyless = statusCode == 204 || statusCode == 205 || statusCode == 304; + + if (isWebsocketUpgrade || isBodyless) { + // WebSocket upgrades and bodyless responses (204, 205, 304) don't produce + // LastHttpContent — finish span immediately. + DECORATE.beforeFinish(scope.context()); + span.finish(); + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + } else { + ctx.channel().attr(STREAMING_CONTEXT_KEY).set(storedContext); + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + // Span finish is deferred to handleLastHttpContent. + } + } + } + } + + /** End of chunked/streaming response — finish the span now that the full duration is known. 
*/ + private void handleLastHttpContent( + final ChannelHandlerContext ctx, + final Context storedContext, + final AgentSpan span, + final LastHttpContent msg, + final ChannelPromise prm) { + + if (span == null) { + ctx.write(msg, prm); + return; + } + + try (final ContextScope scope = storedContext.attach()) { + try { + ctx.write(msg, prm); + } catch (final Throwable throwable) { + DECORATE.onError(span, throwable); + span.setHttpStatusCode(500); + span.finish(); + if (ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get() == storedContext) { + ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); + } + throw throwable; + } + + DECORATE.beforeFinish(scope.context()); + span.finish(); + // Only remove CONTEXT_ATTRIBUTE_KEY if it still holds our context. + // Under keep-alive a new request's channelRead may have already replaced it. + // All channel ops run on the same EventLoop thread so this check is race-free. + if (ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get() == storedContext) { ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove(); } } diff --git a/dd-java-agent/instrumentation/netty/netty-4.1/src/test/java/datadog/trace/instrumentation/netty41/server/NettyChunkedResponseTest.java b/dd-java-agent/instrumentation/netty/netty-4.1/src/test/java/datadog/trace/instrumentation/netty41/server/NettyChunkedResponseTest.java new file mode 100644 index 00000000000..2c36510a4ea --- /dev/null +++ b/dd-java-agent/instrumentation/netty/netty-4.1/src/test/java/datadog/trace/instrumentation/netty41/server/NettyChunkedResponseTest.java @@ -0,0 +1,372 @@ +package datadog.trace.instrumentation.netty41.server; + +import static datadog.trace.agent.test.assertions.Matchers.any; +import static datadog.trace.agent.test.assertions.Matchers.is; +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags; +import static datadog.trace.agent.test.assertions.TagsMatcher.tag; +import static 
datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.regex.Pattern; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +/** + * Tests that the Netty HTTP server instrumentation correctly handles chunked (streaming) responses. + * + *
+ * <p>The existing Netty41ServerTest uses HttpObjectAggregator which converts all responses to
+ * FullHttpResponse — so the chunked path (HttpResponse + HttpContent* + LastHttpContent) is never
+ * exercised. This test fills that gap.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class NettyChunkedResponseTest extends AbstractInstrumentationTest {
+
+ private static final long CHUNK_DELAY_MS = 200;
+ private static final int CHUNK_COUNT = 5;
+ private static final Pattern NETTY_REQUEST = Pattern.compile("netty\\.request");
+ private static final Pattern GET_CHUNKED = Pattern.compile("GET /chunked");
+ private static final Pattern GET_FULL = Pattern.compile("GET /full");
+
+ private EventLoopGroup eventLoopGroup;
+ private int port;
+
+ @BeforeAll
+ void startServer() throws Exception {
+ eventLoopGroup = new NioEventLoopGroup();
+ ServerBootstrap bootstrap =
+ new ServerBootstrap()
+ .group(eventLoopGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(
+ new ChannelInitializer