diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md index 99f7af3dd3e95b..00a44ec44cc4b8 100644 --- a/doc/api/diagnostics_channel.md +++ b/doc/api/diagnostics_channel.md @@ -266,6 +266,91 @@ const channelsByCollection = diagnostics_channel.tracingChannel({ }); ``` +#### `diagnostics_channel.syncTracingChannel(nameOrChannels)` + + + +> Stability: 1 - Experimental + +* `nameOrChannels` {string|SyncTracingChannel|Object} Channel name or + object containing the `start`, `end`, and `error` channels +* Returns: {SyncTracingChannel} + +Creates a [`SyncTracingChannel`][] wrapper. Named construction creates channels +using `tracing:${name}:start`, `tracing:${name}:end`, and +`tracing:${name}:error`. + +#### `diagnostics_channel.callbackTracingChannel(nameOrChannels)` + + + +> Stability: 1 - Experimental + +* `nameOrChannels` {string|CallbackTracingChannel|Object} Channel name or + object containing the `asyncStart`, `asyncEnd`, and `error` channels +* Returns: {CallbackTracingChannel} + +Creates a [`CallbackTracingChannel`][] wrapper. Named construction creates +channels using `tracing:${name}:asyncStart`, `tracing:${name}:asyncEnd`, and +`tracing:${name}:error`. + +#### `diagnostics_channel.promiseTracingChannel(nameOrChannels)` + + + +> Stability: 1 - Experimental + +* `nameOrChannels` {string|PromiseTracingChannel|Object} Channel name or object + containing the `asyncStart`, `asyncEnd`, and `error` channels +* Returns: {PromiseTracingChannel} + +Creates a [`PromiseTracingChannel`][] wrapper. Named construction creates +channels using `tracing:${name}:asyncStart`, `tracing:${name}:asyncEnd`, and +`tracing:${name}:error`. + +#### `diagnostics_channel.syncIteratorTracingChannel(nameOrChannels)` + + + +> Stability: 1 - Experimental + +* `nameOrChannels` {string|SyncIteratorTracingChannel|Object} Channel name or + object containing the `yield`, `return`, and `error` channels +* Returns: {SyncIteratorTracingChannel} + +Creates a [`SyncIteratorTracingChannel`][] wrapper. Named construction creates +channels using `tracing:${name}:syncYield`, `tracing:${name}:return`, and +`tracing:${name}:error`. + +#### `diagnostics_channel.asyncIteratorTracingChannel(nameOrChannels)` + + + +> Stability: 1 - Experimental + +* `nameOrChannels` {string|AsyncIteratorTracingChannel|Object} Channel name or + object containing the `yield`, `return`, and `error` channels +* Returns: {AsyncIteratorTracingChannel} + +Creates an [`AsyncIteratorTracingChannel`][] wrapper. Named construction creates +channels using `tracing:${name}:asyncYield`, `tracing:${name}:return`, and +`tracing:${name}:error`. + ```cjs const diagnostics_channel = require('node:diagnostics_channel'); @@ -754,7 +839,34 @@ simplify the process of producing events for tracing application flow. [`diagnostics_channel.tracingChannel()`][] is used to construct a `TracingChannel`. As with `Channel` it is recommended to create and reuse a single `TracingChannel` at the top-level of the file rather than creating them -dynamically. +dynamically. `TracingChannel` remains the aggregate compatibility wrapper for +the original tracing channels. + +#### `tracingChannel.wrap(fn[, options])` + + + +* `fn` {Function} Function to wrap +* `options` {Object} + * `context` {Object|Function} Context object or per-call context builder + * `wrapArgs` {Function} Rewrite arguments before the call + * `mapResult` {Function} Transform the immediate return value +* Returns: {Function} Wrapped function + +Wrap a function once and reuse the wrapped callable across multiple invocations. +This is the recommended API for composing tracing behavior. The `context` +option may be either a plain object or a builder function. If `context` is an +object, that same object is published for each operation. If `context` is a +function, it is called once per invocation with the wrapped receiver as `this` +and the original arguments. `wrapArgs` and `mapResult` run inside the +synchronous trace, so errors thrown from either are treated as call errors. + +This method is intended to compose the specialized tracers. For example, +`wrapArgs` can use `callbackTracingChannel.wrapArgs()` and `mapResult` can use +`promiseTracingChannel.wrap()` or one of the iterator wrappers. #### `tracingChannel.subscribe(subscribers)` @@ -1419,6 +1531,91 @@ is automatically published when disposal occurs at the end of the block. All events share the same context object, which can be extended with additional properties like `result` during scope execution. +### Class: `SyncTracingChannel` + +> Stability: 1 - Experimental + +`SyncTracingChannel` publishes `start`, `end`, and `error` events around the +synchronous call boundary of a wrapped function. + +#### `syncTracingChannel.wrap(fn[, options])` + +* `fn` {Function} Function to wrap +* `options` {Object} + * `context` {Object|Function} Context object or per-call context builder + * `wrapArgs` {Function} Rewrite arguments before the call + * `mapResult` {Function} Transform the immediate return value +* Returns: {Function} Wrapped function + +### Class: `CallbackTracingChannel` + +> Stability: 1 - Experimental + +`CallbackTracingChannel` wraps callback functions and callback arguments using +`asyncStart`, `asyncEnd`, and `error`. + +#### `callbackTracingChannel.wrap(callback[, options])` + +* `callback` {Function} Callback to wrap +* `options` {Object} + * `context` {Object|Function} Context object or callback-time context builder + * `mapOutcome` {Function} Map callback arguments to `{ error, result }` +* Returns: {Function} Wrapped callback + +#### `callbackTracingChannel.wrapArgs(args, index[, options])` + +* `args` {Array} Arguments array +* `index` {number} Index of the callback argument +* `options` {Object} + * `context` {Object|Function} Context object or callback-time context builder + * `mapOutcome` {Function} Map callback arguments to `{ error, result }` +* Returns: {Array} A shallow copy of `args` with the callback replaced + +### Class: `PromiseTracingChannel` + +> Stability: 1 - Experimental + +`PromiseTracingChannel` instruments a thenable or promise that has already been +produced and publishes `asyncStart`, `asyncEnd`, and `error` on settlement. + +#### `promiseTracingChannel.wrap(value[, options])` + +* `value` {any} Promise or thenable to wrap +* `options` {Object} + * `context` {Object|Function} Context object or context builder + * `mapResult` {Function} Transform the fulfilled value +* Returns: {any} The original value, or the result of calling `.then(...)` + +### Class: `SyncIteratorTracingChannel` + +> Stability: 1 - Experimental + +`SyncIteratorTracingChannel` instruments a synchronous iterator and publishes +`yield`, `return`, and `error` events for `next()`, `return()`, and `throw()`. + +#### `syncIteratorTracingChannel.wrap(iterator[, options])` + +* `iterator` {Object} Iterator to wrap +* `options` {Object} + * `context` {Object|Function} Context object or context builder + * `mapResult` {Function} Transform the produced `IteratorResult` +* Returns: {Object} Wrapped iterator + +### Class: `AsyncIteratorTracingChannel` + +> Stability: 1 - Experimental + +`AsyncIteratorTracingChannel` instruments an asynchronous iterator and publishes +`yield`, `return`, and `error` events for `next()`, `return()`, and `throw()`. + +#### `asyncIteratorTracingChannel.wrap(iterator[, options])` + +* `iterator` {Object} Async iterator to wrap +* `options` {Object} + * `context` {Object|Function} Context object or context builder + * `mapResult` {Function} Transform the produced `IteratorResult` +* Returns: {Object} Wrapped iterator + ### TracingChannel Channels A TracingChannel is a collection of several diagnostics\_channels representing @@ -1914,8 +2111,13 @@ Emitted when a new thread is created. [BoundedChannel Channels]: #boundedchannel-channels [TracingChannel Channels]: #tracingchannel-channels [`'uncaughtException'`]: process.md#event-uncaughtexception +[`AsyncIteratorTracingChannel`]: #class-asynciteratortracingchannel [`BoundedChannel`]: #class-boundedchannel [`TracingChannel`]: #class-tracingchannel +[`CallbackTracingChannel`]: #class-callbacktracingchannel +[`PromiseTracingChannel`]: #class-promisetracingchannel +[`SyncIteratorTracingChannel`]: #class-synciteratortracingchannel +[`SyncTracingChannel`]: #class-synctracingchannel [`asyncEnd` event]: #asyncendevent [`asyncStart` event]: #asyncstartevent [`boundedChannel.withScope(context)`]: #boundedchannelwithscopecontext diff --git a/doc/type-map.json b/doc/type-map.json index 4741264f60e46f..b3e416b402bdb7 100644 --- a/doc/type-map.json +++ b/doc/type-map.json @@ -6,6 +6,7 @@ "AsyncHook": "async_hooks.html#async_hookscreatehookoptions", "AsyncLocalStorage": "async_context.html#class-asynclocalstorage", "AsyncResource": "async_hooks.html#class-asyncresource", + "AsyncIteratorTracingChannel": "diagnostics_channel.html#class-asynciteratortracingchannel", "AesCbcParams": "webcrypto.html#class-aescbcparams", "AesCtrParams": "webcrypto.html#class-aesctrparams", "AesGcmParams": "webcrypto.html#class-aesgcmparams", @@ -16,6 +17,7 @@ "BroadcastChannel": "worker_threads.html#class-broadcastchannel-extends-eventtarget", "Buffer": "buffer.html#class-buffer", "ByteLengthQueuingStrategy": "webstreams.html#class-bytelengthqueuingstrategy", + "CallbackTracingChannel": "diagnostics_channel.html#class-callbacktracingchannel", "Channel": "diagnostics_channel.html#class-channel", "ChildProcess": "child_process.html#class-childprocess", "Cipher": "crypto.html#class-cipher", @@ -72,6 +74,7 @@ "PerformanceNodeTiming": "perf_hooks.html#class-performancenodetiming", "PerformanceObserver": "perf_hooks.html#class-performanceobserver", "PerformanceObserverEntryList": "perf_hooks.html#class-performanceobserverentrylist", + "PromiseTracingChannel": "diagnostics_channel.html#class-promisetracingchannel", "Readable": "stream.html#class-streamreadable", "ReadableByteStreamController": "webstreams.html#class-readablebytestreamcontroller", "ReadableStream": "webstreams.html#class-readablestream", @@ -92,6 +95,8 @@ "Sign": "crypto.html#class-sign", "Disposable": "https://tc39.es/proposal-explicit-resource-management/#sec-disposable-interface", "Session": "sqlite.html#class-session", + "SyncIteratorTracingChannel": "diagnostics_channel.html#class-synciteratortracingchannel", + "SyncTracingChannel": "diagnostics_channel.html#class-synctracingchannel", "StatementSync": "sqlite.html#class-statementsync", "Stream": "stream.html#stream", "SubtleCrypto": "webcrypto.html#class-subtlecrypto", diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js index 8d2d374dc8e6ae..0d1dc430564a4b 100644 --- a/lib/diagnostics_channel.js +++ b/lib/diagnostics_channel.js @@ -7,6 +7,7 @@ const { ArrayPrototypePushApply, ArrayPrototypeSlice, ArrayPrototypeSplice, + FunctionPrototypeCall, ObjectDefineProperty, ObjectGetPrototypeOf, ObjectSetPrototypeOf, @@ -15,8 +16,10 @@ const { ReflectApply, SafeFinalizationRegistry, SafeMap, + SymbolAsyncIterator, SymbolDispose, SymbolHasInstance, + SymbolIterator, } = primordials; const { @@ -290,12 +293,34 @@ const boundedEvents = [ 'end', ]; +const traceEvents = [ + 'start', + 'end', + 'asyncStart', + 'asyncEnd', + 'error', +]; +const asyncTraceEvents = [ + 'asyncStart', + 'asyncEnd', + 'error', +]; +const iteratorTraceEvents = [ + 'yield', + 'return', + 'error', +]; + function assertChannel(value, name) { if (!(value instanceof Channel)) { throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value); } } +function isObjectLike(value) { + return typeof value === 'object' && value !== null; +} + function emitNonThenableWarning(fn) { process.emitWarning(`tracePromise was called with the function '${fn.name || ''}', ` + 'which returned a non-thenable.'); @@ -317,6 +342,16 @@ function channelFromMap(nameOrChannels, name, className) { nameOrChannels); } +function defineTracingChannels(target, nameOrChannels, eventNames) { + for (let i = 0; i < eventNames.length; ++i) { + const eventName = eventNames[i]; + ObjectDefineProperty(target, eventName, { + __proto__: null, + value: channelFromMap(nameOrChannels, eventName, 'TracingChannel'), + }); + } +} + class BoundedChannelScope { #context; #end; @@ -406,112 +441,572 @@ function boundedChannel(nameOrChannels) { return new BoundedChannel(nameOrChannels); } -class TracingChannel { - #callWindow; - #continuationWindow; +function normalizeContext(context, thisArg, args) { + if (context === undefined) { + return { __proto__: null }; + } + + if (typeof context === 'function') { + context = ReflectApply(context, thisArg, args); + if (context === undefined) { + return { __proto__: null }; + } + } + + if (isObjectLike(context)) { + return context; + } + + throw new ERR_INVALID_ARG_TYPE('options.context', ['Function', 'Object'], context); +} + +function getContextSection(context, name) { + if (!isObjectLike(context[name])) { + context[name] = { __proto__: null }; + } + + return context[name]; +} + +function normalizeCallbackOutcome(args, mapOutcome, thisArg) { + if (mapOutcome) { + return ReflectApply(mapOutcome, thisArg, args); + } + + const error = args[0]; + if (error != null) { + return { __proto__: null, error }; + } + + return { __proto__: null, result: args[1] }; +} + +function applyCallbackOutcome(callbackContext, outcome) { + if (!isObjectLike(outcome)) { + return; + } + + if ('error' in outcome) { + callbackContext.error = outcome.error; + } + + if ('result' in outcome) { + callbackContext.result = outcome.result; + } +} + +function tracingChannelHasSubscribers(target, eventNames) { + for (let i = 0; i < eventNames.length; ++i) { + if (target[eventNames[i]]?.hasSubscribers) { + return true; + } + } + + return false; +} + +function tracingChannelSubscribe(target, handlers, eventNames) { + for (let i = 0; i < eventNames.length; ++i) { + const name = eventNames[i]; + if (!handlers[name]) continue; + + target[name]?.subscribe(handlers[name]); + } +} + +function tracingChannelUnsubscribe(target, handlers, eventNames) { + let done = true; + + for (let i = 0; i < eventNames.length; ++i) { + const name = eventNames[i]; + if (!handlers[name]) continue; + if (!target[name]?.unsubscribe(handlers[name])) { + done = false; + } + } + + return done; +} + +class SyncTracingChannel extends BoundedChannel { constructor(nameOrChannels) { - // Create a BoundedChannel for start/end (call window) if (typeof nameOrChannels === 'string') { - this.#callWindow = new BoundedChannel(nameOrChannels); - this.#continuationWindow = new BoundedChannel({ + super(nameOrChannels); + } else { + super({ start: nameOrChannels.start, end: nameOrChannels.end }); + } + ObjectDefineProperty(this, 'error', { + __proto__: null, + value: channelFromMap(nameOrChannels, 'error', 'SyncTracingChannel'), + }); + } + + get hasSubscribers() { + return super.hasSubscribers || this.error?.hasSubscribers; + } + + subscribe(handlers) { + super.subscribe(handlers); + if (handlers.error) this.error?.subscribe(handlers.error); + } + + unsubscribe(handlers) { + let done = super.unsubscribe(handlers); + if (handlers.error && !this.error?.unsubscribe(handlers.error)) done = false; + return done; + } + + wrap(fn, options = { __proto__: null }) { + validateFunction(fn, 'fn'); + const { + context, + wrapArgs, + mapResult, + captureError, + captureResult, + } = options; + if (wrapArgs !== undefined) validateFunction(wrapArgs, 'options.wrapArgs'); + if (mapResult !== undefined) validateFunction(mapResult, 'options.mapResult'); + if (captureError !== undefined) validateFunction(captureError, 'options.captureError'); + if (captureResult !== undefined) validateFunction(captureResult, 'options.captureResult'); + const tracingChannel = this; + + return function wrapped(...args) { + const traceContext = normalizeContext(context, this, args); + const callContext = getContextSection(traceContext, 'call'); + + const invoke = () => { + const wrappedArgs = wrapArgs ? FunctionPrototypeCall(wrapArgs, this, args, traceContext) : args; + const result = ReflectApply(fn, this, wrappedArgs); + const finalResult = mapResult ? FunctionPrototypeCall(mapResult, this, result, traceContext) : result; + if (captureResult !== undefined) { + FunctionPrototypeCall(captureResult, this, finalResult, traceContext); + } + callContext.result = finalResult; + return finalResult; + }; + + if (!tracingChannel.hasSubscribers) { + return invoke(); + } + + // eslint-disable-next-line no-unused-vars + using scope = tracingChannel.withScope(traceContext); + try { + return invoke(); + } catch (err) { + if (captureError !== undefined) { + FunctionPrototypeCall(captureError, this, err, traceContext); + } + callContext.error = err; + tracingChannel.error.publish(traceContext); + throw err; + } + }; + } +} + +class CallbackTracingChannel extends BoundedChannel { + constructor(nameOrChannels) { + if (typeof nameOrChannels === 'string') { + super({ start: channel(`tracing:${nameOrChannels}:asyncStart`), end: channel(`tracing:${nameOrChannels}:asyncEnd`), }); - } else if (typeof nameOrChannels === 'object') { - this.#callWindow = new BoundedChannel({ - start: nameOrChannels.start, - end: nameOrChannels.end, - }); - this.#continuationWindow = new BoundedChannel({ + } else if (typeof nameOrChannels === 'object' && nameOrChannels !== null) { + super({ start: nameOrChannels.asyncStart, end: nameOrChannels.asyncEnd, }); + } else { + throw new ERR_INVALID_ARG_TYPE('nameOrChannels', + ['string', 'object', 'CallbackTracingChannel'], + nameOrChannels); } - - // Create individual channel for error + ObjectDefineProperty(this, 'asyncStart', { + __proto__: null, + get() { return this.start; }, + }); + ObjectDefineProperty(this, 'asyncEnd', { + __proto__: null, + get() { return this.end; }, + }); ObjectDefineProperty(this, 'error', { __proto__: null, - value: channelFromMap(nameOrChannels, 'error', 'TracingChannel'), + value: channelFromMap(nameOrChannels, 'error', 'CallbackTracingChannel'), }); } - get start() { - return this.#callWindow.start; + get hasSubscribers() { + return super.hasSubscribers || this.error?.hasSubscribers; } - get end() { - return this.#callWindow.end; + subscribe(handlers) { + super.subscribe({ start: handlers.asyncStart, end: handlers.asyncEnd }); + if (handlers.error) this.error?.subscribe(handlers.error); } - get asyncStart() { - return this.#continuationWindow.start; + unsubscribe(handlers) { + let done = super.unsubscribe({ start: handlers.asyncStart, end: handlers.asyncEnd }); + if (handlers.error && !this.error?.unsubscribe(handlers.error)) done = false; + return done; } - get asyncEnd() { - return this.#continuationWindow.end; + wrap(callback, options = { __proto__: null }) { + validateFunction(callback, 'callback'); + const { + context, + mapOutcome, + captureError, + captureResult, + } = options; + if (mapOutcome !== undefined) validateFunction(mapOutcome, 'options.mapOutcome'); + if (captureError !== undefined) validateFunction(captureError, 'options.captureError'); + if (captureResult !== undefined) validateFunction(captureResult, 'options.captureResult'); + const tracingChannel = this; + + return function wrapped(...args) { + const traceContext = normalizeContext(context, this, args); + const callbackContext = getContextSection(traceContext, 'callback'); + + try { + applyCallbackOutcome(callbackContext, normalizeCallbackOutcome(args, mapOutcome, this)); + if ('error' in callbackContext && captureError !== undefined) { + FunctionPrototypeCall(captureError, this, callbackContext.error, traceContext); + } + if ('result' in callbackContext && captureResult !== undefined) { + FunctionPrototypeCall(captureResult, this, callbackContext.result, traceContext); + } + } catch (err) { + callbackContext.error = err; + if (tracingChannel.hasSubscribers) { + tracingChannel.error.publish(traceContext); + } + throw err; + } + + if (tracingChannel.hasSubscribers && callbackContext.error != null) { + tracingChannel.error.publish(traceContext); + } + + if (!tracingChannel.hasSubscribers) { + return ReflectApply(callback, this, args); + } + + // Using withScope here enables manual context failure recovery + // eslint-disable-next-line no-unused-vars + using scope = tracingChannel.withScope(traceContext); + return ReflectApply(callback, this, args); + }; + } + + wrapArgs(args, index = -1, options = { __proto__: null }) { + const callback = ArrayPrototypeAt(args, index); + validateFunction(callback, 'callback'); + const wrappedArgs = ArrayPrototypeSlice(args); + ArrayPrototypeSplice(wrappedArgs, index, 1, this.wrap(callback, options)); + return wrappedArgs; + } +} + +class PromiseTracingChannel { + constructor(nameOrChannels) { + defineTracingChannels(this, nameOrChannels, asyncTraceEvents); } get hasSubscribers() { - return this.#callWindow.hasSubscribers || - this.#continuationWindow.hasSubscribers || - this.error?.hasSubscribers; + return tracingChannelHasSubscribers(this, asyncTraceEvents); } subscribe(handlers) { - // Subscribe to call window (start/end) - if (handlers.start || handlers.end) { - this.#callWindow.subscribe({ - start: handlers.start, - end: handlers.end, - }); + tracingChannelSubscribe(this, handlers, asyncTraceEvents); + } + + unsubscribe(handlers) { + return tracingChannelUnsubscribe(this, handlers, asyncTraceEvents); + } + + wrap(value, options = { __proto__: null }) { + const { + context, + mapResult, + captureError, + captureResult, + warningTarget, + } = options; + if (mapResult !== undefined) validateFunction(mapResult, 'options.mapResult'); + if (captureError !== undefined) validateFunction(captureError, 'options.captureError'); + if (captureResult !== undefined) validateFunction(captureResult, 'options.captureResult'); + const traceContext = normalizeContext(context, undefined, [value]); + const promiseContext = getContextSection(traceContext, 'promise'); + const tracingChannel = this; + + // If the return value is not a thenable, then return it with a warning. + // Do not publish to asyncStart/asyncEnd. + if (typeof value?.then !== 'function') { + emitNonThenableWarning(warningTarget ?? {}); + return value; + } + + function publishAsyncBoundary() { + tracingChannel.asyncStart.publish(traceContext); + // TODO: Is there a way to have asyncEnd _after_ the continuation? + tracingChannel.asyncEnd.publish(traceContext); + } + + function reject(err) { + if (captureError !== undefined) { + captureError(err, traceContext); + } + promiseContext.error = err; + if (tracingChannel.hasSubscribers) { + tracingChannel.error.publish(traceContext); + publishAsyncBoundary(); + } + throw err; } - // Subscribe to continuation window (asyncStart/asyncEnd) - if (handlers.asyncStart || handlers.asyncEnd) { - this.#continuationWindow.subscribe({ - start: handlers.asyncStart, - end: handlers.asyncEnd, + function resolve(result) { + try { + const finalResult = mapResult ? mapResult(result, traceContext) : result; + if (captureResult !== undefined) { + captureResult(finalResult, traceContext); + } + promiseContext.result = finalResult; + if (tracingChannel.hasSubscribers) { + publishAsyncBoundary(); + } + return finalResult; + } catch (err) { + if (captureError !== undefined) { + captureError(err, traceContext); + } + promiseContext.error = err; + if (tracingChannel.hasSubscribers) { + tracingChannel.error.publish(traceContext); + publishAsyncBoundary(); + } + throw err; + } + } + + return value.then(resolve, reject); + } +} + +function wrapIterator(tracingChannel, iterator, options, symbol) { + const { + context, + mapResult, + } = options; + if (mapResult !== undefined) validateFunction(mapResult, 'options.mapResult'); + const traceContext = normalizeContext(context, undefined, [iterator]); + const iteratorContext = getContextSection(traceContext, 'iterator'); + const wrapped = { __proto__: iterator }; + + function createMethod(name) { + const method = iterator?.[name]; + if (typeof method !== 'function') { + return; + } + + ObjectDefineProperty(wrapped, name, { + __proto__: null, + value(...args) { + iteratorContext.method = name; + iteratorContext.args = ArrayPrototypeSlice(args); + + function settleResult(result) { + try { + const finalResult = mapResult ? mapResult(result, traceContext) : result; + iteratorContext.result = finalResult; + if (tracingChannel.hasSubscribers) { + tracingChannel[finalResult?.done ? 'return' : 'yield'].publish(traceContext); + } + return finalResult; + } catch (err) { + iteratorContext.error = err; + if (tracingChannel.hasSubscribers) { + tracingChannel.error.publish(traceContext); + } + throw err; + } + } + + function handleError(err) { + iteratorContext.error = err; + if (tracingChannel.hasSubscribers) { + tracingChannel.error.publish(traceContext); + } + throw err; + } + + try { + const result = ReflectApply(method, iterator, args); + if (symbol === SymbolIterator || typeof result?.then !== 'function') { + return settleResult(result); + } + return result.then(settleResult, handleError); + } catch (err) { + return handleError(err); + } + }, + }); + } + + createMethod('next'); + createMethod('return'); + createMethod('throw'); + + ObjectDefineProperty(wrapped, symbol, { + __proto__: null, + value() { + return this; + }, + }); + + return wrapped; +} + +class SyncIteratorTracingChannel { + constructor(nameOrChannels) { + if (typeof nameOrChannels === 'string') { + ObjectDefineProperty(this, 'yield', { + __proto__: null, + value: channel(`tracing:${nameOrChannels}:syncYield`), }); + ObjectDefineProperty(this, 'return', { + __proto__: null, + value: channel(`tracing:${nameOrChannels}:return`), + }); + ObjectDefineProperty(this, 'error', { + __proto__: null, + value: channel(`tracing:${nameOrChannels}:error`), + }); + return; } - // Subscribe to error channel - if (handlers.error) { - this.error.subscribe(handlers.error); + if (typeof nameOrChannels === 'object' && nameOrChannels !== null) { + ObjectDefineProperty(this, 'yield', { + __proto__: null, + value: channelFromMap(nameOrChannels, 'yield', 'SyncIteratorTracingChannel'), + }); + ObjectDefineProperty(this, 'return', { + __proto__: null, + value: channelFromMap(nameOrChannels, 'return', 'SyncIteratorTracingChannel'), + }); + ObjectDefineProperty(this, 'error', { + __proto__: null, + value: channelFromMap(nameOrChannels, 'error', 'SyncIteratorTracingChannel'), + }); + return; } + + throw new ERR_INVALID_ARG_TYPE('nameOrChannels', + ['string', 'object', 'SyncIteratorTracingChannel'], + nameOrChannels); + } + + get hasSubscribers() { + return tracingChannelHasSubscribers(this, iteratorTraceEvents); + } + + subscribe(handlers) { + tracingChannelSubscribe(this, handlers, iteratorTraceEvents); } unsubscribe(handlers) { - let done = true; + return tracingChannelUnsubscribe(this, handlers, iteratorTraceEvents); + } - // Unsubscribe from call window - if (handlers.start || handlers.end) { - if (!this.#callWindow.unsubscribe({ - start: handlers.start, - end: handlers.end, - })) { - done = false; - } - } + wrap(iterator, options = { __proto__: null }) { + return wrapIterator(this, iterator, options, SymbolIterator); + } +} - // Unsubscribe from continuation window - if (handlers.asyncStart || handlers.asyncEnd) { - if (!this.#continuationWindow.unsubscribe({ - start: handlers.asyncStart, - end: handlers.asyncEnd, - })) { - done = false; - } +class AsyncIteratorTracingChannel { + constructor(nameOrChannels) { + if (typeof nameOrChannels === 'string') { + ObjectDefineProperty(this, 'yield', { + __proto__: null, + value: channel(`tracing:${nameOrChannels}:asyncYield`), + }); + ObjectDefineProperty(this, 'return', { + __proto__: null, + value: channel(`tracing:${nameOrChannels}:return`), + }); + ObjectDefineProperty(this, 'error', { + __proto__: null, + value: channel(`tracing:${nameOrChannels}:error`), + }); + return; } - // Unsubscribe from error channel - if (handlers.error) { - if (!this.error.unsubscribe(handlers.error)) { - done = false; - } + if (typeof nameOrChannels === 'object' && nameOrChannels !== null) { + ObjectDefineProperty(this, 'yield', { + __proto__: null, + value: channelFromMap(nameOrChannels, 'yield', 'AsyncIteratorTracingChannel'), + }); + ObjectDefineProperty(this, 'return', { + __proto__: null, + value: channelFromMap(nameOrChannels, 'return', 'AsyncIteratorTracingChannel'), + }); + ObjectDefineProperty(this, 'error', { + __proto__: null, + value: channelFromMap(nameOrChannels, 'error', 'AsyncIteratorTracingChannel'), + }); + return; } - return done; + throw new ERR_INVALID_ARG_TYPE('nameOrChannels', + ['string', 'object', 'AsyncIteratorTracingChannel'], + nameOrChannels); + } + + get hasSubscribers() { + return tracingChannelHasSubscribers(this, iteratorTraceEvents); + } + + subscribe(handlers) { + tracingChannelSubscribe(this, handlers, iteratorTraceEvents); + } + + unsubscribe(handlers) { + return tracingChannelUnsubscribe(this, handlers, iteratorTraceEvents); + } + + wrap(iterator, options = { __proto__: null }) { + return wrapIterator(this, iterator, options, SymbolAsyncIterator); + } +} + +class TracingChannel { + #syncChannel; + #callbackChannel; + + constructor(nameOrChannels) { + defineTracingChannels(this, nameOrChannels, traceEvents); + + this.#syncChannel = new SyncTracingChannel(this); + this.#callbackChannel = new CallbackTracingChannel(this); + } + + get hasSubscribers() { + return tracingChannelHasSubscribers(this, traceEvents); + } + + subscribe(handlers) { + tracingChannelSubscribe(this, handlers, traceEvents); + } + + unsubscribe(handlers) { + return tracingChannelUnsubscribe(this, handlers, traceEvents); + } + + wrap(fn, options = { __proto__: null }) { + validateFunction(fn, 'fn'); + return this.#syncChannel.wrap(fn, options); } traceSync(fn, context = {}, thisArg, ...args) { @@ -522,7 +1017,7 @@ class TracingChannel { const { error } = this; // eslint-disable-next-line no-unused-vars - using scope = this.#callWindow.withScope(context); + using scope = this.#syncChannel.withScope(context); try { const result = ReflectApply(fn, thisArg, args); context.result = result; @@ -544,29 +1039,30 @@ class TracingChannel { } const { error } = this; - const continuationWindow = this.#continuationWindow; + // Use #callbackChannel (start=asyncStart, end=asyncEnd) for the continuation + // window so that asyncStart stores are properly entered on promise settlement, + // matching the backward-compatible behaviour of the upstream tracePromise. + const callbackChannel = this.#callbackChannel; function reject(err) { context.error = err; error.publish(context); - // Use continuation window for asyncStart/asyncEnd // eslint-disable-next-line no-unused-vars - using scope = continuationWindow.withScope(context); + using scope = callbackChannel.withScope(context); // TODO: Is there a way to have asyncEnd _after_ the continuation? return PromiseReject(err); } function resolve(result) { context.result = result; - // Use continuation window for asyncStart/asyncEnd // eslint-disable-next-line no-unused-vars - using scope = continuationWindow.withScope(context); + using scope = callbackChannel.withScope(context); // TODO: Is there a way to have asyncEnd _after_ the continuation? return result; } // eslint-disable-next-line no-unused-vars - using scope = this.#callWindow.withScope(context); + using scope = this.#syncChannel.withScope(context); try { const result = ReflectApply(fn, thisArg, args); // If the return value is not a thenable, return it directly with a warning. @@ -595,7 +1091,7 @@ class TracingChannel { } const { error } = this; - const continuationWindow = this.#continuationWindow; + const callbackChannel = this.#callbackChannel; function wrappedCallback(err, res) { if (err) { @@ -605,9 +1101,8 @@ class TracingChannel { context.result = res; } - // Use continuation window for asyncStart/asyncEnd around callback // eslint-disable-next-line no-unused-vars - using scope = continuationWindow.withScope(context); + using scope = callbackChannel.withScope(context); return ReflectApply(callback, this, arguments); } @@ -616,7 +1111,7 @@ class TracingChannel { ArrayPrototypeSplice(args, position, 1, wrappedCallback); // eslint-disable-next-line no-unused-vars - using scope = this.#callWindow.withScope(context); + using scope = this.#syncChannel.withScope(context); try { return ReflectApply(fn, thisArg, args); } catch (err) { @@ -631,15 +1126,40 @@ function tracingChannel(nameOrChannels) { return new TracingChannel(nameOrChannels); } +function syncTracingChannel(nameOrChannels) { + return new SyncTracingChannel(nameOrChannels); +} + +function callbackTracingChannel(nameOrChannels) { + return new CallbackTracingChannel(nameOrChannels); +} + +function promiseTracingChannel(nameOrChannels) { + return new PromiseTracingChannel(nameOrChannels); +} + +function syncIteratorTracingChannel(nameOrChannels) { + return new SyncIteratorTracingChannel(nameOrChannels); +} + +function asyncIteratorTracingChannel(nameOrChannels) { + return new AsyncIteratorTracingChannel(nameOrChannels); +} + dc_binding.linkNativeChannel((name) => channel(name)); module.exports = { + asyncIteratorTracingChannel, + boundedChannel, + callbackTracingChannel, channel, hasSubscribers, + promiseTracingChannel, subscribe, + syncIteratorTracingChannel, + syncTracingChannel, tracingChannel, unsubscribe, - boundedChannel, - Channel, BoundedChannel, + Channel, }; diff --git a/test/parallel/test-diagnostics-channel-callback-tracing-channel-wrap.js b/test/parallel/test-diagnostics-channel-callback-tracing-channel-wrap.js new file mode 100644 index 00000000000000..e0ef8f94b39494 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-callback-tracing-channel-wrap.js @@ -0,0 +1,118 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.callbackTracingChannel('test'); + +assert.strictEqual(channel.asyncStart.name, 'tracing:test:asyncStart'); +assert.strictEqual(channel.asyncEnd.name, 'tracing:test:asyncEnd'); +assert.strictEqual(channel.error.name, 'tracing:test:error'); + +const callbackThis = { name: 'callback-this' }; +const result = { ok: true }; +const args = [() => {}, 'value']; + +const handlers = { + asyncStart: common.mustCall((context) => { + assert.strictEqual(context.callback.result, result); + assert.strictEqual(context.kind, 'wrapped'); + }), + asyncEnd: common.mustCall((context) => { + assert.strictEqual(context.callback.result, result); + assert.strictEqual(context.kind, 'wrapped'); + }), + error: common.mustNotCall(), +}; + +channel.subscribe(handlers); + +const wrapped = channel.wrap(common.mustCall(function(err, value) { + assert.strictEqual(this, callbackThis); + assert.strictEqual(err, null); + assert.strictEqual(value, result); +}), { + context: { kind: 'wrapped' }, +}); + +wrapped.call(callbackThis, null, result); + +const wrappedArgs = channel.wrapArgs(args, 0, { + context(first, second) { + return { + first, + second, + }; + }, +}); + +assert.notStrictEqual(wrappedArgs, args); +assert.strictEqual(wrappedArgs[1], 'value'); + +channel.unsubscribe(handlers); + +const wrappedArgsChannel = dc.callbackTracingChannel('args'); + +wrappedArgsChannel.subscribe({ + asyncStart: common.mustCall((context) => { + assert.strictEqual(context.first, 'first'); + assert.strictEqual(context.second, 'second'); + assert.strictEqual(context.callback.error, 'first'); + assert.strictEqual(context.callback.result, 'second'); + }), + asyncEnd: common.mustCall((context) => { + assert.strictEqual(context.first, 'first'); + assert.strictEqual(context.second, 'second'); + assert.strictEqual(context.callback.error, 'first'); + assert.strictEqual(context.callback.result, 'second'); + }), + error: common.mustCall((context) => { + assert.strictEqual(context.callback.error, 'first'); + }), +}); + +const mapped = dc.callbackTracingChannel('mapped'); + +mapped.subscribe({ + asyncStart: common.mustCall((context) => { + assert.strictEqual(context.callback.result, 'second'); + assert.strictEqual(context.callback.error, 'first'); + }), + asyncEnd: common.mustCall((context) => { + assert.strictEqual(context.callback.result, 'second'); + assert.strictEqual(context.callback.error, 'first'); + }), + error: common.mustCall((context) => { + assert.strictEqual(context.callback.error, 'first'); + }), +}); + +const argsCallback = wrappedArgsChannel.wrapArgs(args, 0, { + context: common.mustCall(function(first, second) { + assert.strictEqual(this, callbackThis); + return { + first, + second, + }; + }), + mapOutcome(first, second) { + return { + error: first, + result: second, + }; + }, +}); + +argsCallback[0].call(callbackThis, 'first', 'second'); + +const mappedCallback = mapped.wrap(common.mustCall(), { + mapOutcome(first, second) { + return { + error: first, + result: second, + }; + }, +}); + +mappedCallback('first', 'second'); diff --git a/test/parallel/test-diagnostics-channel-iterator-tracing-channel-wrap.js b/test/parallel/test-diagnostics-channel-iterator-tracing-channel-wrap.js new file mode 100644 index 00000000000000..8a4e48da8eb99c --- /dev/null +++ b/test/parallel/test-diagnostics-channel-iterator-tracing-channel-wrap.js @@ -0,0 +1,90 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +{ + const channel = dc.syncIteratorTracingChannel('test'); + + assert.strictEqual(channel.yield.name, 'tracing:test:syncYield'); + assert.strictEqual(channel.return.name, 'tracing:test:return'); + assert.strictEqual(channel.error.name, 'tracing:test:error'); + + function* values() { + yield 1; + return 2; + } + + const wrapped = channel.wrap(values(), { + context: { type: 'sync' }, + }); + + channel.subscribe({ + yield: common.mustCall((context) => { + assert.strictEqual(context.type, 'sync'); + assert.strictEqual(context.iterator.method, 'next'); + assert.deepStrictEqual(context.iterator.args, []); + assert.strictEqual(context.iterator.result.value, 1); + assert.strictEqual(context.iterator.result.done, false); + assert.strictEqual('step' in context.iterator, false); + assert.strictEqual('yieldIndex' in context.iterator, false); + assert.strictEqual('kind' in context.iterator, false); + assert.strictEqual('done' in context.iterator, false); + }), + return: common.mustCall((context) => { + assert.strictEqual(context.type, 'sync'); + assert.strictEqual(context.iterator.method, 'next'); + assert.deepStrictEqual(context.iterator.args, []); + assert.strictEqual(context.iterator.result.value, 2); + assert.strictEqual(context.iterator.result.done, true); + }), + error: common.mustNotCall(), + }); + + assert.deepStrictEqual(wrapped.next(), { value: 1, done: false }); + assert.deepStrictEqual(wrapped.next(), { value: 2, done: true }); +} + +{ + const channel = dc.asyncIteratorTracingChannel('async'); + + assert.strictEqual(channel.yield.name, 'tracing:async:asyncYield'); + assert.strictEqual(channel.return.name, 'tracing:async:return'); + assert.strictEqual(channel.error.name, 'tracing:async:error'); + + const expectedError = new Error('boom'); + + async function* values() { + yield 1; + throw expectedError; + } + + const wrapped = channel.wrap(values(), { + context: { type: 'async' }, + }); + + channel.subscribe({ + yield: common.mustCall((context) => { + assert.strictEqual(context.type, 'async'); + assert.strictEqual(context.iterator.method, 'next'); + assert.strictEqual(context.iterator.result.value, 1); + assert.strictEqual(context.iterator.result.done, false); + }), + return: common.mustNotCall(), + error: common.mustCall((context) => { + assert.strictEqual(context.type, 'async'); + assert.strictEqual(context.iterator.method, 'next'); + assert.strictEqual(context.iterator.error, expectedError); + }), + }); + + (async () => { + const result = await wrapped.next(); + assert.deepStrictEqual(result, { value: 1, done: false }); + await assert.rejects(wrapped.next(), (err) => { + assert.strictEqual(err, expectedError); + return true; + }); + })().then(common.mustCall()); +} diff --git a/test/parallel/test-diagnostics-channel-promise-tracing-channel-wrap.js b/test/parallel/test-diagnostics-channel-promise-tracing-channel-wrap.js new file mode 100644 index 00000000000000..78aa59447b7455 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-promise-tracing-channel-wrap.js @@ -0,0 +1,49 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.promiseTracingChannel('test'); +const callbackChannel = dc.callbackTracingChannel('test'); + +assert.strictEqual(channel.asyncStart.name, 'tracing:test:asyncStart'); +assert.strictEqual(channel.asyncEnd.name, 'tracing:test:asyncEnd'); +assert.strictEqual(channel.error.name, 'tracing:test:error'); +assert.strictEqual(channel.asyncStart, callbackChannel.asyncStart); +assert.strictEqual(channel.asyncEnd, callbackChannel.asyncEnd); +assert.strictEqual(channel.error, callbackChannel.error); + +const contexts = []; + +channel.subscribe({ + asyncStart: common.mustCall((context) => { + contexts.push(context); + assert.strictEqual(context.promise.result, 2); + assert.strictEqual(context.kind, 'promise'); + }), + asyncEnd: common.mustCall((context) => { + assert.strictEqual(context.promise.result, 2); + assert.strictEqual(context.kind, 'promise'); + }), + error: common.mustNotCall(), +}); + +channel.wrap(Promise.resolve(1), { + context: { kind: 'promise' }, + mapResult(result) { + return result + 1; + }, +}).then(common.mustCall((result) => { + assert.strictEqual(result, 2); + assert.strictEqual(contexts.length, 1); +})); + +process.on('warning', common.mustCall((warning) => { + assert.strictEqual( + warning.message, + "tracePromise was called with the function '', which returned a non-thenable.", + ); +})); + +assert.strictEqual(channel.wrap(1), 1); diff --git a/test/parallel/test-diagnostics-channel-sync-tracing-channel-wrap.js b/test/parallel/test-diagnostics-channel-sync-tracing-channel-wrap.js new file mode 100644 index 00000000000000..e588cb58ba7811 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-sync-tracing-channel-wrap.js @@ -0,0 +1,53 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.syncTracingChannel('test'); + +assert.strictEqual(channel.start.name, 'tracing:test:start'); +assert.strictEqual(channel.end.name, 'tracing:test:end'); +assert.strictEqual(channel.error.name, 'tracing:test:error'); + +const sharedContext = { type: 'seed' }; +const thisArg = { name: 'receiver' }; +const contexts = []; + +channel.subscribe({ + start: common.mustCall((context) => { + contexts.push(context); + }, 3), + end: common.mustCall((context) => { + if (context.type === 'seed') { + assert.strictEqual(context.call.result, 'ok'); + return; + } + + assert.strictEqual(context.call.result, context.value + 1); + }, 3), + error: common.mustNotCall(), +}); + +const wrapped = channel.wrap(common.mustCall(function(value) { + assert.strictEqual(this, thisArg); + return value + 1; +}, 2), { + context: common.mustCall(function(value) { + assert.strictEqual(this, thisArg); + return { value }; + }, 2), +}); + +assert.strictEqual(wrapped.call(thisArg, 1), 2); +assert.strictEqual(wrapped.call(thisArg, 2), 3); + +assert.notStrictEqual(contexts[0], contexts[1]); +assert.deepStrictEqual(sharedContext, { type: 'seed' }); + +const seeded = channel.wrap(common.mustCall(() => 'ok'), { + context: sharedContext, +}); + +assert.strictEqual(seeded(), 'ok'); +assert.strictEqual(contexts[2], sharedContext); diff --git a/test/parallel/test-diagnostics-channel-tracing-channel-wrap-composition.js b/test/parallel/test-diagnostics-channel-tracing-channel-wrap-composition.js new file mode 100644 index 00000000000000..ad9c603e6ca012 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-tracing-channel-wrap-composition.js @@ -0,0 +1,149 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const channel = dc.tracingChannel('composed'); +const callbackChannel = dc.callbackTracingChannel(channel); +const promiseChannel = dc.promiseTracingChannel(channel); + +let callbackContext; +let promiseContext; +let resolveCallback; +const callbackComplete = new Promise((resolve) => { + resolveCallback = resolve; +}); + +assert.strictEqual('callback' in channel, false); +assert.strictEqual('promise' in channel, false); +assert.strictEqual('sync' in channel, false); +assert.strictEqual('syncIterator' in channel, false); +assert.strictEqual('asyncIterator' in channel, false); + +assert.strictEqual(callbackChannel.asyncStart, promiseChannel.asyncStart); +assert.strictEqual(callbackChannel.asyncEnd, promiseChannel.asyncEnd); +assert.strictEqual(callbackChannel.error, promiseChannel.error); +assert.strictEqual(callbackChannel.asyncStart.name, 'tracing:composed:asyncStart'); +assert.strictEqual(promiseChannel.asyncEnd.name, 'tracing:composed:asyncEnd'); + +callbackChannel.subscribe({ + asyncStart: common.mustCall((context) => { + assert.strictEqual(context.label, 'hybrid'); + if (context.callback) { + callbackContext = context; + assert.strictEqual(context.callback.result, 'callback-result'); + return; + } + + if (context.promise) { + promiseContext = context; + return; + } + + assert.fail('Unexpected asyncStart context'); + }, 2), + asyncEnd: common.mustCall((context) => { + if (context.callback) { + assert.strictEqual(context.callback.result, 'callback-result'); + return; + } + + if (context.promise) { + assert.strictEqual(context.promise.result, 'promise-result'); + return; + } + + assert.fail('Unexpected asyncEnd context'); + }, 2), + error: common.mustNotCall(), +}); + +const wrapped = channel.wrap(function(callback) { + if (typeof callback === 'function') { + setImmediate(callback, null, 'callback-result'); + } + return Promise.resolve('promise-result'); +}, { + context() { + return { + label: 'hybrid', + }; + }, + wrapArgs(args, context) { + if (typeof args[0] !== 'function') { + return args; + } + + return callbackChannel.wrapArgs(args, 0, { + context: () => context, + }); + }, + mapResult(result, context) { + return promiseChannel.wrap(result, { + context: () => context, + }); + }, +}); + +const wrappedResult = wrapped(common.mustCall((err, result) => { + assert.strictEqual(err, null); + assert.strictEqual(result, 'callback-result'); + resolveCallback(); +})); + +Promise.all([ + wrappedResult, + callbackComplete, +]).then(common.mustCall(([result]) => { + assert.strictEqual(result, 'promise-result'); + assert.strictEqual(callbackContext, promiseContext); +})); + +const iteratorPromiseChannel = dc.promiseTracingChannel('iterator'); +const iteratorChannel = dc.syncIteratorTracingChannel('iterator'); + +let settledContext; +let yieldedContext; + +iteratorPromiseChannel.subscribe({ + asyncStart: common.mustCall((context) => { + settledContext = context; + }), + asyncEnd: common.mustCall(), + error: common.mustNotCall(), +}); + +iteratorChannel.subscribe({ + yield: common.mustCall((context) => { + yieldedContext = context; + assert.strictEqual(context.iterator.result.value, 1); + }), + return: common.mustCall((context) => { + assert.strictEqual(context.iterator.result.value, 2); + assert.strictEqual(context, settledContext); + }), + error: common.mustNotCall(), +}); + +function* values() { + yield 1; + return 2; +} + +iteratorPromiseChannel.wrap(Promise.resolve(values()), { + context() { + return { + label: 'iterator', + }; + }, + mapResult(iterator, context) { + return iteratorChannel.wrap(iterator, { + context: () => context, + }); + }, +}).then(common.mustCall((iterator) => { + assert.strictEqual(iterator.next().value, 1); + assert.strictEqual(iterator.next().value, 2); + assert.strictEqual(settledContext, yieldedContext); +}));