Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static datadog.trace.instrumentation.netty41.AttributeKeys.CONTEXT_ATTRIBUTE_KEY;
import static datadog.trace.instrumentation.netty41.AttributeKeys.PARENT_CONTEXT_ATTRIBUTE_KEY;
import static datadog.trace.instrumentation.netty41.AttributeKeys.REQUEST_HEADERS_ATTRIBUTE_KEY;
import static datadog.trace.instrumentation.netty41.AttributeKeys.STREAMING_CONTEXT_KEY;
import static datadog.trace.instrumentation.netty41.server.NettyHttpServerDecorator.DECORATE;

import datadog.context.Context;
Expand Down Expand Up @@ -88,11 +89,26 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
super.channelInactive(ctx);
} finally {
// Finish any in-flight streaming span first — during chunked responses the span context
// is stored in STREAMING_CONTEXT_KEY, and CONTEXT_ATTRIBUTE_KEY may already belong to
// the next keep-alive request.
try {
final Context streamingContext = ctx.channel().attr(STREAMING_CONTEXT_KEY).getAndRemove();
if (streamingContext != null) {
final AgentSpan streamingSpan = spanFromContext(streamingContext);
if (streamingSpan != null) {
DECORATE.onError(
streamingSpan, new Exception("Channel closed before response completed"));
DECORATE.beforeFinish(streamingContext);
streamingSpan.finish();
}
}
} catch (final Throwable ignored) {
}
try {
final Context storedContext = ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).getAndRemove();
final AgentSpan span = spanFromContext(storedContext);
if (span != null && span.phasedFinish()) {
// at this point we can just publish this span to avoid loosing the rest of the trace
span.publish();
}
} catch (final Throwable ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge.spanFromContext;
import static datadog.trace.instrumentation.netty41.AttributeKeys.CONTEXT_ATTRIBUTE_KEY;
import static datadog.trace.instrumentation.netty41.AttributeKeys.STREAMING_CONTEXT_KEY;
import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_SENDER_HANDLER_CONTEXT;
import static datadog.trace.instrumentation.netty41.server.NettyHttpServerDecorator.DECORATE;

Expand All @@ -13,9 +14,11 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;

@ChannelHandler.Sharable
public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {
Expand All @@ -26,36 +29,176 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
final Context storedContext = ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get();
final AgentSpan span = spanFromContext(storedContext);

if (span == null || !(msg instanceof HttpResponse)) {
ctx.write(msg, prm);
// FullHttpResponse must be checked BEFORE LastHttpContent and HttpResponse,
// because FullHttpResponse extends both LastHttpContent and HttpResponse.
if (msg instanceof FullHttpResponse) {
handleFullHttpResponse(ctx, storedContext, span, (FullHttpResponse) msg, prm);
return;
}

try (final ContextScope scope = storedContext.attach()) {
final HttpResponse response = (HttpResponse) msg;
// Handle HttpResponse (headers only — start of chunked/streaming response).
// Must be checked BEFORE LastHttpContent/HttpContent.
if (msg instanceof HttpResponse) {
handleHttpResponse(ctx, storedContext, span, (HttpResponse) msg, prm);
return;
}

// Handle LastHttpContent (end of chunked/streaming response).
// Must be checked BEFORE HttpContent (LastHttpContent extends HttpContent).
// IMPORTANT: Use STREAMING_CONTEXT_KEY to avoid keep-alive race condition where
// channelRead for the next request may overwrite CONTEXT_ATTRIBUTE_KEY before
// this LastHttpContent write task runs on the EventLoop.
if (msg instanceof LastHttpContent) {
Context streamingContext = ctx.channel().attr(STREAMING_CONTEXT_KEY).getAndRemove();
Context contextForLastContent = streamingContext != null ? streamingContext : storedContext;
AgentSpan spanForLastContent =
streamingContext != null ? spanFromContext(streamingContext) : span;
handleLastHttpContent(
ctx, contextForLastContent, spanForLastContent, (LastHttpContent) msg, prm);
return;
}

// Intermediate HttpContent chunks — pass through without touching the span.
ctx.write(msg, prm);
}

/** Complete response in a single message (non-streaming). Finish span immediately. */
private void handleFullHttpResponse(
final ChannelHandlerContext ctx,
final Context storedContext,
final AgentSpan span,
final FullHttpResponse response,
final ChannelPromise prm) {

if (span == null) {
ctx.write(response, prm);
return;
}

try (final ContextScope scope = storedContext.attach()) {
try {
ctx.write(msg, prm);
ctx.write(response, prm);
} catch (final Throwable throwable) {
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
span.finish();
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
throw throwable;
}

final boolean isWebsocketUpgrade =
response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS
&& "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE));

if (isWebsocketUpgrade) {
ctx.channel()
.attr(WEBSOCKET_SENDER_HANDLER_CONTEXT)
.set(new HandlerContext.Sender(span, ctx.channel().id().asShortText()));
}

if (response.status() != HttpResponseStatus.CONTINUE
&& (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) {
DECORATE.onResponse(span, response);
DECORATE.beforeFinish(scope.context());
span.finish(); // Finish the span manually since finishSpanOnClose was false
span.finish();
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
}
}
}

/**
* Chunked response headers — record status but do NOT finish the span yet. The span will be
* finished when the corresponding LastHttpContent is written. Context is saved to
* STREAMING_CONTEXT_KEY so that a keep-alive channelRead for the next request cannot overwrite it
* before LastHttpContent arrives.
*/
private void handleHttpResponse(
final ChannelHandlerContext ctx,
final Context storedContext,
final AgentSpan span,
final HttpResponse response,
final ChannelPromise prm) {

if (span == null) {
ctx.write(response, prm);
return;
}

try (final ContextScope scope = storedContext.attach()) {
try {
ctx.write(response, prm);
} catch (final Throwable throwable) {
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish();
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
throw throwable;
}

final boolean isWebsocketUpgrade =
response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS
&& "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE));

if (isWebsocketUpgrade) {
ctx.channel()
.attr(WEBSOCKET_SENDER_HANDLER_CONTEXT)
.set(new HandlerContext.Sender(span, ctx.channel().id().asShortText()));
}

if (response.status() != HttpResponseStatus.CONTINUE
&& (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) {
DECORATE.onResponse(span, response);

int statusCode = response.status().code();
boolean isBodyless = statusCode == 204 || statusCode == 205 || statusCode == 304;

if (isWebsocketUpgrade || isBodyless) {
// WebSocket upgrades and bodyless responses (204, 205, 304) don't produce
// LastHttpContent — finish span immediately.
DECORATE.beforeFinish(scope.context());
span.finish();
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
} else {
ctx.channel().attr(STREAMING_CONTEXT_KEY).set(storedContext);
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
// Span finish is deferred to handleLastHttpContent.
}
}
}
}

/** End of chunked/streaming response — finish the span now that the full duration is known. */
private void handleLastHttpContent(
final ChannelHandlerContext ctx,
final Context storedContext,
final AgentSpan span,
final LastHttpContent msg,
final ChannelPromise prm) {

if (span == null) {
ctx.write(msg, prm);
return;
}

try (final ContextScope scope = storedContext.attach()) {
try {
ctx.write(msg, prm);
} catch (final Throwable throwable) {
DECORATE.onError(span, throwable);
span.setHttpStatusCode(500);
span.finish();
if (ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get() == storedContext) {
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
}
throw throwable;
}

DECORATE.beforeFinish(scope.context());
span.finish();
// Only remove CONTEXT_ATTRIBUTE_KEY if it still holds our context.
// Under keep-alive a new request's channelRead may have already replaced it.
// All channel ops run on the same EventLoop thread so this check is race-free.
if (ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get() == storedContext) {
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
}
}
Expand Down
Loading
Loading