Skip to content

Commit 5089c6b

Browse files
fix: add AbortSignal to waitFor() and send trailer on stream abort (#636)
* fix: add AbortSignal to waitFor() and send trailer on stream abort * chore: add changeset * fix: abort disconnectController on server-initiated disconnect * Pass abort reason directly instead of wrapping in Error
1 parent 7f7cae9 commit 5089c6b

4 files changed

Lines changed: 147 additions & 52 deletions

File tree

.changeset/kind-foxes-wave.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@livekit/rtc-node': patch
3+
---
4+
5+
Add AbortSignal to waitFor() to clean up listeners on disconnect and send trailer on stream abort

packages/livekit-rtc/src/ffi_client.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,37 @@ export class FfiClient extends (EventEmitter as new () => TypedEmitter<FfiClient
7070
return livekitRetrievePtr(data);
7171
}
7272

73-
async waitFor<T>(predicate: (ev: FfiEvent) => boolean): Promise<T> {
74-
return new Promise<T>((resolve) => {
73+
async waitFor<T>(
74+
predicate: (ev: FfiEvent) => boolean,
75+
options?: { signal?: AbortSignal },
76+
): Promise<T> {
77+
return new Promise<T>((resolve, reject) => {
7578
const listener = (ev: FfiEvent) => {
7679
if (predicate(ev)) {
77-
this.off(FfiClientEvent.FfiEvent, listener);
80+
cleanup();
7881
resolve(ev.message.value as T);
7982
}
8083
};
84+
85+
const cleanup = () => {
86+
this.off(FfiClientEvent.FfiEvent, listener);
87+
options?.signal?.removeEventListener('abort', onAbort);
88+
};
89+
90+
// If an AbortSignal is provided, remove the listener when the signal
91+
// fires so that pending waitFor() calls don't leak listeners after
92+
// the room disconnects or the operation is cancelled.
93+
const onAbort = () => {
94+
cleanup();
95+
reject(options?.signal?.reason ?? new Error('waitFor aborted'));
96+
};
97+
98+
if (options?.signal?.aborted) {
99+
reject(options.signal.reason ?? new Error('waitFor aborted'));
100+
return;
101+
}
102+
103+
options?.signal?.addEventListener('abort', onAbort);
81104
this.on(FfiClientEvent.FfiEvent, listener);
82105
});
83106
}

packages/livekit-rtc/src/participant.ts

Lines changed: 100 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,16 @@ export class LocalParticipant extends Participant {
157157

158158
private ffiEventLock: Mutex;
159159

160+
// Signal that fires when the owning Room disconnects, used to cancel
161+
// pending FfiClient.waitFor() listeners so they don't leak.
162+
private disconnectSignal: AbortSignal;
163+
160164
trackPublications: Map<string, LocalTrackPublication> = new Map();
161165

162-
constructor(info: OwnedParticipant, ffiEventLock: Mutex) {
166+
constructor(info: OwnedParticipant, ffiEventLock: Mutex, disconnectSignal: AbortSignal) {
163167
super(info);
164168
this.ffiEventLock = ffiEventLock;
169+
this.disconnectSignal = disconnectSignal;
165170
}
166171

167172
async publishData(data: Uint8Array, options: DataPublishOptions) {
@@ -178,9 +183,10 @@ export class LocalParticipant extends Participant {
178183
message: { case: 'publishData', value: req },
179184
});
180185

181-
const cb = await FfiClient.instance.waitFor<PublishDataCallback>((ev) => {
182-
return ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId;
183-
});
186+
const cb = await FfiClient.instance.waitFor<PublishDataCallback>(
187+
(ev) => ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId,
188+
{ signal: this.disconnectSignal },
189+
);
184190

185191
if (cb.error) {
186192
throw new Error(cb.error);
@@ -198,9 +204,10 @@ export class LocalParticipant extends Participant {
198204
message: { case: 'publishSipDtmf', value: req },
199205
});
200206

201-
const cb = await FfiClient.instance.waitFor<PublishSipDtmfCallback>((ev) => {
202-
return ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId;
203-
});
207+
const cb = await FfiClient.instance.waitFor<PublishSipDtmfCallback>(
208+
(ev) => ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId,
209+
{ signal: this.disconnectSignal },
210+
);
204211

205212
if (cb.error) {
206213
throw new Error(cb.error);
@@ -229,9 +236,10 @@ export class LocalParticipant extends Participant {
229236
message: { case: 'publishTranscription', value: req },
230237
});
231238

232-
const cb = await FfiClient.instance.waitFor<PublishTranscriptionCallback>((ev) => {
233-
return ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId;
234-
});
239+
const cb = await FfiClient.instance.waitFor<PublishTranscriptionCallback>(
240+
(ev) => ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId,
241+
{ signal: this.disconnectSignal },
242+
);
235243

236244
if (cb.error) {
237245
throw new Error(cb.error);
@@ -248,9 +256,10 @@ export class LocalParticipant extends Participant {
248256
message: { case: 'setLocalMetadata', value: req },
249257
});
250258

251-
await FfiClient.instance.waitFor<SetLocalMetadataCallback>((ev) => {
252-
return ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId;
253-
});
259+
await FfiClient.instance.waitFor<SetLocalMetadataCallback>(
260+
(ev) => ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId,
261+
{ signal: this.disconnectSignal },
262+
);
254263
}
255264

256265
/**
@@ -335,8 +344,24 @@ export class LocalParticipant extends Participant {
335344
});
336345
await sendTrailer(trailerReq);
337346
},
338-
abort(err) {
347+
// Send a trailer with the error reason so the remote side's stream
348+
// controller is closed instead of waiting for data that won't arrive.
349+
async abort(err) {
339350
log.error(err, 'Sink Error');
351+
try {
352+
const trailerReq = new SendStreamTrailerRequest({
353+
senderIdentity,
354+
localParticipantHandle: localHandle,
355+
destinationIdentities,
356+
trailer: new DataStream_Trailer({
357+
streamId,
358+
reason: err instanceof Error ? err.message : String(err ?? ''),
359+
}),
360+
});
361+
await sendTrailer(trailerReq);
362+
} catch {
363+
// Best-effort: the connection may already be gone.
364+
}
340365
},
341366
});
342367

@@ -450,8 +475,24 @@ export class LocalParticipant extends Participant {
450475
});
451476
await sendTrailer(trailerReq);
452477
},
453-
abort(err) {
478+
// Send a trailer with the error reason so the remote side's stream
479+
// controller is closed instead of waiting for data that won't arrive.
480+
async abort(err) {
454481
log.error(err, 'Sink error');
482+
try {
483+
const trailerReq = new SendStreamTrailerRequest({
484+
senderIdentity,
485+
localParticipantHandle: localHandle,
486+
destinationIdentities,
487+
trailer: new DataStream_Trailer({
488+
streamId,
489+
reason: err instanceof Error ? err.message : String(err ?? ''),
490+
}),
491+
});
492+
await sendTrailer(trailerReq);
493+
} catch {
494+
// Best-effort: the connection may already be gone.
495+
}
455496
},
456497
});
457498

@@ -494,44 +535,47 @@ export class LocalParticipant extends Participant {
494535
message: { case: type, value: req },
495536
});
496537

497-
const cb = await FfiClient.instance.waitFor<SendStreamHeaderCallback>((ev) => {
498-
return ev.message.case == type && ev.message.value.asyncId == res.asyncId;
499-
});
538+
const cb = await FfiClient.instance.waitFor<SendStreamHeaderCallback>(
539+
(ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId,
540+
{ signal: this.disconnectSignal },
541+
);
500542

501543
if (cb.error) {
502544
throw new Error(cb.error);
503545
}
504546
}
505547

506-
private async sendStreamChunk(req: SendStreamChunkRequest) {
548+
private sendStreamChunk = async (req: SendStreamChunkRequest) => {
507549
const type = 'sendStreamChunk';
508550
const res = FfiClient.instance.request<SendStreamChunkResponse>({
509551
message: { case: type, value: req },
510552
});
511553

512-
const cb = await FfiClient.instance.waitFor<SendStreamChunkCallback>((ev) => {
513-
return ev.message.case == type && ev.message.value.asyncId == res.asyncId;
514-
});
554+
const cb = await FfiClient.instance.waitFor<SendStreamChunkCallback>(
555+
(ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId,
556+
{ signal: this.disconnectSignal },
557+
);
515558

516559
if (cb.error) {
517560
throw new Error(cb.error);
518561
}
519-
}
562+
};
520563

521-
private async sendStreamTrailer(req: SendStreamTrailerRequest) {
564+
private sendStreamTrailer = async (req: SendStreamTrailerRequest) => {
522565
const type = 'sendStreamTrailer';
523566
const res = FfiClient.instance.request<SendStreamTrailerResponse>({
524567
message: { case: type, value: req },
525568
});
526569

527-
const cb = await FfiClient.instance.waitFor<SendStreamTrailerCallback>((ev) => {
528-
return ev.message.case == type && ev.message.value.asyncId == res.asyncId;
529-
});
570+
const cb = await FfiClient.instance.waitFor<SendStreamTrailerCallback>(
571+
(ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId,
572+
{ signal: this.disconnectSignal },
573+
);
530574

531575
if (cb.error) {
532576
throw new Error(cb.error);
533577
}
534-
}
578+
};
535579

536580
/**
537581
* Sends a chat message to participants in the room
@@ -557,9 +601,10 @@ export class LocalParticipant extends Participant {
557601
message: { case: 'sendChatMessage', value: req },
558602
});
559603

560-
const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>((ev) => {
561-
return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId;
562-
});
604+
const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>(
605+
(ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId,
606+
{ signal: this.disconnectSignal },
607+
);
563608

564609
switch (cb.message.case) {
565610
case 'chatMessage':
@@ -603,9 +648,10 @@ export class LocalParticipant extends Participant {
603648
message: { case: 'editChatMessage', value: req },
604649
});
605650

606-
const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>((ev) => {
607-
return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId;
608-
});
651+
const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>(
652+
(ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId,
653+
{ signal: this.disconnectSignal },
654+
);
609655

610656
switch (cb.message.case) {
611657
case 'chatMessage':
@@ -632,9 +678,10 @@ export class LocalParticipant extends Participant {
632678
message: { case: 'setLocalName', value: req },
633679
});
634680

635-
await FfiClient.instance.waitFor<SetLocalNameCallback>((ev) => {
636-
return ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId;
637-
});
681+
await FfiClient.instance.waitFor<SetLocalNameCallback>(
682+
(ev) => ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId,
683+
{ signal: this.disconnectSignal },
684+
);
638685
}
639686

640687
async setAttributes(attributes: Record<string, string>) {
@@ -647,9 +694,10 @@ export class LocalParticipant extends Participant {
647694
message: { case: 'setLocalAttributes', value: req },
648695
});
649696

650-
await FfiClient.instance.waitFor<SetLocalAttributesCallback>((ev) => {
651-
return ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId;
652-
});
697+
await FfiClient.instance.waitFor<SetLocalAttributesCallback>(
698+
(ev) => ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId,
699+
{ signal: this.disconnectSignal },
700+
);
653701
}
654702

655703
async publishTrack(
@@ -669,9 +717,10 @@ export class LocalParticipant extends Participant {
669717
});
670718

671719
try {
672-
const cb = await FfiClient.instance.waitFor<PublishTrackCallback>((ev) => {
673-
return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId;
674-
});
720+
const cb = await FfiClient.instance.waitFor<PublishTrackCallback>(
721+
(ev) => ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId,
722+
{ signal: this.disconnectSignal },
723+
);
675724

676725
switch (cb.message.case) {
677726
case 'publication':
@@ -702,9 +751,10 @@ export class LocalParticipant extends Participant {
702751
message: { case: 'unpublishTrack', value: req },
703752
});
704753

705-
const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>((ev) => {
706-
return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId;
707-
});
754+
const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>(
755+
(ev) => ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId,
756+
{ signal: this.disconnectSignal },
757+
);
708758

709759
if (cb.error) {
710760
throw new Error(cb.error);
@@ -744,9 +794,10 @@ export class LocalParticipant extends Participant {
744794
message: { case: 'performRpc', value: req },
745795
});
746796

747-
const cb = await FfiClient.instance.waitFor<PerformRpcCallback>((ev) => {
748-
return ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId;
749-
});
797+
const cb = await FfiClient.instance.waitFor<PerformRpcCallback>(
798+
(ev) => ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId,
799+
{ signal: this.disconnectSignal },
800+
);
750801

751802
if (cb.error) {
752803
throw RpcError.fromProto(cb.error);

packages/livekit-rtc/src/room.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
9696

9797
private preConnectEvents: FfiEvent[] = [];
9898

99+
// Aborted on disconnect to cancel any pending FfiClient.waitFor() listeners,
100+
// preventing them from leaking when the room goes away.
101+
private disconnectController = new AbortController();
102+
99103
private _token?: string;
100104
private _serverUrl?: string;
101105

@@ -241,9 +245,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
241245
this._serverUrl = url;
242246
this.info = cb.message.value.room!.info;
243247
this.connectionState = ConnectionState.CONN_CONNECTED;
248+
// Reset the abort controller for this connection session so that
249+
// a previous disconnect doesn't immediately cancel new operations.
250+
this.disconnectController = new AbortController();
244251
this.localParticipant = new LocalParticipant(
245252
cb.message.value.localParticipant!,
246253
this.ffiEventLock,
254+
this.disconnectController.signal,
247255
);
248256

249257
for (const pt of cb.message.value.participants) {
@@ -283,6 +291,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
283291
return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId;
284292
});
285293

294+
// Abort all pending FfiClient.waitFor() listeners so they don't leak.
295+
// This causes any in-flight operations (publishData, publishTrack, etc.)
296+
// to reject and clean up their event listeners.
297+
this.disconnectController.abort();
298+
286299
FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);
287300
this.removeAllListeners();
288301
}
@@ -599,6 +612,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
599612
/*} else if (ev.case == 'connected') {
600613
this.emit(RoomEvent.Connected);*/
601614
} else if (ev.case == 'disconnected') {
615+
// Abort pending waitFor() listeners on server-initiated disconnect too,
616+
// not just on explicit disconnect() calls.
617+
this.disconnectController.abort();
602618
this.emit(RoomEvent.Disconnected, ev.value.reason!);
603619
} else if (ev.case == 'reconnecting') {
604620
this.emit(RoomEvent.Reconnecting);

0 commit comments

Comments
 (0)