Skip to content

Commit 7f7cae9

Browse files
fix: cancel losing timeout in AudioMixer race to prevent timer leak (#641)
* fix: cancel losing timeout in AudioMixer race to prevent timer leak * chore: add changeset * fix: clear timeout on error path when iterator.next() rejects * Simplify timeoutRace with promise.finally() for automatic cleanup
1 parent dc8664e commit 7f7cae9

3 files changed

Lines changed: 116 additions & 3 deletions

File tree

.changeset/long-keys-roll.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@livekit/rtc-node': patch
3+
---
4+
5+
Cancel losing timeout in AudioMixer race to prevent orphaned timers

packages/livekit-rtc/src/audio_mixer.test.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,106 @@ describe('AudioMixer', () => {
164164
// Should get at least 2 frames (stream exhausts after 2)
165165
expect(frames.length).toBeGreaterThanOrEqual(2);
166166
});
167+
168+
it('completes mixing without lingering timers when iterator is fast', async () => {
169+
const sampleRate = 48000;
170+
const numChannels = 1;
171+
const samplesPerChannel = 480;
172+
const mixer = new AudioMixer(sampleRate, numChannels, {
173+
blocksize: samplesPerChannel,
174+
// Long timeout so the iterator always wins the race.
175+
// Before the fix, each iteration leaked a 5s timer; with the fix,
176+
// cancel() clears it immediately so the mixer shuts down without delay.
177+
streamTimeoutMs: 5000,
178+
});
179+
180+
const stream = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 42);
181+
mixer.addStream(stream);
182+
183+
const frames: AudioFrame[] = [];
184+
for await (const frame of mixer) {
185+
frames.push(frame);
186+
if (frames.length >= 2) break;
187+
}
188+
189+
await mixer.aclose();
190+
191+
expect(frames.length).toBe(2);
192+
// Verify the frames contain the expected mixed value
193+
for (const frame of frames) {
194+
expect(frame.data[0]).toBe(42);
195+
}
196+
});
197+
198+
it('produces frames even with many race iterations', async () => {
199+
const sampleRate = 48000;
200+
const numChannels = 1;
201+
const samplesPerChannel = 480;
202+
const mixer = new AudioMixer(sampleRate, numChannels, {
203+
blocksize: samplesPerChannel,
204+
streamTimeoutMs: 5000,
205+
});
206+
207+
// Use more frames to stress multiple race iterations
208+
const stream = createMockAudioStream(6, sampleRate, numChannels, samplesPerChannel, 10);
209+
mixer.addStream(stream);
210+
211+
const frames: AudioFrame[] = [];
212+
for await (const frame of mixer) {
213+
frames.push(frame);
214+
if (frames.length >= 4) break;
215+
}
216+
217+
await mixer.aclose();
218+
219+
expect(frames.length).toBe(4);
220+
// All frames should contain the expected value
221+
for (const frame of frames) {
222+
expect(frame.data[0]).toBe(10);
223+
}
224+
});
225+
226+
it('handles slow streams via timeout path', async () => {
227+
const sampleRate = 48000;
228+
const numChannels = 1;
229+
const samplesPerChannel = 480;
230+
const mixer = new AudioMixer(sampleRate, numChannels, {
231+
blocksize: samplesPerChannel,
232+
// Very short timeout to trigger the timeout path
233+
streamTimeoutMs: 1,
234+
});
235+
236+
// Create a stream that is slower than the timeout
237+
async function* slowStream(): AsyncGenerator<AudioFrame> {
238+
await new Promise((resolve) => setTimeout(resolve, 200));
239+
const data = new Int16Array(numChannels * samplesPerChannel).fill(500);
240+
yield new AudioFrame(data, sampleRate, numChannels, samplesPerChannel);
241+
}
242+
243+
// Suppress the expected console.warn from the timeout path
244+
const originalWarn = console.warn;
245+
const warnings: string[] = [];
246+
console.warn = (...args: unknown[]) => {
247+
warnings.push(args.map(String).join(' '));
248+
};
249+
250+
try {
251+
mixer.addStream(slowStream());
252+
253+
// The mixer should produce a frame (zero-padded due to timeout)
254+
// and auto-close when the stream exhausts.
255+
const frames: AudioFrame[] = [];
256+
for await (const frame of mixer) {
257+
frames.push(frame);
258+
if (frames.length >= 1) break;
259+
}
260+
261+
await mixer.aclose();
262+
263+
// The timeout warning should have been logged
264+
expect(warnings.some((w) => w.includes('stream timeout after'))).toBe(true);
265+
} finally {
266+
console.warn = originalWarn;
267+
}
268+
});
167269
});

packages/livekit-rtc/src/audio_mixer.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ export class AudioMixer {
311311
// Accumulate data until we have at least chunkSize samples
312312
while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) {
313313
try {
314-
const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]);
314+
const result = await this.timeoutRace(iterator.next(), this.streamTimeoutMs);
315315

316316
if (result === 'timeout') {
317317
console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`);
@@ -412,7 +412,13 @@ export class AudioMixer {
412412
return new Promise((resolve) => setTimeout(resolve, ms));
413413
}
414414

415-
private timeout(ms: number): Promise<'timeout'> {
416-
return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms));
415+
/** Race a promise against a timeout. The losing setTimeout is automatically
416+
* cleared via `.finally()` so callers don't need to manage cleanup. */
417+
private timeoutRace<T>(promise: Promise<T>, ms: number): Promise<T | 'timeout'> {
418+
let timer: ReturnType<typeof setTimeout>;
419+
const timeoutPromise = new Promise<'timeout'>((resolve) => {
420+
timer = setTimeout(() => resolve('timeout'), ms);
421+
});
422+
return Promise.race([promise.finally(() => clearTimeout(timer)), timeoutPromise]);
417423
}
418424
}

0 commit comments

Comments
 (0)