diff --git a/.changeset/warm-owls-deny.md b/.changeset/warm-owls-deny.md new file mode 100644 index 00000000..8f5e3964 --- /dev/null +++ b/.changeset/warm-owls-deny.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Dispose track publication FfiHandles on participant disconnect to prevent FD leaks diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 35a9ac71..b228f1d8 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -446,6 +446,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter participant.trackPublications.delete(ev.value.publicationSid!); if (publication) { this.emit(RoomEvent.TrackUnpublished, publication, participant); + // Dispose eagerly so handles don't accumulate when a participant + // publishes and unpublishes many tracks during a long-lived session. + publication.ffiHandle.dispose(); } else { log.warn(`RoomEvent.TrackUnpublished: Could not find publication`); } diff --git a/packages/livekit-rtc/src/tests/e2e.test.ts b/packages/livekit-rtc/src/tests/e2e.test.ts index 3bbced55..dd234ae7 100644 --- a/packages/livekit-rtc/src/tests/e2e.test.ts +++ b/packages/livekit-rtc/src/tests/e2e.test.ts @@ -515,6 +515,98 @@ describeE2E('livekit-rtc e2e', () => { testTimeoutMs * 2, ); + it( + 'cleans up track publications when a remote participant disconnects', + async () => { + const { rooms } = await connectTestRooms(2); + const [stayingRoom, leavingRoom] = rooms; + + // Publish a track from the leaving participant so its track publication + // will need to be cleaned up on disconnect. + const source = new AudioSource(48_000, 1); + const track = LocalAudioTrack.createAudioTrack('cleanup-test', source); + const options = new TrackPublishOptions(); + options.source = TrackSource.SOURCE_MICROPHONE; + await leavingRoom!.localParticipant!.publishTrack(track, options); + + // Wait for the staying room to see the track subscription + await waitFor( + () => { + const remote = stayingRoom!.remoteParticipants.get( + leavingRoom!.localParticipant!.identity, + ); + return remote !== undefined && remote.trackPublications.size > 0; + }, + { timeoutMs: 5000, debugName: 'track publication visible' }, + ); + + // Capture a reference to the remote participant before disconnect + const remoteParticipant = stayingRoom!.remoteParticipants.get( + leavingRoom!.localParticipant!.identity, + )!; + expect(remoteParticipant.trackPublications.size).toBeGreaterThan(0); + + // Listen for the disconnect event + const disconnected = waitForRoomEvent( + stayingRoom!, + RoomEvent.ParticipantDisconnected, + testTimeoutMs, + (p: { identity: string }) => p.identity, + ); + + await leavingRoom!.disconnect(); + await disconnected; + + // trackUnpublished events fire before participantDisconnected, so + // by this point all publications should already be removed and disposed. + expect(remoteParticipant.trackPublications.size).toBe(0); + expect(stayingRoom!.remoteParticipants.has(remoteParticipant.identity)).toBe(false); + + await source.close(); + await stayingRoom!.disconnect(); + }, + testTimeoutMs, + ); + + it( + 'cleans up resources when multiple participants disconnect simultaneously', + async () => { + // Connect 4 participants to stress-test concurrent disconnection cleanup + const { rooms } = await connectTestRooms(4); + + // Publish a track from each participant to create track publications + const sources: AudioSource[] = []; + for (const room of rooms) { + const source = new AudioSource(48_000, 1); + sources.push(source); + const track = LocalAudioTrack.createAudioTrack('multi-cleanup', source); + const options = new TrackPublishOptions(); + options.source = TrackSource.SOURCE_MICROPHONE; + await room.localParticipant!.publishTrack(track, options); + } + + // Wait for all participants to see each other's tracks + await waitFor( + () => + rooms.every( + (r) => + r.remoteParticipants.size === 3 && + [...r.remoteParticipants.values()].every((p) => p.trackPublications.size > 0), + ), + { timeoutMs: 5000, debugName: 'all tracks visible' }, + ); + + // Disconnect all participants simultaneously + await Promise.all([...rooms.map((r) => r.disconnect()), ...sources.map((s) => s.close())]); + + // Verify all rooms are disconnected and remote participant maps are empty + for (const room of rooms) { + expect(room.isConnected).toBe(false); + } + }, + testTimeoutMs * 2, + ); + it( 'concurrent getSid() calls share a single listener and resolve consistently', async () => {