diff --git a/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts b/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts index ded2e3e7..4534ac39 100644 --- a/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts +++ b/packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts @@ -40,12 +40,12 @@ export class GraphQLSubscriptionsFixture { return (this.graphqlService as any).subscriptions.length; } - getGraphqlServiceSubscriptionObserverMapSize(): number { - return Object.keys(this.getGraphqlServiceSubscriptionObserverMap()).length; + getMessageSubscribersSize(): number { + return (this.graphqlService as any).messageSubscribers.size; } - getGraphqlServiceSubscriptionObserverMap(): Record> { - return (this.graphqlService as any).subscriptionObserverMap; + hasMessageSubscriber(id: string): boolean { + return (this.graphqlService as any).messageSubscribers.has(id); } async waitForConnection() { @@ -125,6 +125,7 @@ export class GraphQLSubscriptionsFixture { } async cleanup() { + (this.graphqlService as any).clearSubscriptionRetry(); WS.clean(); await this.server.closed; } diff --git a/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts index 88ed8840..22c16641 100644 --- a/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts +++ b/packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts @@ -1,10 +1,8 @@ import gql from 'graphql-tag'; import fetchMock from 'jest-fetch-mock'; import { WebSocket } from 'mock-socket'; -import { Subscriber } from 'rxjs'; import { ConnectionStatus } from '../../../model/connection-status'; import { GraphQLSubscriptionsFixture } from '../__fixtures__/graphql-subscriptions-fixture'; -import { QminderGraphQLError } from '../graphql.service'; jest.mock('isomorphic-ws', () => WebSocket); jest.mock('../../../util/sleep-ms/sleep-ms', () => ({ @@ -96,24 +94,20 @@ describe('GraphQL subscriptions', () => { }); it('cleans up internal state when unsubscribing', async () => { - // start the test with an empty observer-map - expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0); + expect(fixture.getMessageSubscribersSize()).toBe(0); const subscription = fixture.triggerSubscription(); await fixture.handleConnectionInit(); await fixture.consumeSubscribeMessage(); - // the observer map should equal { "1": Subscriber => spy } - expect(fixture.getGraphqlServiceSubscriptionObserverMap()).toEqual({ - '1': expect.any(Subscriber), - }); + expect(fixture.getMessageSubscribersSize()).toBe(1); + expect(fixture.hasMessageSubscriber('1')).toBe(true); - // unsubscribing should clean up subscription.unsubscribe(); await fixture.consumeAnyMessage(); - expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0); + expect(fixture.getMessageSubscribersSize()).toBe(0); }); it('when receiving a published message for a subscription that does not exist anymore, it does not throw', async () => { - expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0); + expect(fixture.getMessageSubscribersSize()).toBe(0); const subscription = fixture.triggerSubscription(); await fixture.handleConnectionInit(); @@ -335,43 +329,191 @@ describe('GraphQL subscriptions', () => { subscription.unsubscribe(); }); - it('error messages are propagated to the subscriber', async () => { - const ERRORS: QminderGraphQLError[] = [ - { - message: - "Invalid Syntax : offending token 'createdTickets' at line 1 column 1", - sourcePreview: - 'createdTickets(locationId: 673) {\n' + - ' id\n' + - ' firstName\n' + - ' lastName\n', - offendingToken: 'createdTickets', - locations: [], - errorType: 'InvalidSyntax', - extensions: null, - path: null, - }, - ]; + it('GQL_ERROR does not kill the subscription or trigger reconnect', async () => { + const reconnectSpy = jest.spyOn( + fixture.graphqlService as any, + 'handleConnectionDrop', + ); const errorSpy = jest.fn(); const subscription = fixture.triggerSubscription('subscription { baba }', { error: errorSpy, }); await fixture.handleConnectionInit(); await fixture.consumeSubscribeMessage(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [{ message: 'Subscription limit reached' }], + }, + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(errorSpy).not.toHaveBeenCalled(); + expect(reconnectSpy).not.toHaveBeenCalled(); + expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(1); + expect(fixture.hasMessageSubscriber('1')).toBe(true); + + subscription.unsubscribe(); + }); + + it('GQL_ERROR emits true on the subscription error observable', async () => { + const values: boolean[] = []; + fixture.graphqlService + .getSubscriptionErrorObservable() + .subscribe((v) => values.push(v)); + + const subscription = fixture.triggerSubscription('subscription { baba }'); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + + fixture.server.send({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [ + { + message: + 'The maximum subscription limit of 100 has been reached', + }, + ], + }, + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(values).toEqual([false, true]); + + subscription.unsubscribe(); + }); + + it('retries failed subscriptions after delay and clears error state', async () => { + (fixture.graphqlService as any).subscriptionRetryDelayMs = 50; + + const values: boolean[] = []; + fixture.graphqlService + .getSubscriptionErrorObservable() + .subscribe((v) => values.push(v)); + + const subscription = fixture.triggerSubscription('subscription { baba }'); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + fixture.server.send({ id: '1', type: 'error', payload: { data: null, - errors: ERRORS, + errors: [{ message: 'Limit reached' }], }, }); - expect(errorSpy).toHaveBeenCalledWith(ERRORS); - // Cleans up as well - expect( - (fixture.graphqlService as any).subscriptionObserverMap['1'], - ).toBeUndefined(); + await new Promise((r) => setTimeout(r, 10)); + expect(values).toEqual([false, true]); + + await new Promise((r) => setTimeout(r, 60)); + + expect(values).toEqual([false, true, false]); + expect(await fixture.getNextMessage()).toEqual({ + id: '1', + type: 'start', + payload: { query: 'subscription { baba }' }, + }); + + subscription.unsubscribe(); + }); + + it('does not send GQL_STOP when server sends GQL_COMPLETE', async () => { + const completeSpy = jest.fn(); + const subscription = fixture.triggerSubscription('subscription { baba }', { + next: () => {}, + complete: completeSpy, + }); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + + fixture.sendMessageToClient({ + id: '1', + type: 'complete', + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(completeSpy).toHaveBeenCalled(); + expect(fixture.hasMessageSubscriber('1')).toBe(false); + expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(0); + expect(fixture.server.messagesToConsume.pendingItems).toHaveLength(0); + + subscription.unsubscribe(); + }); + + it('GQL_ERROR keeps subscription tracked so it re-subscribes on natural reconnect and clears error state', async () => { + const values: boolean[] = []; + fixture.graphqlService + .getSubscriptionErrorObservable() + .subscribe((v) => values.push(v)); + + const subscription = fixture.triggerSubscription('subscription { baba }'); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + + fixture.sendMessageToClient({ + id: '1', + type: 'error', + payload: { + data: null, + errors: [{ message: 'Limit reached' }], + }, + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(1); + expect(fixture.hasMessageSubscriber('1')).toBe(true); + expect(values).toEqual([false, true]); + + await fixture.closeWithCode(1001); + fixture.openServer(); + await fixture.handleConnectionInit(); + expect(await fixture.getNextMessage()).toEqual({ + id: '1', + type: 'start', + payload: { query: 'subscription { baba }' }, + }); + + expect(values).toEqual([false, true, false]); + + subscription.unsubscribe(); + }); + + it('cleans up subscription on unknown message type with errors', async () => { + const errorSpy = jest.fn(); + const subscription = fixture.triggerSubscription('subscription { baba }', { + error: errorSpy, + }); + await fixture.handleConnectionInit(); + await fixture.consumeSubscribeMessage(); + + fixture.sendMessageToClient({ + id: '1', + type: 'unknown_type', + payload: { + errors: [{ message: 'Something went wrong' }], + }, + }); + + await new Promise((r) => setTimeout(r, 10)); + + expect(errorSpy).toHaveBeenCalledWith( + expect.objectContaining({ message: 'Something went wrong' }), + ); + expect(errorSpy.mock.calls[0][0]).toBeInstanceOf(Error); + expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(0); + expect(fixture.hasMessageSubscriber('1')).toBe(false); subscription.unsubscribe(); }); diff --git a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts index 87458fff..a782a49b 100644 --- a/packages/javascript-api/src/lib/services/graphql/graphql.service.ts +++ b/packages/javascript-api/src/lib/services/graphql/graphql.service.ts @@ -8,28 +8,27 @@ import WebSocket, { CloseEvent } from 'isomorphic-ws'; import { BehaviorSubject, distinctUntilChanged, + map, Observable, - Observer, + scan, shareReplay, + startWith, + Subject, + Subscriber, + take, } from 'rxjs'; import { ConnectionStatus } from '../../model/connection-status.js'; +import { Logger } from '../../util/logger/logger.js'; import { calculateRandomizedExponentialBackoffTime } from '../../util/randomized-exponential-backoff/randomized-exponential-backoff.js'; import { sleepMs } from '../../util/sleep-ms/sleep-ms.js'; import { ApiBase, GraphqlQuery } from '../api-base/api-base.js'; import { TemporaryApiKeyService } from '../temporary-api-key/temporary-api-key.service.js'; -import { Logger } from '../../util/logger/logger.js'; -type QueryOrDocument = string | DocumentNode; - -function queryToString(query: QueryOrDocument): string { - if (typeof query === 'string') { - return query; - } - if (query.kind === 'Document') { - return print(query); - } - throw new Error('queryToString: query must be a string or a DocumentNode'); +function parseQuery(queryOrDocumentNode: string | DocumentNode): string { + return typeof queryOrDocumentNode === 'string' + ? queryOrDocumentNode + : print(queryOrDocumentNode); } export interface QminderGraphQLError { @@ -42,23 +41,18 @@ export interface QminderGraphQLError { path?: (string | number)[] | null; } -interface OperationMessage { - id?: string; - type: MessageType; - payload?: { - data?: T | null; - errors?: QminderGraphQLError[]; +interface Message { + readonly id?: string; + readonly type: MessageType; + readonly payload?: { + readonly data?: Record | null; + readonly errors?: QminderGraphQLError[]; }; } -class Subscription { - id: string; - query: string; - - constructor(id: string, query: string) { - this.id = id; - this.query = query; - } +interface Subscription { + readonly messageId: string; + readonly query: string; } enum MessageType { @@ -77,6 +71,7 @@ enum MessageType { GQL_ERROR = 'error', } +const ERRORED_SUBSCRIPTIONS_RETRY_LIMIT = 3; const PONG_TIMEOUT_IN_MS = 12000; const PING_PONG_INTERVAL_IN_MS = 20000; @@ -102,23 +97,68 @@ export class GraphqlService { ConnectionStatus.DISCONNECTED, ); - private nextSubscriptionId = 1; + private subcriptionsCount = 0; private subscriptions: Subscription[] = []; - private readonly subscriptionObserverMap: { [id: string]: Observer } = - {}; + private readonly messageSubscribers = new Map< + string, + Subscriber> + >(); private readonly subscriptionConnection$: Observable; + + private readonly erroredSubscriptionsAction$ = new Subject< + | { + readonly type: 'add'; + readonly messageId: string; + } + | { + readonly type: 'remove'; + readonly messageId: string; + } + | { + readonly type: 'clear'; + } + >(); + + private readonly erroredSubscriptionsMessageIds$ = + this.erroredSubscriptionsAction$.pipe( + scan((messageIds, action) => { + const result = new Set(messageIds); + + switch (action.type) { + case 'add': + return result.add(action.messageId); + case 'remove': + result.delete(action.messageId); + return result; + case 'clear': + return new Set(); + } + }, new Set()), + startWith(new Set()), + shareReplay(1), + ); + + private readonly haveAnySubscriptionsErrored$ = + this.erroredSubscriptionsMessageIds$.pipe( + map(({ size }) => !!size), + distinctUntilChanged(), + ); + + private erroredSubscriptionsRetryTimeout: ReturnType< + typeof setTimeout + > | null = null; + + private erroredSubscriptionsRetryCount = 0; + private temporaryApiKeyService: TemporaryApiKeyService | undefined; private pongTimeout: any; private pingPongInterval: any; private readonly sendPingWithThisBound = this.sendPing.bind(this); - private readonly handleConnectionDropWithThisBound = - this.handleConnectionDrop.bind(this); - private connectionAttemptsCount = 0; constructor() { @@ -128,6 +168,8 @@ export class GraphqlService { distinctUntilChanged(), shareReplay(1), ); + + this.erroredSubscriptionsMessageIds$.subscribe(); } /** @@ -182,41 +224,70 @@ export class GraphqlService { /** * Subscribe to Qminder Events API using GraphQL. * - * For example + * @example + * + * Be notified of any created tickets * * ```javascript * import { Qminder } from 'qminder-api'; - * // 1. Be notified of any created tickets - * try { - * const observable = Qminder.GraphQL.subscribe("subscription { createdTickets(locationId: 123) { id firstName } }") * - * observable.subscribe(data => console.log(data)); - * // => { createdTickets: { id: '12', firstName: 'Marta' } } + * try { + * Qminder.GraphQL.subscribe(` + * subscription { + * createdTickets(locationId: 123) { + * id + * firstName + * } + * } + * `).subscribe((data) => { + * console.log(data); // { createdTickets: { id: '12', firstName: 'Marta' } } + * }); * } catch (error) { * console.error(error); * } * ``` * - * @param queryDocument required: the GraphQL query to send, for example `"subscription { createdTickets(locationId: 123) { id firstName } }"` - * @returns an RxJS Observable that will push data as - * @throws when the 'query' argument is undefined or an empty string + * @param queryOrDocumentNode the GraphQL query to send, for example `"subscription { createdTickets(locationId: 123) { id firstName } }"` + * @returns a RxJS Observable that will push data + * @throws when the `queryDocument` argument is an empty string + * + * Retries errored subscriptions up to 3 times with exponential backoff. Afterwards throws an error. + * + * To get notified when any subscriptions have errored, use the {@link haveAnySubscriptionsErrored} method. */ - subscribe(queryDocument: QueryOrDocument): Observable { - const query = queryToString(queryDocument); - - if (!query || query.length === 0) { + subscribe>( + queryOrDocumentNode: string | DocumentNode, + ): Observable { + const query = parseQuery(queryOrDocumentNode); + if (!query) { throw new Error( 'GraphQLService query expects a GraphQL query as its first argument', ); } - return new Observable((observer: Observer) => { - const id = this.generateOperationId(); - this.subscriptions.push(new Subscription(id, query)); - this.sendMessage(id, MessageType.GQL_START, { query }); - this.subscriptionObserverMap[id] = observer; + return new Observable((subscriber) => { + const messageId = `${++this.subcriptionsCount}`; + this.subscriptions.push({ messageId, query }); + + this.sendMessage(messageId, MessageType.GQL_START, { query }).catch( + (error: Error) => { + this.logger.error('Failed to start subscription: ', error); + }, + ); + + this.messageSubscribers.set(messageId, subscriber); + + return () => { + if (this.messageSubscribers.has(messageId)) { + this.sendMessage(messageId, MessageType.GQL_STOP, null).catch( + (error) => { + this.logger.error('Failed to stop subscription: ', error); + }, + ); + } - return () => this.stopSubscription(id); + this.cleanUpSubscription(messageId); + }; }); } @@ -227,13 +298,13 @@ export class GraphqlService { * There is no need to call this method in order for data transfer to work. The `subscribe()` method also initializes * a websocket connection before proceeding. */ - openPendingWebSocket(): void { + async openPendingWebSocket(): Promise { if ( ![ConnectionStatus.CONNECTING, ConnectionStatus.CONNECTED].includes( this.connectionStatus, ) ) { - this.openSocket(); + await this.openSocket(); } } @@ -243,7 +314,7 @@ export class GraphqlService { * This method is automatically called when doing Qminder.setKey(). * @hidden */ - setKey(apiKey: string) { + setKey(apiKey: string): void { this.temporaryApiKeyService = new TemporaryApiKeyService( this.apiServer, apiKey, @@ -258,27 +329,37 @@ export class GraphqlService { return this.subscriptionConnection$; } + /** + * Have any GraphQL subscriptions been rejected by the server. + * + * Emits `false` if all errored subscriptions have been successfully retried. + */ + haveAnySubscriptionsErrored(): Observable { + return this.haveAnySubscriptionsErrored$; + } + /** * Set the WebSocket hostname the GraphQL service uses. * @hidden */ - setServer(apiServer: string) { + setServer(apiServer: string): void { this.apiServer = apiServer; } - private stopSubscription(id: string) { - this.sendMessage(id, MessageType.GQL_STOP, null); - this.cleanupSubscription(id); - } - - private cleanupSubscription(id: string) { - delete this.subscriptionObserverMap[id]; - this.subscriptions = this.subscriptions.filter((sub) => { - return sub.id !== id; + private cleanUpSubscription(messageId: string): void { + this.erroredSubscriptionsAction$.next({ + type: 'remove', + messageId, }); + + this.messageSubscribers.delete(messageId); + + this.subscriptions = this.subscriptions.filter( + (subscription) => subscription.messageId !== messageId, + ); } - private openSocket() { + private async openSocket(): Promise { if ( [ConnectionStatus.CONNECTING, ConnectionStatus.CONNECTED].includes( this.connectionStatus, @@ -286,26 +367,23 @@ export class GraphqlService { ) { return; } + this.setConnectionStatus(ConnectionStatus.CONNECTING); this.logger.info('Connecting to websocket'); - this.fetchTemporaryApiKey() - .then((temporaryApiKey: string) => { - this.createSocketConnection(temporaryApiKey); - }) - .catch((e) => { - throw e; - }); + + const temporaryApiKey = await this.getTemporaryApiKey(); + this.createSocketConnection(temporaryApiKey); } - private async fetchTemporaryApiKey(): Promise { - return this.temporaryApiKeyService.fetchTemporaryApiKey(); + private async getTemporaryApiKey(): Promise { + return await this.temporaryApiKeyService.fetchTemporaryApiKey(); } private getServerUrl(temporaryApiKey: string): string { return `wss://${this.apiServer}:443/graphql/subscription?rest-api-key=${temporaryApiKey}`; } - private createSocketConnection(temporaryApiKey: string) { + private createSocketConnection(temporaryApiKey: string): void { if (this.socket) { this.socket.onclose = null; this.socket.onmessage = null; @@ -314,10 +392,10 @@ export class GraphqlService { this.socket.close(); this.socket = null; } + this.socket = new WebSocket(this.getServerUrl(temporaryApiKey)); - const socket = this.socket; - socket.onopen = () => { + this.socket.onopen = () => { this.sendRawMessage( JSON.stringify({ id: undefined, @@ -327,7 +405,7 @@ export class GraphqlService { ); }; - socket.onclose = (event: CloseEvent) => { + this.socket.onclose = (event) => { this.logger.warn('WebSocket connection closed:', { code: event.code, reason: event.reason, @@ -335,19 +413,23 @@ export class GraphqlService { this.setConnectionStatus(ConnectionStatus.DISCONNECTED); this.socket = null; - this.clearPingMonitoring(); + if (this.shouldRetry(event)) { const timer = calculateRandomizedExponentialBackoffTime( this.connectionAttemptsCount, ); - this.logger.info( - `Waiting for ${timer.toFixed(1)}ms before reconnecting`, - ); - sleepMs(timer).then(() => { - this.connectionAttemptsCount += 1; - this.openSocket(); - }); + + this.logger.info(`Reconnect socket in ${timer.toFixed(0)}ms`); + + sleepMs(timer) + .then(async () => { + this.connectionAttemptsCount++; + return await this.openSocket(); + }) + .catch((error: Error) => { + this.logger.error('Failed to reconnect socket: ', error); + }); } if (this.connectionStatus === ConnectionStatus.CONNECTING) { @@ -357,105 +439,146 @@ export class GraphqlService { } }; - socket.onerror = () => { - const message = 'Websocket error occurred!'; + this.socket.onerror = () => { if (this.isBrowserOnline()) { - this.logger.error(message); + this.logger.error('Websocket error occurred!'); } else { - this.logger.info(message); + this.logger.info('Websocket error occurred!'); } }; - socket.onmessage = (rawMessage: { data: WebSocket.Data }) => { - if (typeof rawMessage.data === 'string') { - const message: OperationMessage = JSON.parse(rawMessage.data); - - switch (message.type) { - case MessageType.GQL_CONNECTION_KEEP_ALIVE: - break; - - case MessageType.GQL_CONNECTION_ACK: { - this.connectionAttemptsCount = 0; - this.setConnectionStatus(ConnectionStatus.CONNECTED); - this.logger.info('Connected to websocket'); - this.startConnectionMonitoring(); - let resubscriptionFailed = false; - for (const subscription of this.subscriptions) { - const payload = { query: subscription.query }; - const msg = JSON.stringify({ - id: subscription.id, - type: MessageType.GQL_START, - payload, - }); - if (!this.sendRawMessage(msg)) { - this.logger.warn( - `Failed to re-subscribe ${this.subscriptions.length} subscription(s): WebSocket not open`, - ); - resubscriptionFailed = true; - break; - } - } - if (resubscriptionFailed) { - this.handleConnectionDrop(); + this.socket.onmessage = (event) => { + if (typeof event.data !== 'string') { + return; + } + + const message: Message = JSON.parse(event.data); + + switch (message.type) { + case MessageType.GQL_CONNECTION_KEEP_ALIVE: + break; + + case MessageType.GQL_CONNECTION_ACK: { + this.connectionAttemptsCount = 0; + + this.clearErroredSubscriptionsRetry(); + this.erroredSubscriptionsRetryCount = 0; + this.erroredSubscriptionsAction$.next({ type: 'clear' }); + + this.setConnectionStatus(ConnectionStatus.CONNECTED); + this.logger.info('Connected to websocket'); + this.startConnectionMonitoring(); + + let resubscriptionFailed = false; + + for (const { messageId, query } of this.subscriptions) { + const msg = JSON.stringify({ + id: messageId, + type: MessageType.GQL_START, + payload: { query }, + }); + + if (!this.sendRawMessage(msg)) { + this.logger.warn( + `Failed to re-subscribe ${this.subscriptions.length} subscription(s): WebSocket not open`, + ); + + resubscriptionFailed = true; + break; } - break; } - case MessageType.GQL_DATA: - this.subscriptionObserverMap[message.id]?.next( - message.payload.data, - ); - break; - - case MessageType.GQL_COMPLETE: - this.subscriptionObserverMap[message.id]?.complete(); - break; - - case MessageType.GQL_PONG: - clearTimeout(this.pongTimeout); - break; - - case MessageType.GQL_ERROR: - this.subscriptionObserverMap[message.id]?.error( - message.payload.errors, - ); - this.cleanupSubscription(message.id); - break; - - default: - if (message.payload && message.payload.data) { - this.subscriptionObserverMap[message.id]?.error( - message.payload.data, - ); - } else if ( - message.payload.errors && - message.payload.errors.length > 0 - ) { - this.subscriptionObserverMap[message.id]?.error( - message.payload.errors, + if (resubscriptionFailed) { + this.handleConnectionDrop().catch((error) => { + this.logger.error( + 'Failed to handle connection drop after resubscription failure: ', + error, ); - } + }); + } + + break; + } + + case MessageType.GQL_DATA: + this.erroredSubscriptionsAction$.next({ + type: 'remove', + messageId: message.id, + }); + + this.messageSubscribers.get(message.id)?.next(message.payload.data); + break; + + case MessageType.GQL_COMPLETE: { + const subscriber = this.messageSubscribers.get(message.id); + this.cleanUpSubscription(message.id); + subscriber?.complete(); + break; + } + + case MessageType.GQL_PONG: + clearTimeout(this.pongTimeout); + break; + + case MessageType.GQL_ERROR: + this.logger.warn( + `GraphQL subscription error: ${JSON.stringify(message)}`, + ); + + this.erroredSubscriptionsAction$.next({ + type: 'add', + messageId: message.id, + }); + + if ( + this.erroredSubscriptionsRetryCount < + ERRORED_SUBSCRIPTIONS_RETRY_LIMIT && + !this.erroredSubscriptionsRetryTimeout + ) { + this.scheduleErroredSubscriptionsRetry(); + } else if (!this.erroredSubscriptionsRetryTimeout) { + this.failErroredSubscriptions(); + } + + break; + + default: { + const subscriber = this.messageSubscribers.get(message.id); + if (!subscriber) { + return; + } + + if (message.payload?.data) { + this.cleanUpSubscription(message.id); + subscriber.error(message.payload.data); + } else if (message.payload?.errors?.length) { + this.cleanUpSubscription(message.id); + subscriber.error(message.payload.errors); + } } } }; } - private shouldRetry(event: CloseEvent) { - if (event.code !== CLIENT_SIDE_CLOSE_EVENT) { - return true; - } - - return Object.entries(this.subscriptionObserverMap).length > 0; + private shouldRetry(event: CloseEvent): boolean { + return ( + event.code !== CLIENT_SIDE_CLOSE_EVENT || !!this.messageSubscribers.size + ); } - private sendMessage(id: string, type: MessageType, payload: any) { - if (this.connectionStatus === ConnectionStatus.CONNECTED) { - if (!this.sendRawMessage(JSON.stringify({ id, type, payload }))) { - this.logger.warn('Message dropped: WebSocket is not in OPEN state'); - this.handleConnectionDrop(); - } - } else { - this.openSocket(); + private async sendMessage( + id: string, + type: MessageType, + payload: Record | null, + ): Promise { + if (this.connectionStatus !== ConnectionStatus.CONNECTED) { + await this.openSocket(); + return; + } + + if (!this.sendRawMessage(JSON.stringify({ id, type, payload }))) { + this.logger.warn('Message dropped: WebSocket is not in OPEN state'); + await this.handleConnectionDrop(); } } @@ -464,16 +587,11 @@ export class GraphqlService { this.socket.send(message); return true; } - return false; - } - private generateOperationId(): string { - const currentId = `${this.nextSubscriptionId}`; - this.nextSubscriptionId += 1; - return currentId; + return false; } - private setConnectionStatus(status: ConnectionStatus) { + private setConnectionStatus(status: ConnectionStatus): void { this.connectionStatus = status; this.connectionStatus$.next(status); } @@ -498,26 +616,30 @@ export class GraphqlService { } private sendPing(): void { - this.pongTimeout = setTimeout( - this.handleConnectionDropWithThisBound, - PONG_TIMEOUT_IN_MS, - ); + this.pongTimeout = setTimeout(() => { + this.handleConnectionDrop().catch((error) => { + this.logger.error('Failed to handle pong connection drop: ', error); + }); + }, PONG_TIMEOUT_IN_MS); + this.sendRawMessage(JSON.stringify({ type: MessageType.GQL_PING })); } - private handleConnectionDrop(): void { + private async handleConnectionDrop(): Promise { if (this.connectionStatus === ConnectionStatus.CONNECTING) { return; } + if (this.isBrowserOnline()) { this.logger.warn(`Websocket connection dropped!`); } else { this.logger.info(`Websocket connection dropped. We are offline.`); } + this.setConnectionStatus(ConnectionStatus.DISCONNECTED); this.clearPingMonitoring(); - this.openSocket(); + await this.openSocket(); } private clearPingMonitoring(): void { @@ -525,14 +647,75 @@ export class GraphqlService { clearInterval(this.pingPongInterval); } + private clearErroredSubscriptionsRetry(): void { + clearTimeout(this.erroredSubscriptionsRetryTimeout ?? undefined); + this.erroredSubscriptionsRetryTimeout = null; + } + + private scheduleErroredSubscriptionsRetry(): void { + const retryCount = this.erroredSubscriptionsRetryCount + 1; + const delay = calculateRandomizedExponentialBackoffTime(retryCount); + + this.logger.info( + `Retry (${retryCount}) errored subscriptions in ${delay.toFixed(0)}ms`, + ); + + this.erroredSubscriptionsRetryTimeout = setTimeout(() => { + this.retryErroredSubscriptions(); + this.erroredSubscriptionsRetryCount = retryCount; + this.erroredSubscriptionsRetryTimeout = null; + }, delay); + } + + private failErroredSubscriptions(): void { + this.logger.error( + `Errored subscriptions retry limit (${ERRORED_SUBSCRIPTIONS_RETRY_LIMIT}) reached, giving up`, + ); + + this.erroredSubscriptionsMessageIds$ + .pipe(take(1)) + .subscribe((messageIds) => { + for (const messageId of messageIds) { + const subscriber = this.messageSubscribers.get(messageId); + this.cleanUpSubscription(messageId); + + subscriber?.error( + new Error( + `Subscription failed after ${this.erroredSubscriptionsRetryCount} retries`, + ), + ); + } + }); + } + + private retryErroredSubscriptions(): void { + this.erroredSubscriptionsMessageIds$ + .pipe(take(1)) + .subscribe((messageIds) => { + for (const messageId of messageIds) { + const subscription = this.subscriptions.find( + (subscription) => subscription.messageId === messageId, + ); + + if (!subscription) { + continue; + } + + this.sendRawMessage( + JSON.stringify({ + id: subscription.messageId, + type: MessageType.GQL_START, + payload: { query: subscription.query }, + }), + ); + } + }); + } + /** - * Returns the online status of the browser. - * In the non-browser environment (NodeJS) this always returns true. + * In a non-browser environment (NodeJS) returns `true`. */ private isBrowserOnline(): boolean { - if (typeof navigator === 'undefined') { - return true; - } - return navigator.onLine; + return typeof navigator === 'undefined' || navigator.onLine; } }