Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/famous-gifts-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

Resolve 4 Detail bugs (maxToolSteps, JWT caching, fallback concurrency, mergeFrames perf)
17 changes: 7 additions & 10 deletions agents/src/llm/fallback_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,10 @@ class FallbackLLMStream extends LLMStream {
extraKwargs: this.extraKwargs,
});

// Listen for error events - child LLMs emit errors via their LLM instance, not the stream
let streamError: Error | undefined;
const errorHandler = (ev: { error: Error }) => {
streamError = ev.error;
};
llm.on('error', errorHandler);
// Suppress unhandled 'error' events from the child LLM — error detection
// is done via stream._runError to avoid cross-request contamination.
const noop = () => {};
llm.on('error', noop);

try {
let shouldSetCurrent = !checkRecovery;
Expand All @@ -226,9 +224,8 @@ class FallbackLLMStream extends LLMStream {
yield chunk;
}

// If an error was emitted but not thrown through iteration, throw it now
if (streamError) {
throw streamError;
if (stream._runError) {
throw stream._runError;
}
} catch (error) {
if (error instanceof APIError) {
Expand Down Expand Up @@ -258,7 +255,7 @@ class FallbackLLMStream extends LLMStream {
}
throw error;
} finally {
llm.off('error', errorHandler);
llm.off('error', noop);
}
}

Expand Down
10 changes: 9 additions & 1 deletion agents/src/llm/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
protected abortController = new AbortController();
protected _connOptions: APIConnectOptions;
protected logger = log();
/** @internal – Captured when run() fails so callers can inspect after iteration ends. */
_runError?: Error;

#llm: LLM;
#chatCtx: ChatContext;
Expand Down Expand Up @@ -148,7 +150,13 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
// is run **after** the constructor has finished. Otherwise we get
// runtime error when trying to access class variables in the
// `run` method.
startSoon(() => this.mainTask().finally(() => this.queue.close()));
startSoon(() =>
this.mainTask()
.catch((e) => {
this._runError = e instanceof Error ? e : new Error(String(e));
})
.finally(() => this.queue.close()),
);
}

private _mainTaskImpl = async (span: Span) => {
Expand Down
4 changes: 3 additions & 1 deletion agents/src/telemetry/otel_http_exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface SimpleOTLPHttpLogExporterConfig {
export class SimpleOTLPHttpLogExporter {
private readonly config: SimpleOTLPHttpLogExporterConfig;
private jwt: string | null = null;
private jwtExpiresAt = 0;

private static readonly FORCE_DOUBLE_KEYS = new Set([
'transcriptConfidence',
Expand Down Expand Up @@ -102,7 +103,7 @@ export class SimpleOTLPHttpLogExporter {
}

private async ensureJwt(): Promise<void> {
if (this.jwt) return;
if (this.jwt && Date.now() < this.jwtExpiresAt) return;

const apiKey = process.env.LIVEKIT_API_KEY;
const apiSecret = process.env.LIVEKIT_API_SECRET;
Expand All @@ -114,6 +115,7 @@ export class SimpleOTLPHttpLogExporter {
const token = new AccessToken(apiKey, apiSecret, { ttl: '6h' });
token.addObservabilityGrant({ write: true });
this.jwt = await token.toJwt();
this.jwtExpiresAt = Date.now() + 5 * 60 * 60 * 1000;
}

private buildPayload(records: SimpleLogRecord[]): object {
Expand Down
7 changes: 6 additions & 1 deletion agents/src/telemetry/pino_otel_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export class PinoCloudExporter {
private readonly batchSize: number;
private readonly flushIntervalMs: number;
private jwt: string | null = null;
private jwtExpiresAt = 0;
private pendingLogs: any[] = [];
private flushTimer: NodeJS.Timeout | null = null;

Expand Down Expand Up @@ -172,6 +173,9 @@ export class PinoCloudExporter {
await this.sendLogs(logs);
} catch (error) {
this.pendingLogs = [...logs, ...this.pendingLogs];
if (this.pendingLogs.length > 10000) {
this.pendingLogs = this.pendingLogs.slice(-10000);
}
console.error('[PinoCloudExporter] Failed to flush logs:', error);
}
}
Expand Down Expand Up @@ -223,7 +227,7 @@ export class PinoCloudExporter {
}

private async ensureJwt(): Promise<void> {
if (this.jwt) return;
if (this.jwt && Date.now() < this.jwtExpiresAt) return;

const apiKey = process.env.LIVEKIT_API_KEY;
const apiSecret = process.env.LIVEKIT_API_SECRET;
Expand All @@ -235,6 +239,7 @@ export class PinoCloudExporter {
const token = new AccessToken(apiKey, apiSecret, { ttl: '6h' });
token.addObservabilityGrant({ write: true });
this.jwt = await token.toJwt();
this.jwtExpiresAt = Date.now() + 5 * 60 * 60 * 1000;
}

async shutdown(): Promise<void> {
Expand Down
13 changes: 10 additions & 3 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ export const mergeFrames = (buffer: AudioBuffer): AudioFrame => {

const sampleRate = buffer[0]!.sampleRate;
const channels = buffer[0]!.channels;
let totalDataLength = 0;
let samplesPerChannel = 0;
let data = new Int16Array();

for (const frame of buffer) {
if (frame.sampleRate !== sampleRate) {
Expand All @@ -74,10 +74,17 @@ export const mergeFrames = (buffer: AudioBuffer): AudioFrame => {
throw new TypeError('channel count mismatch');
}

data = new Int16Array([...data, ...frame.data]);
totalDataLength += frame.data.length;
samplesPerChannel += frame.samplesPerChannel;
}

const data = new Int16Array(totalDataLength);
let offset = 0;
for (const frame of buffer) {
data.set(frame.data, offset);
offset += frame.data.length;
}

return new AudioFrame(data, sampleRate, channels, samplesPerChannel);
}

Expand All @@ -101,7 +108,7 @@ export class Queue<T> {
await once(this.#events, 'put');
}
let item = this.items.shift();
if (typeof item === 'undefined') {
if (item === undefined) {
item = await _get();
}
return item;
Expand Down
3 changes: 2 additions & 1 deletion agents/src/voice/speech_handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class SpeechHandle {
_tasks: Task<void>[] = [];

/** @internal */
_numSteps = 1;
_numSteps: number;

/** @internal - OpenTelemetry context for the agent turn span */
_agentTurnContext?: Context;
Expand All @@ -62,6 +62,7 @@ export class SpeechHandle {
public _stepIndex: number,
readonly parent?: SpeechHandle,
) {
this._numSteps = _stepIndex;
this.doneFut.await.finally(() => {
for (const callback of this.doneCallbacks) {
callback(this);
Expand Down
Loading