Skip to content

Commit 48df2e1

Browse files
committed
feat: Stream Middleware V2
1 parent 40925ed commit 48df2e1

2 files changed

Lines changed: 78 additions & 6 deletions

File tree

examples/vue-example/package-lock.json

Lines changed: 7 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { Duplex } from "stream";
2+
3+
import { JRPCRequest, Json } from "../interfaces";
4+
import { SafeEventEmitter } from "../safeEventEmitter";
5+
import { JRPCMiddlewareV2 } from "./v2interfaces";
6+
7+
/**
8+
* Creates a V2-compatible client-side stream middleware for the dapp ↔ iframe
9+
* transport layer.
10+
*
11+
* Replaces V1's `createStreamMiddleware` by providing:
12+
* - A terminal middleware that sends outbound requests through the stream and
13+
* resolves when the matching response arrives.
14+
* - A Duplex object stream to pump through the ObjectMultiplex channel.
15+
* - Inbound notification routing via the supplied `notificationEmitter`.
16+
*/
17+
export function createClientStreamMiddlewareV2({ notificationEmitter }: { notificationEmitter?: SafeEventEmitter } = {}): {
18+
middleware: JRPCMiddlewareV2<JRPCRequest<unknown>, Json>;
19+
stream: Duplex;
20+
} {
21+
const pendingRequests = new Map<number | string, { resolve: (result: Json) => void; reject: (error: unknown) => void }>();
22+
23+
function noop() {
24+
// noop
25+
}
26+
27+
function write(this: Duplex, data: Record<string, unknown>, _encoding: BufferEncoding, cb: () => void) {
28+
// eslint-disable-next-line no-console
29+
console.log("createClientStreamMiddlewareV2::data", data);
30+
if (data.method !== undefined) {
31+
// Inbound request or notification from remote — route to event emitter
32+
// (matches V1 createStreamMiddleware behavior where all non-response
33+
// messages are emitted as "notification" regardless of whether they
34+
// carry an id)
35+
notificationEmitter?.emit("notification", data);
36+
} else {
37+
// No method → this is a response to one of our pending outbound requests
38+
const id = data.id as number | string | undefined;
39+
if (id !== undefined && id !== null && pendingRequests.has(id)) {
40+
const pending = pendingRequests.get(id)!;
41+
pendingRequests.delete(id);
42+
43+
if (data.error) {
44+
const errorObj = data.error as { code?: number; message?: string; data?: unknown };
45+
pending.reject(Object.assign(new Error(errorObj.message || "Internal JSON-RPC error"), { code: errorObj.code, data: errorObj.data }));
46+
} else {
47+
pending.resolve(data.result as Json);
48+
}
49+
}
50+
}
51+
52+
cb();
53+
}
54+
55+
const stream = new Duplex({ objectMode: true, read: noop, write });
56+
57+
stream.once("close", () => {
58+
const error = new Error("Stream closed");
59+
pendingRequests.forEach(({ reject }) => reject(error));
60+
pendingRequests.clear();
61+
});
62+
63+
const middleware: JRPCMiddlewareV2<JRPCRequest<unknown>, Json> = ({ request }) => {
64+
return new Promise<Json>((resolve, reject) => {
65+
pendingRequests.set(request.id as number | string, { resolve, reject });
66+
stream.push(request);
67+
});
68+
};
69+
70+
return { middleware, stream };
71+
}

0 commit comments

Comments
 (0)