Skip to content

Commit 71ed012

Browse files
committed
merge: worker/65d6be2f - MCP server bug fixes
2 parents 973e07c + 74f7d95 commit 71ed012

2 files changed

Lines changed: 400 additions & 272 deletions

File tree

mcp-server/src/client.ts

Lines changed: 124 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
import * as net from "net";
66

77
interface SocketRequest {
8+
id: number;
89
command: string;
910
payload: Record<string, unknown>;
1011
}
1112

1213
interface SocketResponse {
14+
id?: number;
1315
success: boolean;
1416
data?: unknown;
1517
error?: string;
@@ -24,10 +26,10 @@ interface PendingRequest {
2426

2527
// Command-specific timeouts (in ms)
2628
const COMMAND_TIMEOUTS: Record<string, number> = {
27-
takeScreenshot: 60000, // Screenshots can take longer
28-
getDom: 60000, // DOM can be large
29-
executeJs: 60000, // JS execution varies
30-
default: 30000, // Default timeout
29+
takeScreenshot: 60000,
30+
getDom: 60000,
31+
executeJs: 60000,
32+
default: 30000,
3133
};
3234

3335
export class CortexSocketClient {
@@ -37,9 +39,10 @@ export class CortexSocketClient {
3739
private connected: boolean = false;
3840
private connecting: boolean = false;
3941
private responseBuffer: string = "";
40-
private pendingRequest: PendingRequest | null = null;
41-
private reconnectAttempts: number = 0;
42+
private pendingRequests: Map<number, PendingRequest> = new Map();
43+
private nextRequestId: number = 1;
4244
private maxReconnectAttempts: number = 3;
45+
private connectWaiters: Array<{ resolve: () => void; reject: (err: Error) => void }> = [];
4346

4447
constructor(host: string = "127.0.0.1", port: number = 4000) {
4548
this.host = host;
@@ -52,34 +55,16 @@ export class CortexSocketClient {
5255
}
5356

5457
if (this.connecting) {
55-
// Wait for existing connection attempt
56-
return new Promise((resolve, reject) => {
57-
const checkInterval = setInterval(() => {
58-
if (this.connected) {
59-
clearInterval(checkInterval);
60-
resolve();
61-
} else if (!this.connecting) {
62-
clearInterval(checkInterval);
63-
reject(new Error("Connection failed"));
64-
}
65-
}, 100);
66-
67-
// Timeout waiting for connection
68-
setTimeout(() => {
69-
clearInterval(checkInterval);
70-
if (!this.connected) {
71-
reject(new Error("Connection timeout"));
72-
}
73-
}, 10000);
58+
return new Promise<void>((resolve, reject) => {
59+
this.connectWaiters.push({ resolve, reject });
7460
});
7561
}
7662

7763
this.connecting = true;
7864

79-
return new Promise((resolve, reject) => {
65+
return new Promise<void>((resolve, reject) => {
8066
this.socket = new net.Socket();
81-
82-
// Set socket timeout
67+
8368
this.socket.setTimeout(60000);
8469

8570
const connectTimeout = setTimeout(() => {
@@ -88,26 +73,31 @@ export class CortexSocketClient {
8873
this.socket.destroy();
8974
this.socket = null;
9075
}
91-
reject(new Error("Connection timeout"));
76+
const err = new Error("Connection timeout");
77+
reject(err);
78+
this.drainConnectWaiters(err);
9279
}, 10000);
9380

81+
let settled = false;
82+
9483
this.socket.on("connect", () => {
9584
clearTimeout(connectTimeout);
9685
this.connected = true;
9786
this.connecting = false;
98-
this.reconnectAttempts = 0;
87+
settled = true;
9988
console.error(`[MCP Client] Connected to Cortex Desktop at ${this.host}:${this.port}`);
10089
resolve();
90+
this.drainConnectWaiters(null);
10191
});
10292

10393
this.socket.on("data", (data) => {
10494
this.handleData(data);
10595
});
10696

10797
this.socket.on("timeout", () => {
108-
console.error("[MCP Client] Socket timeout");
109-
if (this.pendingRequest) {
110-
this.clearPendingRequest(new Error("Socket timeout"));
98+
console.error("[MCP Client] Socket idle timeout — destroying connection");
99+
if (this.socket) {
100+
this.socket.destroy();
111101
}
112102
});
113103

@@ -116,149 +106,189 @@ export class CortexSocketClient {
116106
console.error("[MCP Client] Socket error:", err.message);
117107
this.connected = false;
118108
this.connecting = false;
119-
120-
if (this.pendingRequest) {
121-
this.clearPendingRequest(err);
109+
110+
this.rejectAllPending(err);
111+
112+
if (!settled) {
113+
settled = true;
114+
reject(err);
115+
this.drainConnectWaiters(err);
122116
}
123-
124-
reject(err);
125117
});
126118

127119
this.socket.on("close", () => {
128120
console.error("[MCP Client] Connection closed");
121+
const wasClosed = !this.connected;
129122
this.connected = false;
130123
this.connecting = false;
131124
this.socket = null;
132-
133-
if (this.pendingRequest) {
134-
this.clearPendingRequest(new Error("Connection closed"));
125+
this.responseBuffer = "";
126+
127+
this.rejectAllPending(new Error("Connection closed"));
128+
129+
if (!settled && !wasClosed) {
130+
settled = true;
131+
const err = new Error("Connection closed before established");
132+
reject(err);
133+
this.drainConnectWaiters(err);
135134
}
136135
});
137136

138137
this.socket.connect(this.port, this.host);
139138
});
140139
}
141140

141+
private drainConnectWaiters(err: Error | null): void {
142+
const waiters = this.connectWaiters;
143+
this.connectWaiters = [];
144+
for (const w of waiters) {
145+
if (err) {
146+
w.reject(err);
147+
} else {
148+
w.resolve();
149+
}
150+
}
151+
}
152+
142153
private handleData(data: Buffer): void {
143154
this.responseBuffer += data.toString();
144-
145-
// Process all complete JSON responses in buffer
155+
146156
let newlineIndex: number;
147157
while ((newlineIndex = this.responseBuffer.indexOf("\n")) !== -1) {
148158
const jsonStr = this.responseBuffer.substring(0, newlineIndex);
149159
this.responseBuffer = this.responseBuffer.substring(newlineIndex + 1);
150-
160+
151161
if (!jsonStr.trim()) continue;
152-
162+
153163
try {
154164
const response = JSON.parse(jsonStr) as SocketResponse;
155-
if (this.pendingRequest) {
156-
const { resolve, timeoutId } = this.pendingRequest;
157-
clearTimeout(timeoutId);
158-
this.pendingRequest = null;
159-
resolve(response);
165+
166+
if (response.id != null && this.pendingRequests.has(response.id)) {
167+
const pending = this.pendingRequests.get(response.id)!;
168+
this.pendingRequests.delete(response.id);
169+
clearTimeout(pending.timeoutId);
170+
pending.resolve(response);
171+
} else {
172+
const firstEntry = this.pendingRequests.entries().next();
173+
if (!firstEntry.done) {
174+
const [id, pending] = firstEntry.value;
175+
this.pendingRequests.delete(id);
176+
clearTimeout(pending.timeoutId);
177+
pending.resolve(response);
178+
}
160179
}
161180
} catch (e) {
162181
console.error("[MCP Client] Failed to parse response:", e, "Raw:", jsonStr.substring(0, 200));
163-
if (this.pendingRequest) {
164-
this.clearPendingRequest(new Error(`Failed to parse response: ${e}`));
182+
const firstEntry = this.pendingRequests.entries().next();
183+
if (!firstEntry.done) {
184+
const [id, pending] = firstEntry.value;
185+
this.pendingRequests.delete(id);
186+
clearTimeout(pending.timeoutId);
187+
pending.reject(new Error(`Failed to parse response: ${e}`));
165188
}
166189
}
167190
}
168191
}
169192

170-
private clearPendingRequest(error: Error): void {
171-
if (this.pendingRequest) {
172-
const { reject, timeoutId } = this.pendingRequest;
173-
clearTimeout(timeoutId);
174-
this.pendingRequest = null;
175-
reject(error);
193+
private rejectAllPending(error: Error): void {
194+
const entries = Array.from(this.pendingRequests.entries());
195+
this.pendingRequests.clear();
196+
for (const [, pending] of entries) {
197+
clearTimeout(pending.timeoutId);
198+
pending.reject(error);
176199
}
177200
}
178201

179202
private getTimeout(command: string): number {
180-
return COMMAND_TIMEOUTS[command] || COMMAND_TIMEOUTS.default;
203+
return COMMAND_TIMEOUTS[command] ?? COMMAND_TIMEOUTS["default"];
181204
}
182205

183206
async sendCommand(command: string, payload: Record<string, unknown> = {}): Promise<SocketResponse> {
184-
// Ensure we're connected
185207
if (!this.connected || !this.socket) {
186-
try {
187-
await this.connect();
188-
} catch (err) {
189-
// Try to reconnect if initial connection failed
190-
if (this.reconnectAttempts < this.maxReconnectAttempts) {
191-
this.reconnectAttempts++;
192-
console.error(`[MCP Client] Reconnect attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
193-
await new Promise(resolve => setTimeout(resolve, 1000));
194-
return this.sendCommand(command, payload);
208+
let lastErr: unknown;
209+
for (let attempt = 0; attempt <= this.maxReconnectAttempts; attempt++) {
210+
try {
211+
if (attempt > 0) {
212+
console.error(`[MCP Client] Reconnect attempt ${attempt}/${this.maxReconnectAttempts}`);
213+
await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
214+
}
215+
await this.connect();
216+
break;
217+
} catch (err) {
218+
lastErr = err;
219+
if (attempt === this.maxReconnectAttempts) {
220+
throw lastErr;
221+
}
195222
}
196-
throw err;
197-
}
198-
}
199-
200-
// Wait if there's already a pending request
201-
if (this.pendingRequest) {
202-
console.error(`[MCP Client] Waiting for pending request: ${this.pendingRequest.command}`);
203-
await new Promise(resolve => setTimeout(resolve, 100));
204-
if (this.pendingRequest) {
205-
throw new Error("Another request is already in progress");
206223
}
207224
}
208225

209226
return new Promise((resolve, reject) => {
210-
if (!this.socket) {
227+
if (!this.socket || !this.connected) {
211228
reject(new Error("Socket not connected"));
212229
return;
213230
}
214231

232+
const requestId = this.nextRequestId++;
215233
const timeout = this.getTimeout(command);
216-
234+
217235
const timeoutId = setTimeout(() => {
218-
if (this.pendingRequest) {
236+
if (this.pendingRequests.has(requestId)) {
237+
this.pendingRequests.delete(requestId);
219238
console.error(`[MCP Client] Request timeout for command: ${command} (${timeout}ms)`);
220-
this.pendingRequest = null;
221239
reject(new Error(`Request timed out after ${timeout}ms`));
222240
}
223241
}, timeout);
224242

225-
this.pendingRequest = {
243+
this.pendingRequests.set(requestId, {
226244
resolve,
227245
reject,
228246
timeoutId,
229247
command,
230-
};
248+
});
231249

232-
const request: SocketRequest = { command, payload };
250+
const request: SocketRequest = { id: requestId, command, payload };
233251
const json = JSON.stringify(request) + "\n";
234-
252+
235253
this.socket.write(json, (err) => {
236254
if (err) {
237-
this.clearPendingRequest(err);
255+
if (this.pendingRequests.has(requestId)) {
256+
const pending = this.pendingRequests.get(requestId)!;
257+
this.pendingRequests.delete(requestId);
258+
clearTimeout(pending.timeoutId);
259+
pending.reject(err);
260+
}
238261
}
239262
});
240263
});
241264
}
242265

243266
disconnect(): void {
244-
if (this.pendingRequest) {
245-
this.clearPendingRequest(new Error("Client disconnected"));
246-
}
247-
267+
this.rejectAllPending(new Error("Client disconnected"));
268+
248269
if (this.socket) {
249270
this.socket.destroy();
250271
this.socket = null;
251272
this.connected = false;
252273
}
274+
275+
this.responseBuffer = "";
276+
this.connectWaiters = [];
253277
}
254278

255279
isConnected(): boolean {
256-
return this.connected;
280+
return this.connected && this.socket !== null;
257281
}
258282
}
259283

284+
function parsePort(value: string | undefined, fallback: number): number {
285+
if (value == null) return fallback;
286+
const parsed = parseInt(value, 10);
287+
return Number.isFinite(parsed) && parsed > 0 && parsed <= 65535 ? parsed : fallback;
288+
}
289+
260290
// Singleton instance
261291
export const socketClient = new CortexSocketClient(
262292
process.env.CORTEX_MCP_HOST || "127.0.0.1",
263-
parseInt(process.env.CORTEX_MCP_PORT || "4000", 10)
293+
parsePort(process.env.CORTEX_MCP_PORT, 4000),
264294
);

0 commit comments

Comments
 (0)