From d81a184b4bcff7e190778eb9ce616e0bae8c19de Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 11:15:21 +0300 Subject: [PATCH 01/11] WIP --- .../graphql-subscriptions-fixture.ts | 9 +- .../__tests__/graphql-subscriptions.spec.ts | 210 ++++++++-- .../lib/services/graphql/graphql.service.ts | 370 +++++++++++------- 3 files changed, 402 insertions(+), 187 deletions(-) diff --git a/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts b/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts index ded2e3e7..4534ac39 100644 --- a/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts +++ b/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts @@ -40,12 +40,12 @@ export class GraphQLSubscriptionsFixture { return (this.graphqlService as any).subscriptions.length; } - getGraphqlServiceSubscriptionObserverMapSize(): number { - return Object.keys(this.getGraphqlServiceSubscriptionObserverMap()).length; + getMessageSubscribersSize(): number { + return (this.graphqlService as any).messageSubscribers.size; } - getGraphqlServiceSubscriptionObserverMap(): Record> { - return (this.graphqlService as any).subscriptionObserverMap; + hasMessageSubscriber(id: string): boolean { + return (this.graphqlService as any).messageSubscribers.has(id); } async waitForConnection() { @@ -125,6 +125,7 @@ export class GraphQLSubscriptionsFixture { } async cleanup() { + (this.graphqlService as any).clearSubscriptionRetry(); WS.clean(); await this.server.closed; } diff --git a/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts index 88ed8840..22c16641 100644 --- a/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts +++ b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts @@ -1,10 +1,8 @@ import gql from 'graphql-tag'; import fetchMock from 'jest-fetch-mock'; import { WebSocket } from 'mock-socket'; -import { Subscriber } from 'rxjs'; import { ConnectionStatus } from '../../../model/connection-status'; import { GraphQLSubscriptionsFixture } from '../__fixtures__/graphql-subscriptions-fixture'; -import { QminderGraphQLError } from '../graphql.service'; jest.mock('isomorphic-ws', () => WebSocket); jest.mock('../../../util/sleep-ms/sleep-ms', () => ({ @@ -96,24 +94,20 @@ describe('GraphQL subscriptions', () => { }); it('cleans up internal state when unsubscribing', async () => { - // start the test with an empty observer-map - expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0); + expect(fixture.getMessageSubscribersSize()).toBe(0); const subscription = fixture.triggerSubscription(); await fixture.handleConnectionInit(); await fixture.consumeSubscribeMessage(); - // the observer map should equal { "1": Subscriber => spy } - expect(fixture.getGraphqlServiceSubscriptionObserverMap()).toEqual({ - '1': expect.any(Subscriber), - }); + expect(fixture.getMessageSubscribersSize()).toBe(1); + expect(fixture.hasMessageSubscriber('1')).toBe(true); - // unsubscribing should clean up subscription.unsubscribe(); await fixture.consumeAnyMessage(); - expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0); + expect(fixture.getMessageSubscribersSize()).toBe(0); }); it('when receiving a published message for a subscription that does not exist anymore, it does not throw', async () => { - expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0); + expect(fixture.getMessageSubscribersSize()).toBe(0); const subscription = fixture.triggerSubscription(); await fixture.handleConnectionInit(); @@ -335,43 +329,191 @@ describe('GraphQL subscriptions', () => { subscription.unsubscribe(); }); - it('error messages are propagated to the subscriber', async () => { - const ERRORS: QminderGraphQLError[] = [ - { - message: - "Invalid Syntax : offending token 'createdTickets' at line 1 column 1", - sourcePreview: - 'createdTickets(locationId: 673) {\n' + - ' id\n' + - ' firstName\n' + - ' lastName\n', - offendingToken: 'createdTickets', - locations: [], - errorType: 'InvalidSyntax', - extensions: null, - path: null, - }, - ]; + it('GQL_ERROR does not kill the subscription or trigger reconnect', async () => { + const reconnectSpy = jest.spyOn( + fixture.graphqlService as any, + 'handleConnectionDrop', + ); const errorSpy = jest.fn(); const subscription = fixture.triggerSubscription('subscription { baba }', { error: errorSpy, }); await fixture.handleConnectionInit(); await fixture.consumeSubscribeMessage(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [{ message: 'Subscription limit reached' }], + }, + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(errorSpy).not.toHaveBeenCalled(); + expect(reconnectSpy).not.toHaveBeenCalled(); + expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(1); + expect(fixture.hasMessageSubscriber('1')).toBe(true); + + subscription.unsubscribe(); + }); + + it('GQL_ERROR emits true on the subscription error observable', async () => { + const values: boolean[] = []; + fixture.graphqlService + .getSubscriptionErrorObservable() + .subscribe((v) => values.push(v)); + + const subscription = fixture.triggerSubscription('subscription { baba }'); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [ + { + message: + 'The maximum subscription limit of 100 has been reached', + }, + ], + }, + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(values).toEqual([false, true]); + + subscription.unsubscribe(); + }); + + it('retries failed subscriptions after delay and clears error state', async () => { + (fixture.graphqlService as any).subscriptionRetryDelayMs = 50; + + const values: boolean[] = []; + fixture.graphqlService + .getSubscriptionErrorObservable() + .subscribe((v) => values.push(v)); + + const subscription = fixture.triggerSubscription('subscription { baba }'); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + fixture.server.send({ id: '1', type: 'error', payload: { data: null, - errors: ERRORS, + errors: [{ message: 'Limit reached' }], }, }); - expect(errorSpy).toHaveBeenCalledWith(ERRORS); - // Cleans up as well - expect( - (fixture.graphqlService as any).subscriptionObserverMap['1'], - ).toBeUndefined(); + await new Promise((r) => setTimeout(r, 10)); + expect(values).toEqual([false, true]); + + await new Promise((r) => setTimeout(r, 60)); + + expect(values).toEqual([false, true, false]); + expect(await fixture.getNextMessage()).toEqual({ + id: '1', + type: 'start', + payload: { query: 'subscription { baba }' }, + }); + + subscription.unsubscribe(); + }); + + it('does not send GQL_STOP when server sends GQL_COMPLETE', async () => { + const completeSpy = jest.fn(); + const subscription = fixture.triggerSubscription('subscription { baba }', { + next: () => {}, + complete: completeSpy, + }); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + + fixture.sendMessageToClient({ + id: '1', + type: 'complete', + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(completeSpy).toHaveBeenCalled(); + expect(fixture.hasMessageSubscriber('1')).toBe(false); + expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(0); + expect(fixture.server.messagesToConsume.pendingItems).toHaveLength(0); + + subscription.unsubscribe(); + }); + + it('GQL_ERROR keeps subscription tracked so it re-subscribes on natural reconnect and clears error state', async () => { + const values: boolean[] = []; + fixture.graphqlService + .getSubscriptionErrorObservable() + .subscribe((v) => values.push(v)); + + const subscription = fixture.triggerSubscription('subscription { baba }'); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + + fixture.sendMessageToClient({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [{ message: 'Limit reached' }], + }, + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(1); + expect(fixture.hasMessageSubscriber('1')).toBe(true); + expect(values).toEqual([false, true]); + + await fixture.closeWithCode(1001); + fixture.openServer(); + await fixture.handleConnectionInit(); + expect(await fixture.getNextMessage()).toEqual({ + id: '1', + type: 'start', + payload: { query: 'subscription { baba }' }, + }); + + expect(values).toEqual([false, true, false]); + + subscription.unsubscribe(); + }); + + it('cleans up subscription on unknown message type with errors', async () => { + const errorSpy = jest.fn(); + const subscription = fixture.triggerSubscription('subscription { baba }', { + error: errorSpy, + }); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + + fixture.sendMessageToClient({ + id: '1', + type: 'unknown_type', + payload: { + errors: [{ message: 'Something went wrong' }], + }, + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(errorSpy).toHaveBeenCalledWith( + expect.objectContaining({ message: 'Something went wrong' }), + ); + expect(errorSpy.mock.calls[0][0]).toBeInstanceOf(Error); + expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(0); + expect(fixture.hasMessageSubscriber('1')).toBe(false); subscription.unsubscribe(); }); diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index 87458fff..58375f4e 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -9,28 +9,16 @@ import { BehaviorSubject, distinctUntilChanged, Observable, - Observer, shareReplay, + Subscriber, } from 'rxjs'; import { ConnectionStatus } from '../../model/connection-status.js'; +import { Logger } from '../../util/logger/logger.js'; import { calculateRandomizedExponentialBackoffTime } from '../../util/randomized-exponential-backoff/randomized-exponential-backoff.js'; import { sleepMs } from '../../util/sleep-ms/sleep-ms.js'; import { ApiBase, GraphqlQuery } from '../api-base/api-base.js'; import { TemporaryApiKeyService } from '../temporary-api-key/temporary-api-key.service.js'; -import { Logger } from '../../util/logger/logger.js'; - -type QueryOrDocument = string | DocumentNode; - -function queryToString(query: QueryOrDocument): string { - if (typeof query === 'string') { - return query; - } - if (query.kind === 'Document') { - return print(query); - } - throw new Error('queryToString: query must be a string or a DocumentNode'); -} export interface QminderGraphQLError { message: string; @@ -51,14 +39,9 @@ interface OperationMessage { }; } -class Subscription { - id: string; - query: string; - - constructor(id: string, query: string) { - this.id = id; - this.query = query; - } +interface Subscription { + readonly messageId: string; + readonly query: string; } enum MessageType { @@ -79,6 +62,7 @@ enum MessageType { const PONG_TIMEOUT_IN_MS = 12000; const PING_PONG_INTERVAL_IN_MS = 20000; +const SUBSCRIBER_RETRY_DELAY_MS = 5000; // https://www.w3.org/TR/websockets/#concept-websocket-close-fail const CLIENT_SIDE_CLOSE_EVENT = 1000; @@ -102,14 +86,21 @@ export class GraphqlService { ConnectionStatus.DISCONNECTED, ); - private nextSubscriptionId = 1; + private nextMessageId = 1; private subscriptions: Subscription[] = []; - private readonly subscriptionObserverMap: { [id: string]: Observer } = - {}; + private readonly messageSubscribers = new Map< + string, + Subscriber> + >(); private readonly subscriptionConnection$: Observable; + + private readonly hasSubscriberErrored$ = new BehaviorSubject(false); + private readonly failedSubscribers = new Set(); + + private subscriberRetryTimeout: ReturnType | null = null; private temporaryApiKeyService: TemporaryApiKeyService | undefined; private pongTimeout: any; @@ -198,25 +189,30 @@ export class GraphqlService { * ``` * * @param queryDocument required: the GraphQL query to send, for example `"subscription { createdTickets(locationId: 123) { id firstName } }"` - * @returns an RxJS Observable that will push data as - * @throws when the 'query' argument is undefined or an empty string + * @returns an RxJS Observable that will push data + * @throws when the 'queryDocument' argument is an empty string */ - subscribe(queryDocument: QueryOrDocument): Observable { - const query = queryToString(queryDocument); + subscribe>( + queryDocument: string | DocumentNode, + ): Observable { + const query = + typeof queryDocument === 'string' ? queryDocument : print(queryDocument); - if (!query || query.length === 0) { + if (!query) { throw new Error( 'GraphQLService query expects a GraphQL query as its first argument', ); } - return new Observable((observer: Observer) => { - const id = this.generateOperationId(); - this.subscriptions.push(new Subscription(id, query)); - this.sendMessage(id, MessageType.GQL_START, { query }); - this.subscriptionObserverMap[id] = observer; + return new Observable((subscriber) => { + const messageId = `${this.nextMessageId++}`; + this.subscriptions.push({ messageId, query }); + this.sendMessage(messageId, MessageType.GQL_START, { query }); + this.messageSubscribers.set(messageId, subscriber); - return () => this.stopSubscription(id); + return () => { + this.removeSubscriber(messageId); + }; }); } @@ -258,6 +254,13 @@ export class GraphqlService { return this.subscriptionConnection$; } + /** + * Emits `false` if errored subscriptions are successfully retried + */ + hasSubscriberErrored(): Observable { + return this.hasSubscriberErrored$.pipe(distinctUntilChanged()); + } + /** * Set the WebSocket hostname the GraphQL service uses. * @hidden @@ -266,19 +269,28 @@ export class GraphqlService { this.apiServer = apiServer; } - private stopSubscription(id: string) { - this.sendMessage(id, MessageType.GQL_STOP, null); - this.cleanupSubscription(id); + private removeSubscriber(messageId: string): void { + if (this.messageSubscribers.has(messageId)) { + this.sendMessage(messageId, MessageType.GQL_STOP, null); + } + + this.cleanUpSubscription(messageId); } - private cleanupSubscription(id: string) { - delete this.subscriptionObserverMap[id]; - this.subscriptions = this.subscriptions.filter((sub) => { - return sub.id !== id; - }); + private cleanUpSubscription(messageId: string): void { + this.messageSubscribers.delete(messageId); + this.failedSubscribers.delete(messageId); + + this.subscriptions = this.subscriptions.filter( + ({ messageId: id }) => id !== messageId, + ); + + if (!this.failedSubscribers.size) { + this.hasSubscriberErrored$.next(false); + } } - private openSocket() { + private async openSocket(): Promise { if ( [ConnectionStatus.CONNECTING, ConnectionStatus.CONNECTED].includes( this.connectionStatus, @@ -286,26 +298,23 @@ export class GraphqlService { ) { return; } + this.setConnectionStatus(ConnectionStatus.CONNECTING); this.logger.info('Connecting to websocket'); - this.fetchTemporaryApiKey() - .then((temporaryApiKey: string) => { - this.createSocketConnection(temporaryApiKey); - }) - .catch((e) => { - throw e; - }); + + const temporaryApiKey = await this.getTemporaryApiKey(); + this.createSocketConnection(temporaryApiKey); } - private async fetchTemporaryApiKey(): Promise { - return this.temporaryApiKeyService.fetchTemporaryApiKey(); + private async getTemporaryApiKey(): Promise { + return await this.temporaryApiKeyService.fetchTemporaryApiKey(); } private getServerUrl(temporaryApiKey: string): string { return `wss://${this.apiServer}:443/graphql/subscription?rest-api-key=${temporaryApiKey}`; } - private createSocketConnection(temporaryApiKey: string) { + private createSocketConnection(temporaryApiKey: string): void { if (this.socket) { this.socket.onclose = null; this.socket.onmessage = null; @@ -314,9 +323,10 @@ export class GraphqlService { this.socket.close(); this.socket = null; } - this.socket = new WebSocket(this.getServerUrl(temporaryApiKey)); + this.socket = new WebSocket(this.getServerUrl(temporaryApiKey)); const socket = this.socket; + socket.onopen = () => { this.sendRawMessage( JSON.stringify({ @@ -327,7 +337,7 @@ export class GraphqlService { ); }; - socket.onclose = (event: CloseEvent) => { + socket.onclose = (event) => { this.logger.warn('WebSocket connection closed:', { code: event.code, reason: event.reason, @@ -337,15 +347,18 @@ export class GraphqlService { this.socket = null; this.clearPingMonitoring(); + if (this.shouldRetry(event)) { const timer = calculateRandomizedExponentialBackoffTime( this.connectionAttemptsCount, ); + this.logger.info( `Waiting for ${timer.toFixed(1)}ms before reconnecting`, ); + sleepMs(timer).then(() => { - this.connectionAttemptsCount += 1; + this.connectionAttemptsCount++; this.openSocket(); }); } @@ -358,104 +371,133 @@ export class GraphqlService { }; socket.onerror = () => { - const message = 'Websocket error occurred!'; if (this.isBrowserOnline()) { - this.logger.error(message); + this.logger.error('Websocket error occurred!'); } else { - this.logger.info(message); + this.logger.info('Websocket error occurred!'); } }; - socket.onmessage = (rawMessage: { data: WebSocket.Data }) => { - if (typeof rawMessage.data === 'string') { - const message: OperationMessage = JSON.parse(rawMessage.data); - - switch (message.type) { - case MessageType.GQL_CONNECTION_KEEP_ALIVE: - break; - - case MessageType.GQL_CONNECTION_ACK: { - this.connectionAttemptsCount = 0; - this.setConnectionStatus(ConnectionStatus.CONNECTED); - this.logger.info('Connected to websocket'); - this.startConnectionMonitoring(); - let resubscriptionFailed = false; - for (const subscription of this.subscriptions) { - const payload = { query: subscription.query }; - const msg = JSON.stringify({ - id: subscription.id, - type: MessageType.GQL_START, - payload, - }); - if (!this.sendRawMessage(msg)) { - this.logger.warn( - `Failed to re-subscribe ${this.subscriptions.length} subscription(s): WebSocket not open`, - ); - resubscriptionFailed = true; - break; - } - } - if (resubscriptionFailed) { - this.handleConnectionDrop(); - } - break; - } + socket.onmessage = (event) => { + if (typeof event.data !== 'string') { + return; + } - case MessageType.GQL_DATA: - this.subscriptionObserverMap[message.id]?.next( - message.payload.data, - ); - break; + const message: OperationMessage = JSON.parse(event.data); - case MessageType.GQL_COMPLETE: - this.subscriptionObserverMap[message.id]?.complete(); - break; + switch (message.type) { + case MessageType.GQL_CONNECTION_KEEP_ALIVE: + break; - case MessageType.GQL_PONG: - clearTimeout(this.pongTimeout); - break; + case MessageType.GQL_CONNECTION_ACK: { + this.connectionAttemptsCount = 0; - case MessageType.GQL_ERROR: - this.subscriptionObserverMap[message.id]?.error( - message.payload.errors, - ); - this.cleanupSubscription(message.id); - break; + this.clearSubscriberRetry(); + this.failedSubscribers.clear(); + this.hasSubscriberErrored$.next(false); - default: - if (message.payload && message.payload.data) { - this.subscriptionObserverMap[message.id]?.error( - message.payload.data, - ); - } else if ( - message.payload.errors && - message.payload.errors.length > 0 - ) { - this.subscriptionObserverMap[message.id]?.error( - message.payload.errors, + this.setConnectionStatus(ConnectionStatus.CONNECTED); + this.logger.info('Connected to websocket'); + this.startConnectionMonitoring(); + + let resubscriptionFailed = false; + + for (const { messageId, query } of this.subscriptions) { + const msg = JSON.stringify({ + id: messageId, + type: MessageType.GQL_START, + payload: { query }, + }); + + if (!this.sendRawMessage(msg)) { + this.logger.warn( + `Failed to re-subscribe ${this.subscriptions.length} subscription(s): WebSocket not open`, ); + + resubscriptionFailed = true; + break; } + } + + if (resubscriptionFailed) { + this.handleConnectionDrop(); + } + + break; + } + + case MessageType.GQL_DATA: + this.messageSubscribers.get(message.id)?.next(message.payload.data); + break; + + case MessageType.GQL_COMPLETE: { + const subscriber: Subscriber> | undefined = + this.messageSubscribers.get(message.id); + + this.cleanUpSubscription(message.id); + subscriber?.complete(); + + break; + } + + case MessageType.GQL_PONG: + clearTimeout(this.pongTimeout); + break; + + case MessageType.GQL_ERROR: + this.logger.warn( + `GraphQL subscription error: ${JSON.stringify(message)}`, + ); + + this.failedSubscribers.add(message.id); + this.hasSubscriberErrored$.next(true); + + if (!this.subscriberRetryTimeout) { + this.scheduleSubscriberRetry(); + } + + break; + + default: { + const subscriber = this.messageSubscribers.get(message.id); + if (!subscriber) { + return; + } + + this.cleanUpSubscription(message.id); + + if (message.payload?.data) { + subscriber.error(message.payload.data); + return; + } + + if (message.payload?.errors?.length) { + subscriber.error( + new Error( + message.payload.errors.map(({ message }) => message).join('; '), + ), + ); + } } } }; } - private shouldRetry(event: CloseEvent) { - if (event.code !== CLIENT_SIDE_CLOSE_EVENT) { - return true; - } - - return Object.entries(this.subscriptionObserverMap).length > 0; + private shouldRetry(event: CloseEvent): boolean { + return ( + event.code !== CLIENT_SIDE_CLOSE_EVENT || !!this.messageSubscribers.size + ); } - private sendMessage(id: string, type: MessageType, payload: any) { - if (this.connectionStatus === ConnectionStatus.CONNECTED) { - if (!this.sendRawMessage(JSON.stringify({ id, type, payload }))) { - this.logger.warn('Message dropped: WebSocket is not in OPEN state'); - this.handleConnectionDrop(); - } - } else { + private sendMessage(id: string, type: MessageType, payload: any): void { + if (this.connectionStatus !== ConnectionStatus.CONNECTED) { this.openSocket(); + return; + } + + if (!this.sendRawMessage(JSON.stringify({ id, type, payload }))) { + this.logger.warn('Message dropped: WebSocket is not in OPEN state'); + this.handleConnectionDrop(); } } @@ -464,16 +506,11 @@ export class GraphqlService { this.socket.send(message); return true; } - return false; - } - private generateOperationId(): string { - const currentId = `${this.nextSubscriptionId}`; - this.nextSubscriptionId += 1; - return currentId; + return false; } - private setConnectionStatus(status: ConnectionStatus) { + private setConnectionStatus(status: ConnectionStatus): void { this.connectionStatus = status; this.connectionStatus$.next(status); } @@ -509,11 +546,13 @@ export class GraphqlService { if (this.connectionStatus === ConnectionStatus.CONNECTING) { return; } + if (this.isBrowserOnline()) { this.logger.warn(`Websocket connection dropped!`); } else { this.logger.info(`Websocket connection dropped. We are offline.`); } + this.setConnectionStatus(ConnectionStatus.DISCONNECTED); this.clearPingMonitoring(); @@ -525,14 +564,47 @@ export class GraphqlService { clearInterval(this.pingPongInterval); } + private clearSubscriberRetry(): void { + clearTimeout(this.subscriberRetryTimeout); + this.subscriberRetryTimeout = null; + } + + private scheduleSubscriberRetry(): void { + this.subscriberRetryTimeout = setTimeout(() => { + this.subscriberRetryTimeout = null; + this.retryFailedSubscribers(); + }, SUBSCRIBER_RETRY_DELAY_MS); + } + + private retryFailedSubscribers(): void { + const subscribers = [...this.failedSubscribers]; + + this.failedSubscribers.clear(); + this.hasSubscriberErrored$.next(false); + + for (const messageId of subscribers) { + const subscription = this.subscriptions.find( + (subscription) => subscription.messageId === messageId, + ); + + if (!subscription) { + continue; + } + + this.sendRawMessage( + JSON.stringify({ + id: subscription.messageId, + type: MessageType.GQL_START, + payload: { query: subscription.query }, + }), + ); + } + } + /** - * Returns the online status of the browser. - * In the non-browser environment (NodeJS) this always returns true. + * In a non-browser environment (NodeJS) returns true. */ private isBrowserOnline(): boolean { - if (typeof navigator === 'undefined') { - return true; - } - return navigator.onLine; + return typeof navigator === 'undefined' || navigator.onLine; } } From 8a07e0a186fd6fadb0182b434cfa663b1cb595a9 Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 15:28:57 +0300 Subject: [PATCH 02/11] Cleanup --- .../lib/services/graphql/graphql.service.ts | 171 +++++++++--------- 1 file changed, 81 insertions(+), 90 deletions(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index 58375f4e..f864f8fc 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -20,6 +20,12 @@ import { sleepMs } from '../../util/sleep-ms/sleep-ms.js'; import { ApiBase, GraphqlQuery } from '../api-base/api-base.js'; import { TemporaryApiKeyService } from '../temporary-api-key/temporary-api-key.service.js'; +function parseQuery(queryOrDocumentNode: string | DocumentNode): string { + return typeof queryOrDocumentNode === 'string' + ? queryOrDocumentNode + : print(queryOrDocumentNode); +} + export interface QminderGraphQLError { message: string; errorType?: string | null; @@ -30,12 +36,12 @@ export interface QminderGraphQLError { path?: (string | number)[] | null; } -interface OperationMessage { - id?: string; - type: MessageType; - payload?: { - data?: T | null; - errors?: QminderGraphQLError[]; +interface Message { + readonly id?: string; + readonly type: MessageType; + readonly payload?: { + readonly data?: Record | null; + readonly errors?: QminderGraphQLError[]; }; } @@ -60,9 +66,9 @@ enum MessageType { GQL_ERROR = 'error', } +const ERRORED_SUBSCRIPTIONS_RETRY_DELAY_MS = 5 /* seconds */ * 1000; /* ms */ const PONG_TIMEOUT_IN_MS = 12000; const PING_PONG_INTERVAL_IN_MS = 20000; -const SUBSCRIBER_RETRY_DELAY_MS = 5000; // https://www.w3.org/TR/websockets/#concept-websocket-close-fail const CLIENT_SIDE_CLOSE_EVENT = 1000; @@ -86,7 +92,7 @@ export class GraphqlService { ConnectionStatus.DISCONNECTED, ); - private nextMessageId = 1; + private subcriptionsCount = 0; private subscriptions: Subscription[] = []; @@ -97,10 +103,13 @@ export class GraphqlService { private readonly subscriptionConnection$: Observable; - private readonly hasSubscriberErrored$ = new BehaviorSubject(false); - private readonly failedSubscribers = new Set(); + private readonly haveAnySubscriptionsErrored$ = new BehaviorSubject(false); + private readonly erroredSubscriptionsMessageIds = new Set(); + + private erroredSubscriptionsRetryTimeout: ReturnType< + typeof setTimeout + > | null = null; - private subscriberRetryTimeout: ReturnType | null = null; private temporaryApiKeyService: TemporaryApiKeyService | undefined; private pongTimeout: any; @@ -188,16 +197,14 @@ export class GraphqlService { * } * ``` * - * @param queryDocument required: the GraphQL query to send, for example `"subscription { createdTickets(locationId: 123) { id firstName } }"` + * @param queryOrDocumentNode the GraphQL query to send, for example `"subscription { createdTickets(locationId: 123) { id firstName } }"` * @returns an RxJS Observable that will push data * @throws when the 'queryDocument' argument is an empty string */ subscribe>( - queryDocument: string | DocumentNode, + queryOrDocumentNode: string | DocumentNode, ): Observable { - const query = - typeof queryDocument === 'string' ? queryDocument : print(queryDocument); - + const query = parseQuery(queryOrDocumentNode); if (!query) { throw new Error( 'GraphQLService query expects a GraphQL query as its first argument', @@ -205,13 +212,18 @@ export class GraphqlService { } return new Observable((subscriber) => { - const messageId = `${this.nextMessageId++}`; + const messageId = `${++this.subcriptionsCount}`; this.subscriptions.push({ messageId, query }); this.sendMessage(messageId, MessageType.GQL_START, { query }); this.messageSubscribers.set(messageId, subscriber); return () => { - this.removeSubscriber(messageId); + this.sendMessage(messageId, MessageType.GQL_STOP, null); + this.cleanUpSubscription(messageId); + + if (!this.erroredSubscriptionsMessageIds.size) { + this.haveAnySubscriptionsErrored$.next(false); + } }; }); } @@ -223,13 +235,13 @@ export class GraphqlService { * There is no need to call this method in order for data transfer to work. The `subscribe()` method also initializes * a websocket connection before proceeding. */ - openPendingWebSocket(): void { + async openPendingWebSocket(): Promise { if ( ![ConnectionStatus.CONNECTING, ConnectionStatus.CONNECTED].includes( this.connectionStatus, ) ) { - this.openSocket(); + await this.openSocket(); } } @@ -239,7 +251,7 @@ export class GraphqlService { * This method is automatically called when doing Qminder.setKey(). * @hidden */ - setKey(apiKey: string) { + setKey(apiKey: string): void { this.temporaryApiKeyService = new TemporaryApiKeyService( this.apiServer, apiKey, @@ -255,39 +267,29 @@ export class GraphqlService { } /** - * Emits `false` if errored subscriptions are successfully retried + * Have any GraphQL subscriptions been rejected by the server. + * + * Emits `false` if all errored subscriptions have been successfully retried. */ - hasSubscriberErrored(): Observable { - return this.hasSubscriberErrored$.pipe(distinctUntilChanged()); + haveAnySubscriptionsErrored(): Observable { + return this.haveAnySubscriptionsErrored$.pipe(distinctUntilChanged()); } /** * Set the WebSocket hostname the GraphQL service uses. * @hidden */ - setServer(apiServer: string) { + setServer(apiServer: string): void { this.apiServer = apiServer; } - private removeSubscriber(messageId: string): void { - if (this.messageSubscribers.has(messageId)) { - this.sendMessage(messageId, MessageType.GQL_STOP, null); - } - - this.cleanUpSubscription(messageId); - } - private cleanUpSubscription(messageId: string): void { + this.erroredSubscriptionsMessageIds.delete(messageId); this.messageSubscribers.delete(messageId); - this.failedSubscribers.delete(messageId); this.subscriptions = this.subscriptions.filter( - ({ messageId: id }) => id !== messageId, + (subscription) => subscription.messageId !== messageId, ); - - if (!this.failedSubscribers.size) { - this.hasSubscriberErrored$.next(false); - } } private async openSocket(): Promise { @@ -325,9 +327,8 @@ export class GraphqlService { } this.socket = new WebSocket(this.getServerUrl(temporaryApiKey)); - const socket = this.socket; - socket.onopen = () => { + this.socket.onopen = () => { this.sendRawMessage( JSON.stringify({ id: undefined, @@ -337,7 +338,7 @@ export class GraphqlService { ); }; - socket.onclose = (event) => { + this.socket.onclose = (event) => { this.logger.warn('WebSocket connection closed:', { code: event.code, reason: event.reason, @@ -345,7 +346,6 @@ export class GraphqlService { this.setConnectionStatus(ConnectionStatus.DISCONNECTED); this.socket = null; - this.clearPingMonitoring(); if (this.shouldRetry(event)) { @@ -370,7 +370,7 @@ export class GraphqlService { } }; - socket.onerror = () => { + this.socket.onerror = () => { if (this.isBrowserOnline()) { this.logger.error('Websocket error occurred!'); } else { @@ -378,12 +378,12 @@ export class GraphqlService { } }; - socket.onmessage = (event) => { + this.socket.onmessage = (event) => { if (typeof event.data !== 'string') { return; } - const message: OperationMessage = JSON.parse(event.data); + const message: Message = JSON.parse(event.data); switch (message.type) { case MessageType.GQL_CONNECTION_KEEP_ALIVE: @@ -392,9 +392,9 @@ export class GraphqlService { case MessageType.GQL_CONNECTION_ACK: { this.connectionAttemptsCount = 0; - this.clearSubscriberRetry(); - this.failedSubscribers.clear(); - this.hasSubscriberErrored$.next(false); + this.clearErroredSubscriptionsRetry(); + this.erroredSubscriptionsMessageIds.clear(); + this.haveAnySubscriptionsErrored$.next(false); this.setConnectionStatus(ConnectionStatus.CONNECTED); this.logger.info('Connected to websocket'); @@ -431,12 +431,8 @@ export class GraphqlService { break; case MessageType.GQL_COMPLETE: { - const subscriber: Subscriber> | undefined = - this.messageSubscribers.get(message.id); - + this.messageSubscribers.get(message.id)?.complete(); this.cleanUpSubscription(message.id); - subscriber?.complete(); - break; } @@ -449,11 +445,11 @@ export class GraphqlService { `GraphQL subscription error: ${JSON.stringify(message)}`, ); - this.failedSubscribers.add(message.id); - this.hasSubscriberErrored$.next(true); + this.erroredSubscriptionsMessageIds.add(message.id); + this.haveAnySubscriptionsErrored$.next(true); - if (!this.subscriberRetryTimeout) { - this.scheduleSubscriberRetry(); + if (!this.erroredSubscriptionsRetryTimeout) { + this.scheduleErroredSubscriptionsRetry(); } break; @@ -464,19 +460,12 @@ export class GraphqlService { return; } - this.cleanUpSubscription(message.id); - if (message.payload?.data) { subscriber.error(message.payload.data); - return; - } - - if (message.payload?.errors?.length) { - subscriber.error( - new Error( - message.payload.errors.map(({ message }) => message).join('; '), - ), - ); + this.cleanUpSubscription(message.id); + } else if (message.payload?.errors?.length) { + subscriber.error(message.payload.errors); + this.cleanUpSubscription(message.id); } } } @@ -489,15 +478,19 @@ export class GraphqlService { ); } - private sendMessage(id: string, type: MessageType, payload: any): void { + private async sendMessage( + id: string, + type: MessageType, + payload: Record | null, + ): Promise { if (this.connectionStatus !== ConnectionStatus.CONNECTED) { - this.openSocket(); + await this.openSocket(); return; } if (!this.sendRawMessage(JSON.stringify({ id, type, payload }))) { this.logger.warn('Message dropped: WebSocket is not in OPEN state'); - this.handleConnectionDrop(); + await this.handleConnectionDrop(); } } @@ -542,7 +535,7 @@ export class GraphqlService { this.sendRawMessage(JSON.stringify({ type: MessageType.GQL_PING })); } - private handleConnectionDrop(): void { + private async handleConnectionDrop(): Promise { if (this.connectionStatus === ConnectionStatus.CONNECTING) { return; } @@ -556,7 +549,7 @@ export class GraphqlService { this.setConnectionStatus(ConnectionStatus.DISCONNECTED); this.clearPingMonitoring(); - this.openSocket(); + await this.openSocket(); } private clearPingMonitoring(): void { @@ -564,25 +557,20 @@ export class GraphqlService { clearInterval(this.pingPongInterval); } - private clearSubscriberRetry(): void { - clearTimeout(this.subscriberRetryTimeout); - this.subscriberRetryTimeout = null; + private clearErroredSubscriptionsRetry(): void { + clearTimeout(this.erroredSubscriptionsRetryTimeout ?? undefined); + this.erroredSubscriptionsRetryTimeout = null; } - private scheduleSubscriberRetry(): void { - this.subscriberRetryTimeout = setTimeout(() => { - this.subscriberRetryTimeout = null; - this.retryFailedSubscribers(); - }, SUBSCRIBER_RETRY_DELAY_MS); + private scheduleErroredSubscriptionsRetry(): void { + this.erroredSubscriptionsRetryTimeout = setTimeout(() => { + this.retryErroredSubscriptions(); + this.erroredSubscriptionsRetryTimeout = null; + }, ERRORED_SUBSCRIPTIONS_RETRY_DELAY_MS); } - private retryFailedSubscribers(): void { - const subscribers = [...this.failedSubscribers]; - - this.failedSubscribers.clear(); - this.hasSubscriberErrored$.next(false); - - for (const messageId of subscribers) { + private retryErroredSubscriptions(): void { + for (const messageId of this.erroredSubscriptionsMessageIds) { const subscription = this.subscriptions.find( (subscription) => subscription.messageId === messageId, ); @@ -599,10 +587,13 @@ export class GraphqlService { }), ); } + + this.erroredSubscriptionsMessageIds.clear(); + this.haveAnySubscriptionsErrored$.next(false); } /** - * In a non-browser environment (NodeJS) returns true. + * In a non-browser environment (NodeJS) returns `true`. */ private isBrowserOnline(): boolean { return typeof navigator === 'undefined' || navigator.onLine; From aa4fab18efcd2ad240c1e110b7de37baa8decc18 Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 15:39:44 +0300 Subject: [PATCH 03/11] Refactor to remove unnecessary steps --- .../lib/services/graphql/graphql.service.ts | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index f864f8fc..1b22d2ab 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -218,12 +218,11 @@ export class GraphqlService { this.messageSubscribers.set(messageId, subscriber); return () => { - this.sendMessage(messageId, MessageType.GQL_STOP, null); - this.cleanUpSubscription(messageId); - - if (!this.erroredSubscriptionsMessageIds.size) { - this.haveAnySubscriptionsErrored$.next(false); + if (this.messageSubscribers.has(messageId)) { + this.sendMessage(messageId, MessageType.GQL_STOP, null); } + + this.cleanUpSubscription(messageId); }; }); } @@ -290,6 +289,10 @@ export class GraphqlService { this.subscriptions = this.subscriptions.filter( (subscription) => subscription.messageId !== messageId, ); + + if (!this.erroredSubscriptionsMessageIds.size) { + this.haveAnySubscriptionsErrored$.next(false); + } } private async openSocket(): Promise { @@ -431,8 +434,9 @@ export class GraphqlService { break; case MessageType.GQL_COMPLETE: { - this.messageSubscribers.get(message.id)?.complete(); + const subscriber = this.messageSubscribers.get(message.id); this.cleanUpSubscription(message.id); + subscriber?.complete(); break; } @@ -461,11 +465,11 @@ export class GraphqlService { } if (message.payload?.data) { - subscriber.error(message.payload.data); this.cleanUpSubscription(message.id); + subscriber.error(message.payload.data); } else if (message.payload?.errors?.length) { - subscriber.error(message.payload.errors); this.cleanUpSubscription(message.id); + subscriber.error(message.payload.errors); } } } From a26644be104497a6c87b97fea819341f40211aa6 Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 18:41:47 +0300 Subject: [PATCH 04/11] Add exponential backoff --- .../lib/services/graphql/graphql.service.ts | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index 1b22d2ab..af4ac8fb 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -66,7 +66,6 @@ enum MessageType { GQL_ERROR = 'error', } -const ERRORED_SUBSCRIPTIONS_RETRY_DELAY_MS = 5 /* seconds */ * 1000; /* ms */ const PONG_TIMEOUT_IN_MS = 12000; const PING_PONG_INTERVAL_IN_MS = 20000; @@ -110,6 +109,8 @@ export class GraphqlService { typeof setTimeout > | null = null; + private erroredSubscriptionsRetryCount = 0; + private temporaryApiKeyService: TemporaryApiKeyService | undefined; private pongTimeout: any; @@ -356,9 +357,7 @@ export class GraphqlService { this.connectionAttemptsCount, ); - this.logger.info( - `Waiting for ${timer.toFixed(1)}ms before reconnecting`, - ); + this.logger.info(`Reconnect socket in ${timer.toFixed(1)}ms`); sleepMs(timer).then(() => { this.connectionAttemptsCount++; @@ -396,6 +395,7 @@ export class GraphqlService { this.connectionAttemptsCount = 0; this.clearErroredSubscriptionsRetry(); + this.erroredSubscriptionsRetryCount = 0; this.erroredSubscriptionsMessageIds.clear(); this.haveAnySubscriptionsErrored$.next(false); @@ -430,6 +430,13 @@ export class GraphqlService { } case MessageType.GQL_DATA: + if ( + this.erroredSubscriptionsMessageIds.delete(message.id) && + !this.erroredSubscriptionsMessageIds.size + ) { + this.haveAnySubscriptionsErrored$.next(false); + } + this.messageSubscribers.get(message.id)?.next(message.payload.data); break; @@ -567,10 +574,16 @@ export class GraphqlService { } private scheduleErroredSubscriptionsRetry(): void { + const delay = calculateRandomizedExponentialBackoffTime( + this.erroredSubscriptionsRetryCount++, + ); + + this.logger.info(`Retry errored subscriptions in ${delay.toFixed(0)}ms`); + this.erroredSubscriptionsRetryTimeout = setTimeout(() => { this.retryErroredSubscriptions(); this.erroredSubscriptionsRetryTimeout = null; - }, ERRORED_SUBSCRIPTIONS_RETRY_DELAY_MS); + }, delay); } private retryErroredSubscriptions(): void { @@ -591,9 +604,6 @@ export class GraphqlService { }), ); } - - this.erroredSubscriptionsMessageIds.clear(); - this.haveAnySubscriptionsErrored$.next(false); } /** From b926a0e991681f30e69c54934ddf840707782606 Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 18:45:05 +0300 Subject: [PATCH 05/11] Improve JS docs formatting --- .../javascript-api/src/lib/services/graphql/graphql.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index af4ac8fb..dbf186fc 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -200,7 +200,7 @@ export class GraphqlService { * * @param queryOrDocumentNode the GraphQL query to send, for example `"subscription { createdTickets(locationId: 123) { id firstName } }"` * @returns an RxJS Observable that will push data - * @throws when the 'queryDocument' argument is an empty string + * @throws when the `queryDocument` argument is an empty string */ subscribe>( queryOrDocumentNode: string | DocumentNode, From f8a6aaeac1edd31c9a7d2ed7596aa10d0ddf5195 Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 19:39:37 +0300 Subject: [PATCH 06/11] Simplify --- .../lib/services/graphql/graphql.service.ts | 112 ++++++++++++------ 1 file changed, 79 insertions(+), 33 deletions(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index dbf186fc..eb915b22 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -8,9 +8,14 @@ import WebSocket, { CloseEvent } from 'isomorphic-ws'; import { BehaviorSubject, distinctUntilChanged, + map, Observable, + scan, shareReplay, + startWith, + Subject, Subscriber, + take, } from 'rxjs'; import { ConnectionStatus } from '../../model/connection-status.js'; @@ -102,8 +107,44 @@ export class GraphqlService { private readonly subscriptionConnection$: Observable; - private readonly haveAnySubscriptionsErrored$ = new BehaviorSubject(false); - private readonly erroredSubscriptionsMessageIds = new Set(); + private readonly erroredSubscriptionsAction$ = new Subject< + | { + readonly type: 'add'; + readonly messageId: string; + } + | { + readonly type: 'remove'; + readonly messageId: string; + } + | { + readonly type: 'clear'; + } + >(); + + private readonly erroredSubscriptionsMessageIds$ = + this.erroredSubscriptionsAction$.pipe( + scan((messageIds, action) => { + const result = new Set(messageIds); + + switch (action.type) { + case 'add': + return result.add(action.messageId); + case 'remove': + result.delete(action.messageId); + return result; + case 'clear': + return new Set(); + } + }, new Set()), + startWith(new Set()), + shareReplay(1), + ); + + private readonly haveAnySubscriptionsErrored$ = + this.erroredSubscriptionsMessageIds$.pipe( + map(({ size }) => !!size), + distinctUntilChanged(), + ); private erroredSubscriptionsRetryTimeout: ReturnType< typeof setTimeout @@ -129,6 +170,8 @@ export class GraphqlService { distinctUntilChanged(), shareReplay(1), ); + + this.erroredSubscriptionsMessageIds$.subscribe(); } /** @@ -272,7 +315,7 @@ export class GraphqlService { * Emits `false` if all errored subscriptions have been successfully retried. */ haveAnySubscriptionsErrored(): Observable { - return this.haveAnySubscriptionsErrored$.pipe(distinctUntilChanged()); + return this.haveAnySubscriptionsErrored$; } /** @@ -284,16 +327,16 @@ export class GraphqlService { } private cleanUpSubscription(messageId: string): void { - this.erroredSubscriptionsMessageIds.delete(messageId); + this.erroredSubscriptionsAction$.next({ + type: 'remove', + messageId, + }); + this.messageSubscribers.delete(messageId); this.subscriptions = this.subscriptions.filter( (subscription) => subscription.messageId !== messageId, ); - - if (!this.erroredSubscriptionsMessageIds.size) { - this.haveAnySubscriptionsErrored$.next(false); - } } private async openSocket(): Promise { @@ -396,8 +439,7 @@ export class GraphqlService { this.clearErroredSubscriptionsRetry(); this.erroredSubscriptionsRetryCount = 0; - this.erroredSubscriptionsMessageIds.clear(); - this.haveAnySubscriptionsErrored$.next(false); + this.erroredSubscriptionsAction$.next({ type: 'clear' }); this.setConnectionStatus(ConnectionStatus.CONNECTED); this.logger.info('Connected to websocket'); @@ -430,12 +472,10 @@ export class GraphqlService { } case MessageType.GQL_DATA: - if ( - this.erroredSubscriptionsMessageIds.delete(message.id) && - !this.erroredSubscriptionsMessageIds.size - ) { - this.haveAnySubscriptionsErrored$.next(false); - } + this.erroredSubscriptionsAction$.next({ + type: 'remove', + messageId: message.id, + }); this.messageSubscribers.get(message.id)?.next(message.payload.data); break; @@ -456,8 +496,10 @@ export class GraphqlService { `GraphQL subscription error: ${JSON.stringify(message)}`, ); - this.erroredSubscriptionsMessageIds.add(message.id); - this.haveAnySubscriptionsErrored$.next(true); + this.erroredSubscriptionsAction$.next({ + type: 'add', + messageId: message.id, + }); if (!this.erroredSubscriptionsRetryTimeout) { this.scheduleErroredSubscriptionsRetry(); @@ -587,23 +629,27 @@ export class GraphqlService { } private retryErroredSubscriptions(): void { - for (const messageId of this.erroredSubscriptionsMessageIds) { - const subscription = this.subscriptions.find( - (subscription) => subscription.messageId === messageId, - ); + this.erroredSubscriptionsMessageIds$ + .pipe(take(1)) + .subscribe((messageIds) => { + for (const messageId of messageIds) { + const subscription = this.subscriptions.find( + (subscription) => subscription.messageId === messageId, + ); - if (!subscription) { - continue; - } + if (!subscription) { + continue; + } - this.sendRawMessage( - JSON.stringify({ - id: subscription.messageId, - type: MessageType.GQL_START, - payload: { query: subscription.query }, - }), - ); - } + this.sendRawMessage( + JSON.stringify({ + id: subscription.messageId, + type: MessageType.GQL_START, + payload: { query: subscription.query }, + }), + ); + } + }); } /** From fd52a21c0e9fceb60c8b7a7e14183985b7e5cc89 Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 20:13:24 +0300 Subject: [PATCH 07/11] Cleanup --- .../src/lib/services/graphql/graphql.service.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index eb915b22..1140844a 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -158,9 +158,6 @@ export class GraphqlService { private pingPongInterval: any; private readonly sendPingWithThisBound = this.sendPing.bind(this); - private readonly handleConnectionDropWithThisBound = - this.handleConnectionDrop.bind(this); - private connectionAttemptsCount = 0; constructor() { @@ -400,7 +397,7 @@ export class GraphqlService { this.connectionAttemptsCount, ); - this.logger.info(`Reconnect socket in ${timer.toFixed(1)}ms`); + this.logger.info(`Reconnect socket in ${timer.toFixed(0)}ms`); sleepMs(timer).then(() => { this.connectionAttemptsCount++; @@ -582,9 +579,10 @@ export class GraphqlService { private sendPing(): void { this.pongTimeout = setTimeout( - this.handleConnectionDropWithThisBound, + () => this.handleConnectionDrop(), PONG_TIMEOUT_IN_MS, ); + this.sendRawMessage(JSON.stringify({ type: MessageType.GQL_PING })); } From 333ce7d6bf93e2eff5bb129df046d511f9e717c3 Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 20:43:08 +0300 Subject: [PATCH 08/11] Improve JSDoc --- .../lib/services/graphql/graphql.service.ts | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index 1140844a..cc4fafa0 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -223,24 +223,36 @@ export class GraphqlService { /** * Subscribe to Qminder Events API using GraphQL. * - * For example + * @example + * + * Be notified of any created tickets * * ```javascript * import { Qminder } from 'qminder-api'; - * // 1. Be notified of any created tickets - * try { - * const observable = Qminder.GraphQL.subscribe("subscription { createdTickets(locationId: 123) { id firstName } }") * - * observable.subscribe(data => console.log(data)); - * // => { createdTickets: { id: '12', firstName: 'Marta' } } + * try { + * Qminder.GraphQL.subscribe(` + * subscription { + * createdTickets(locationId: 123) { + * id + * firstName + * } + * } + * `).subscribe((data) => { + * console.log(data); // { createdTickets: { id: '12', firstName: 'Marta' } } + * }); * } catch (error) { * console.error(error); * } * ``` * * @param queryOrDocumentNode the GraphQL query to send, for example `"subscription { createdTickets(locationId: 123) { id firstName } }"` - * @returns an RxJS Observable that will push data + * @returns a RxJS Observable that will push data * @throws when the `queryDocument` argument is an empty string + * + * Retries errored subscriptions (doesn't throw) with exponential backoff. + * + * To get notified when any subscriptions have errored, use the {@link haveAnySubscriptionsErrored} method. */ subscribe>( queryOrDocumentNode: string | DocumentNode, From 6e7f6127c78dc1ff7a16919b06a6cead7b3ef0ae Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 22:23:41 +0300 Subject: [PATCH 09/11] Add subscription retry limit --- .../lib/services/graphql/graphql.service.ts | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index cc4fafa0..04e3f591 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -71,6 +71,7 @@ enum MessageType { GQL_ERROR = 'error', } +const ERRORED_SUBSCRIPTIONS_RETRY_LIMIT = 3; const PONG_TIMEOUT_IN_MS = 12000; const PING_PONG_INTERVAL_IN_MS = 20000; @@ -250,7 +251,7 @@ export class GraphqlService { * @returns a RxJS Observable that will push data * @throws when the `queryDocument` argument is an empty string * - * Retries errored subscriptions (doesn't throw) with exponential backoff. + * Retries errored subscriptions up to 3 times with exponential backoff. Afterwards throws an error. * * To get notified when any subscriptions have errored, use the {@link haveAnySubscriptionsErrored} method. */ @@ -510,8 +511,14 @@ export class GraphqlService { messageId: message.id, }); - if (!this.erroredSubscriptionsRetryTimeout) { + if ( + this.erroredSubscriptionsRetryCount < + ERRORED_SUBSCRIPTIONS_RETRY_LIMIT && + !this.erroredSubscriptionsRetryTimeout + ) { this.scheduleErroredSubscriptionsRetry(); + } else if (!this.erroredSubscriptionsRetryTimeout) { + this.failErroredSubscriptions(); } break; @@ -626,18 +633,38 @@ export class GraphqlService { } private scheduleErroredSubscriptionsRetry(): void { - const delay = calculateRandomizedExponentialBackoffTime( - this.erroredSubscriptionsRetryCount++, - ); - + const retryCount = this.erroredSubscriptionsRetryCount + 1; + const delay = calculateRandomizedExponentialBackoffTime(retryCount); this.logger.info(`Retry errored subscriptions in ${delay.toFixed(0)}ms`); this.erroredSubscriptionsRetryTimeout = setTimeout(() => { this.retryErroredSubscriptions(); + this.erroredSubscriptionsRetryCount = retryCount; this.erroredSubscriptionsRetryTimeout = null; }, delay); } + private failErroredSubscriptions(): void { + this.logger.error( + `Errored subscriptions retry limit (${ERRORED_SUBSCRIPTIONS_RETRY_LIMIT}) reached, giving up`, + ); + + this.erroredSubscriptionsMessageIds$ + .pipe(take(1)) + .subscribe((messageIds) => { + for (const messageId of messageIds) { + const subscriber = this.messageSubscribers.get(messageId); + this.cleanUpSubscription(messageId); + + subscriber?.error( + new Error( + `Subscription failed after ${this.erroredSubscriptionsRetryCount} retries`, + ), + ); + } + }); + } + private retryErroredSubscriptions(): void { this.erroredSubscriptionsMessageIds$ .pipe(take(1)) From 31eb44e407b07574d7923dc9e535a0c28b4a4f16 Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Thu, 14 May 2026 22:41:04 +0300 Subject: [PATCH 10/11] Log unhandled promise errors --- .../lib/services/graphql/graphql.service.ts | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index 04e3f591..2bcd0973 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -268,12 +268,22 @@ export class GraphqlService { return new Observable((subscriber) => { const messageId = `${++this.subcriptionsCount}`; this.subscriptions.push({ messageId, query }); - this.sendMessage(messageId, MessageType.GQL_START, { query }); + + this.sendMessage(messageId, MessageType.GQL_START, { query }).catch( + (error: Error) => { + this.logger.error('Failed to start subscription: ', error); + }, + ); + this.messageSubscribers.set(messageId, subscriber); return () => { if (this.messageSubscribers.has(messageId)) { - this.sendMessage(messageId, MessageType.GQL_STOP, null); + this.sendMessage(messageId, MessageType.GQL_STOP, null).catch( + (error) => { + this.logger.error('Failed to stop subscription: ', error); + }, + ); } this.cleanUpSubscription(messageId); @@ -412,10 +422,14 @@ export class GraphqlService { this.logger.info(`Reconnect socket in ${timer.toFixed(0)}ms`); - sleepMs(timer).then(() => { - this.connectionAttemptsCount++; - this.openSocket(); - }); + sleepMs(timer) + .then(async () => { + this.connectionAttemptsCount++; + return await this.openSocket(); + }) + .catch((error: Error) => { + this.logger.error('Failed to reconnect socket: ', error); + }); } if (this.connectionStatus === ConnectionStatus.CONNECTING) { @@ -475,7 +489,12 @@ export class GraphqlService { } if (resubscriptionFailed) { - this.handleConnectionDrop(); + this.handleConnectionDrop().catch((error) => { + this.logger.error( + 'Failed to handle connection drop after resubscription failure: ', + error, + ); + }); } break; @@ -597,10 +616,11 @@ export class GraphqlService { } private sendPing(): void { - this.pongTimeout = setTimeout( - () => this.handleConnectionDrop(), - PONG_TIMEOUT_IN_MS, - ); + this.pongTimeout = setTimeout(() => { + this.handleConnectionDrop().catch((error) => { + this.logger.error('Failed to handle pong connection drop: ', error); + }); + }, PONG_TIMEOUT_IN_MS); this.sendRawMessage(JSON.stringify({ type: MessageType.GQL_PING })); } From c9e850b87c5741f2da8a797cf40a44d04d4c3a93 Mon Sep 17 00:00:00 2001 From: Rando Luik Date: Fri, 15 May 2026 14:34:03 +0300 Subject: [PATCH 11/11] Improve log --- .../src/lib/services/graphql/graphql.service.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index 2bcd0973..a782a49b 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -655,7 +655,10 @@ export class GraphqlService { private scheduleErroredSubscriptionsRetry(): void { const retryCount = this.erroredSubscriptionsRetryCount + 1; const delay = calculateRandomizedExponentialBackoffTime(retryCount); - this.logger.info(`Retry errored subscriptions in ${delay.toFixed(0)}ms`); + + this.logger.info( + `Retry (${retryCount}) errored subscriptions in ${delay.toFixed(0)}ms`, + ); this.erroredSubscriptionsRetryTimeout = setTimeout(() => { this.retryErroredSubscriptions();