Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,20 @@ passed to `console.warn()`.
Emitted when `console.error()` is called. Receives and array of the arguments
passed to `console.error()`.

#### Diagnotics Channel

> Stability: 1 - Experimental
<!-- YAML
added: REPLACEME
-->

##### Event: `'diagnostics_channel.subscribe'`

* `args` {any\[]}

Emitted when `diagnostics_channel.subcribe()` or `channel.subscribe()` is called. Receives an object with the channel that was subscribed to, and the subscription function.

#### HTTP

> Stability: 1 - Experimental
Expand Down
32 changes: 30 additions & 2 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const { triggerUncaughtException } = internalBinding('errors');

const { WeakReference } = require('internal/util');

const dcSubscribeChannel = dc.channel('diagnostics_channel.subscribe');
const dcUnsubscribeChannel = dc.channel('diagnostics_channel.unsubscribe');
const dcPublishChannel = dc.channel('diagnostics_channel.publish');

// Can't delete when weakref count reaches 0 as it could increment again.
// Only GC can be used as a valid time to clean up the channels map.
class WeakRefMap extends SafeMap {
Expand Down Expand Up @@ -105,12 +109,21 @@ function wrapStoreRun(store, data, next, transform = defaultTransform) {
class ActiveChannel {
subscribe(subscription) {
validateFunction(subscription, 'subscription');

if (dcSubscribeChannel.hasSubscribers) {
dcSubscribeChannel.publish({ channel: this, subscription });
}

this._subscribers = ArrayPrototypeSlice(this._subscribers);
ArrayPrototypePush(this._subscribers, subscription);
channels.incRef(this.name);
}

unsubscribe(subscription) {
if (dcUnsubscribeChannel.hasSubscribers) {
dcUnsubscribeChannel.publish({ channel: this, subscription });
}

const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
if (index === -1) return false;

Expand Down Expand Up @@ -148,7 +161,7 @@ class ActiveChannel {
return true;
}

publish(data) {
_publish(data) {
const subscribers = this._subscribers;
for (let i = 0; i < (subscribers?.length || 0); i++) {
try {
Expand All @@ -162,6 +175,14 @@ class ActiveChannel {
}
}

publish(data) {
if (dcPublishChannel.hasSubscribers) {
dcPublishChannel._publish({ channel: this, data });
}

this._publish(data);
}

runStores(data, fn, thisArg, ...args) {
let run = () => {
this.publish(data);
Expand Down Expand Up @@ -199,6 +220,9 @@ class Channel {
}

unsubscribe() {
if (dcUnsubscribeChannel.hasSubscribers) {
dcUnsubscribeChannel.publish({ channel: this, subscription });
}
return false;
}

Expand All @@ -215,7 +239,11 @@ class Channel {
return false;
}

publish() {}
publish(data) {
if (dcUnsubscribeChannel.hasSubscribers) {
dcUnsubscribeChannel.publish({ channel: this, subscription });
}
}

runStores(data, fn, thisArg, ...args) {
return ReflectApply(fn, thisArg, args);
Expand Down
35 changes: 35 additions & 0 deletions test/parallel/test-diagnostics-channel-meta-channels.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');

const testedChannel = dc.channel('test');
const testedSubscription = () => {};
const testedData = { foo: 'bar' };

// should publish on meta channel for subscribe() on both inactive and active prototype
dc.subscribe('diagnostics_channel.subscribe', common.mustCall(({ channel, subscription }) => {
assert.strictEqual(channel, testedChannel);
assert.strictEqual(subscription, testedSubscription);
}, 2)); // called twice
testedChannel.subscribe(testedSubscription); // inactive prototype
testedChannel.subscribe(testedSubscription); // active prototype

// should publish on meta channel for publish()
dc.subscribe('diagnostics_channel.publish', common.mustCall(({ channel, data }) => {
assert.strictEqual(channel, testedChannel);
assert.strictEqual(data, testedData);
}));
testedChannel.publish(testedData);

// should publish on meta channel for unsubscribe() on both inactive and active prototype
dc.subscribe('diagnostics_channel.unsubscribe', common.mustCall(({ channel, subscription }) => {
assert.strictEqual(channel, testedChannel);
assert.strictEqual(subscription, testedSubscription);
}, 2)); // called twice
testedChannel.unsubscribe(testedSubscription); // active prototype
testedChannel.unsubscribe(testedSubscription); // inactive prototype


// TODO: should it publish on inactive channels ?