Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ export class GraphQLSubscriptionsFixture {
return (this.graphqlService as any).subscriptions.length;
}

getGraphqlServiceSubscriptionObserverMapSize(): number {
return Object.keys(this.getGraphqlServiceSubscriptionObserverMap()).length;
getMessageSubscribersSize(): number {
return (this.graphqlService as any).messageSubscribers.size;
}

getGraphqlServiceSubscriptionObserverMap(): Record<string, Observer<object>> {
return (this.graphqlService as any).subscriptionObserverMap;
hasMessageSubscriber(id: string): boolean {
return (this.graphqlService as any).messageSubscribers.has(id);
}

async waitForConnection() {
Expand Down Expand Up @@ -125,6 +125,7 @@ export class GraphQLSubscriptionsFixture {
}

async cleanup() {
(this.graphqlService as any).clearSubscriptionRetry();
WS.clean();
await this.server.closed;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import gql from 'graphql-tag';
import fetchMock from 'jest-fetch-mock';
import { WebSocket } from 'mock-socket';
import { Subscriber } from 'rxjs';
import { ConnectionStatus } from '../../../model/connection-status';
import { GraphQLSubscriptionsFixture } from '../__fixtures__/graphql-subscriptions-fixture';
import { QminderGraphQLError } from '../graphql.service';

jest.mock('isomorphic-ws', () => WebSocket);
jest.mock('../../../util/sleep-ms/sleep-ms', () => ({
Expand Down Expand Up @@ -96,24 +94,20 @@ describe('GraphQL subscriptions', () => {
});

it('cleans up internal state when unsubscribing', async () => {
// start the test with an empty observer-map
expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0);
expect(fixture.getMessageSubscribersSize()).toBe(0);
const subscription = fixture.triggerSubscription();
await fixture.handleConnectionInit();
await fixture.consumeSubscribeMessage();
// the observer map should equal { "1": Subscriber => spy }
expect(fixture.getGraphqlServiceSubscriptionObserverMap()).toEqual({
'1': expect.any(Subscriber),
});
expect(fixture.getMessageSubscribersSize()).toBe(1);
expect(fixture.hasMessageSubscriber('1')).toBe(true);

// unsubscribing should clean up
subscription.unsubscribe();
await fixture.consumeAnyMessage();
expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0);
expect(fixture.getMessageSubscribersSize()).toBe(0);
});

it('when receiving a published message for a subscription that does not exist anymore, it does not throw', async () => {
expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0);
expect(fixture.getMessageSubscribersSize()).toBe(0);
const subscription = fixture.triggerSubscription();

await fixture.handleConnectionInit();
Expand Down Expand Up @@ -335,43 +329,191 @@ describe('GraphQL subscriptions', () => {
subscription.unsubscribe();
});

it('error messages are propagated to the subscriber', async () => {
const ERRORS: QminderGraphQLError[] = [
{
message:
"Invalid Syntax : offending token 'createdTickets' at line 1 column 1",
sourcePreview:
'createdTickets(locationId: 673) {\n' +
' id\n' +
' firstName\n' +
' lastName\n',
offendingToken: 'createdTickets',
locations: [],
errorType: 'InvalidSyntax',
extensions: null,
path: null,
},
];
it('GQL_ERROR does not kill the subscription or trigger reconnect', async () => {
const reconnectSpy = jest.spyOn(
fixture.graphqlService as any,
'handleConnectionDrop',
);
const errorSpy = jest.fn();
const subscription = fixture.triggerSubscription('subscription { baba }', {
error: errorSpy,
});
await fixture.handleConnectionInit();
await fixture.consumeSubscribeMessage();

fixture.server.send({
id: '1',
type: 'error',
payload: {
data: null,
errors: [{ message: 'Subscription limit reached' }],
},
});

await new Promise((r) => setTimeout(r, 10));

expect(errorSpy).not.toHaveBeenCalled();
expect(reconnectSpy).not.toHaveBeenCalled();
expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(1);
expect(fixture.hasMessageSubscriber('1')).toBe(true);

subscription.unsubscribe();
});

it('GQL_ERROR emits true on the subscription error observable', async () => {
const values: boolean[] = [];
fixture.graphqlService
.getSubscriptionErrorObservable()
.subscribe((v) => values.push(v));

const subscription = fixture.triggerSubscription('subscription { baba }');
await fixture.handleConnectionInit();
await fixture.consumeSubscribeMessage();

fixture.server.send({
id: '1',
type: 'error',
payload: {
data: null,
errors: [
{
message:
'The maximum subscription limit of 100 has been reached',
},
],
},
});

await new Promise((r) => setTimeout(r, 10));

expect(values).toEqual([false, true]);

subscription.unsubscribe();
});

it('retries failed subscriptions after delay and clears error state', async () => {
(fixture.graphqlService as any).subscriptionRetryDelayMs = 50;

const values: boolean[] = [];
fixture.graphqlService
.getSubscriptionErrorObservable()
.subscribe((v) => values.push(v));

const subscription = fixture.triggerSubscription('subscription { baba }');
await fixture.handleConnectionInit();
await fixture.consumeSubscribeMessage();

fixture.server.send({
id: '1',
type: 'error',
payload: {
data: null,
errors: ERRORS,
errors: [{ message: 'Limit reached' }],
},
});

expect(errorSpy).toHaveBeenCalledWith(ERRORS);
// Cleans up as well
expect(
(fixture.graphqlService as any).subscriptionObserverMap['1'],
).toBeUndefined();
await new Promise((r) => setTimeout(r, 10));
expect(values).toEqual([false, true]);

await new Promise((r) => setTimeout(r, 60));

expect(values).toEqual([false, true, false]);
expect(await fixture.getNextMessage()).toEqual({
id: '1',
type: 'start',
payload: { query: 'subscription { baba }' },
});

subscription.unsubscribe();
});

it('does not send GQL_STOP when server sends GQL_COMPLETE', async () => {
const completeSpy = jest.fn();
const subscription = fixture.triggerSubscription('subscription { baba }', {
next: () => {},
complete: completeSpy,
});
await fixture.handleConnectionInit();
await fixture.consumeSubscribeMessage();

fixture.sendMessageToClient({
id: '1',
type: 'complete',
});

await new Promise((r) => setTimeout(r, 10));

expect(completeSpy).toHaveBeenCalled();
expect(fixture.hasMessageSubscriber('1')).toBe(false);
expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(0);
expect(fixture.server.messagesToConsume.pendingItems).toHaveLength(0);

subscription.unsubscribe();
});

it('GQL_ERROR keeps subscription tracked so it re-subscribes on natural reconnect and clears error state', async () => {
const values: boolean[] = [];
fixture.graphqlService
.getSubscriptionErrorObservable()
.subscribe((v) => values.push(v));

const subscription = fixture.triggerSubscription('subscription { baba }');
await fixture.handleConnectionInit();
await fixture.consumeSubscribeMessage();

fixture.sendMessageToClient({
id: '1',
type: 'error',
payload: {
data: null,
errors: [{ message: 'Limit reached' }],
},
});

await new Promise((r) => setTimeout(r, 10));

expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(1);
expect(fixture.hasMessageSubscriber('1')).toBe(true);
expect(values).toEqual([false, true]);

await fixture.closeWithCode(1001);
fixture.openServer();
await fixture.handleConnectionInit();
expect(await fixture.getNextMessage()).toEqual({
id: '1',
type: 'start',
payload: { query: 'subscription { baba }' },
});

expect(values).toEqual([false, true, false]);

subscription.unsubscribe();
});

it('cleans up subscription on unknown message type with errors', async () => {
const errorSpy = jest.fn();
const subscription = fixture.triggerSubscription('subscription { baba }', {
error: errorSpy,
});
await fixture.handleConnectionInit();
await fixture.consumeSubscribeMessage();

fixture.sendMessageToClient({
id: '1',
type: 'unknown_type',
payload: {
errors: [{ message: 'Something went wrong' }],
},
});

await new Promise((r) => setTimeout(r, 10));

expect(errorSpy).toHaveBeenCalledWith(
expect.objectContaining({ message: 'Something went wrong' }),
);
expect(errorSpy.mock.calls[0][0]).toBeInstanceOf(Error);
expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(0);
expect(fixture.hasMessageSubscriber('1')).toBe(false);

subscription.unsubscribe();
});
Expand Down
Loading
Loading