Skip to content

Commit b01101b

Browse files
committed
fix: fixed stream write callback
1 parent 964e3f2 commit b01101b

4 files changed

Lines changed: 40 additions & 21 deletions

File tree

src/jrpc/v2/messageStream.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Duplex } from "readable-stream";
33

44
import { isRequest } from "../../utils/jrpc";
55
import { rpcErrors } from "../errors";
6-
import { JRPCRequest, Json } from "../interfaces";
6+
import { JRPCRequest } from "../interfaces";
77
import { SafeEventEmitter } from "../safeEventEmitter";
88
import { JRPCEngineV2 } from "./jrpcEngineV2";
99

@@ -21,18 +21,17 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine:
2121
// noop
2222
}
2323

24-
function write(req: JRPCRequest<unknown>, _encoding: BufferEncoding, cb: (error?: Error | null) => void) {
25-
engine
24+
function handleRequest(req: JRPCRequest<unknown>) {
25+
return engine
2626
.handle(req)
27-
.then((res: Json | void): void => {
27+
.then((res): undefined => {
2828
if (res !== undefined && isRequest(req)) {
2929
stream?.push({
3030
id: req.id,
3131
jsonrpc: "2.0",
3232
result: res,
3333
});
3434
}
35-
cb();
3635
return undefined;
3736
})
3837
.catch((err: unknown) => {
@@ -45,10 +44,15 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine:
4544
});
4645
}
4746
log.error(err);
48-
cb(err as Error | null);
4947
});
5048
}
5149

50+
function write(req: JRPCRequest<unknown>, _encoding: BufferEncoding, cb: (error?: Error | null) => void) {
51+
return handleRequest(req).finally(() => {
52+
cb();
53+
});
54+
}
55+
5256
stream = new Duplex({ objectMode: true, read: noop, write });
5357

5458
if (notificationEmitter) {

src/jrpc/v2/providerUtils.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,24 @@ export function providerFromEngine(engine: JRPCEngineV2): SafeEventEmitterProvid
2525
return result as U;
2626
};
2727

28+
async function handleWithCallback<T extends JRPCParams, U>(req: JRPCRequest<T>, callback: (error: unknown, providerRes: JRPCResponse<U>) => void) {
29+
try {
30+
const result = await engine.handle(req as JRPCRequest);
31+
callback(null, { id: req.id, jsonrpc: "2.0", result: result as U });
32+
} catch (error) {
33+
const serializedError = serializeJrpcError(error, {
34+
shouldIncludeStack: false,
35+
shouldPreserveMessage: true,
36+
});
37+
callback(serializedError, { id: req.id, jsonrpc: "2.0", error: serializedError });
38+
}
39+
}
40+
2841
provider.send = <T extends JRPCParams, U>(req: JRPCRequest<T>, callback: (error: unknown, providerRes: JRPCResponse<U>) => void) => {
2942
if (typeof callback !== "function") {
3043
throw new Error('Must provide callback to "send" method.');
3144
}
32-
engine
33-
.handle(req as JRPCRequest)
34-
.then((result) => callback(null, { id: req.id, jsonrpc: "2.0", result: result as U }))
35-
.catch((error) => {
36-
const serializedError = serializeJrpcError(error, {
37-
shouldIncludeStack: false,
38-
shouldPreserveMessage: true,
39-
});
40-
callback(error, { id: req.id, jsonrpc: "2.0", error: serializedError });
41-
});
45+
handleWithCallback(req, callback);
4246
};
4347

4448
provider.request = async <T extends JRPCParams, U>(args: RequestArguments<T>) => {

test/v2/messageStream.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,16 @@ describe("createEngineStreamV2", () => {
180180
expect((pushed.error as Record<string, unknown>).message).toContain("specific failure");
181181
});
182182

183+
it("calls cb() without an error to keep the stream alive", async () => {
184+
setup(createMockEngine(vi.fn().mockRejectedValue(new Error("engine failure"))));
185+
186+
const cb = vi.fn();
187+
capturedWrite(makeRequest({ id: "err-alive" }), "utf-8", cb);
188+
await flushPromises();
189+
190+
expect(cb).toHaveBeenCalledOnce();
191+
});
192+
183193
it("uses a fallback message for non-Error throws", async () => {
184194
setup(createMockEngine(vi.fn().mockRejectedValue("string error")));
185195

test/v2/providerUtils.test.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,10 @@ describe("providerUtils", () => {
134134
const req = makeRequest({ id: "42" });
135135
const { error, response } = await sendPromise(provider, req);
136136

137-
expect(error).toBe(testError);
137+
expect(error).toMatchObject({ code: -32603, message: "send error" });
138+
expect(error).toBe(response.error);
138139
expect(response.id).toBe("42");
139140
expect(response.jsonrpc).toBe("2.0");
140-
expect(response.error).toMatchObject({ message: "send error" });
141141
});
142142

143143
it("constructs a proper JRPCResponse shape", async () => {
@@ -202,7 +202,7 @@ describe("providerUtils", () => {
202202

203203
const { error, response } = await sendPromise(provider, makeRequest({ id: "x" }));
204204

205-
expect(error).toBe(thrown);
205+
expect(error).toBe(response.error);
206206
expect(response.id).toBe("x");
207207
expect(response.jsonrpc).toBe("2.0");
208208
const rpcError = response.error as Record<string, unknown>;
@@ -323,9 +323,10 @@ describe("providerUtils", () => {
323323
};
324324
const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2);
325325

326-
const { error } = await sendPromise(provider, makeRequest());
326+
const { error, response } = await sendPromise(provider, makeRequest());
327327

328-
expect(error).toBe(testError);
328+
expect(error).toMatchObject({ code: -32603, message: "middleware-error" });
329+
expect(error).toBe(response.error);
329330
});
330331

331332
it("works with a middleware that uses context", async () => {

0 commit comments

Comments
 (0)