Skip to content

Commit

Permalink
Init implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
zxl629 committed Feb 11, 2025
1 parent 289f3e8 commit 88f2d19
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 77 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/push-preid-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
push:
branches:
# Change this to your branch name where "example-preid" corresponds to the preid you want your changes released on
- feat/example-preid-branch/main
- feat/websocket-event/main

jobs:
e2e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,16 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
query,
apiKey,
region,
variables,
} = options;

// This will be needed for WS publish
// const data = {
// events: [variables],
// };
const data = {
channel: query,
events: [variables],
};

const serializedData = JSON.stringify({ channel: query });
const serializedData = JSON.stringify(data);

const headers = {
...(await awsRealTimeHeaderBasedAuth({
Expand All @@ -125,18 +127,18 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
const subscriptionMessage = {
id: subscriptionId,
channel: query,
// events: [JSON.stringify(variables)],
events: [JSON.stringify(variables)],
authorization: {
...headers,
},
// payload: {
// events: serializedData,
// extensions: {
// authorization: {
// ...headers,
// },
// },
// },
payload: {
events: serializedData,
extensions: {
authorization: {
...headers,
},
},
},
type: publish
? MESSAGE_TYPES.EVENT_PUBLISH
: MESSAGE_TYPES.EVENT_SUBSCRIBE,
Expand Down
124 changes: 63 additions & 61 deletions packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
CONNECTION_INIT_TIMEOUT,
CONNECTION_STATE_CHANGE,
DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT,
DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT,
DEFAULT_KEEP_ALIVE_TIMEOUT,
MAX_DELAY_MS,
MESSAGE_TYPES,
NON_RETRYABLE_CODES,
Expand Down Expand Up @@ -83,8 +83,9 @@ export abstract class AWSWebSocketProvider {

protected awsRealTimeSocket?: WebSocket;
private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED;
private keepAliveTimestamp: number = Date.now();
private keepAliveHeartbeatIntervalId?: ReturnType<typeof setInterval>;
private keepAliveTimeoutId?: ReturnType<typeof setTimeout>;
private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
private keepAliveAlertTimeoutId?: ReturnType<typeof setTimeout>;
private promiseArray: { res(): void; rej(reason?: any): void }[] = [];
private connectionState: ConnectionState | undefined;
private readonly connectionStateMonitor = new ConnectionStateMonitor();
Expand Down Expand Up @@ -118,7 +119,6 @@ export abstract class AWSWebSocketProvider {
return new Promise<void>((resolve, reject) => {
if (this.awsRealTimeSocket) {
this.awsRealTimeSocket.onclose = (_: CloseEvent) => {
this._closeSocket();
this.subscriptionObserverMap = new Map();
this.awsRealTimeSocket = undefined;
resolve();
Expand Down Expand Up @@ -171,7 +171,7 @@ export abstract class AWSWebSocketProvider {
this.logger.debug(
`${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}`,
);
this._closeSocket();
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
})
.finally(() => {
subscriptionStartInProgress = false;
Expand Down Expand Up @@ -255,6 +255,11 @@ export abstract class AWSWebSocketProvider {

return new Promise((resolve, reject) => {
if (this.awsRealTimeSocket) {
const timeoutId = setTimeout(() => {
cleanup();
reject(new Error('Publish operation timed out'));
}, 30000); // 30 seconds timeout

const publishListener = (event: MessageEvent) => {
const data = JSON.parse(event.data);
if (data.id === subscriptionId && data.type === 'publish_success') {
Expand All @@ -263,22 +268,47 @@ export abstract class AWSWebSocketProvider {
'message',
publishListener,
);

cleanup();
resolve();
}

if (data.erroredEvents && data.erroredEvents.length > 0) {
// TODO: handle errors
const errors = data.erroredEvents.map(
(errorEvent: any) => errorEvent.error,
);
cleanup();
reject(new Error(`Publish errors: ${errors.join(', ')}`));
}
};
this.awsRealTimeSocket.addEventListener('message', publishListener);
this.awsRealTimeSocket.addEventListener('close', () => {

const errorListener = (error: Event) => {
cleanup();
reject(new Error(`WebSocket error: ${error}`));
};

const closeListener = () => {
cleanup();
reject(new Error('WebSocket is closed'));
});
//
// this.awsRealTimeSocket.addEventListener('error', publishListener);
};

const cleanup = () => {
clearTimeout(timeoutId);
this.awsRealTimeSocket?.removeEventListener(
'message',
publishListener,
);
this.awsRealTimeSocket?.removeEventListener('error', errorListener);
this.awsRealTimeSocket?.removeEventListener('close', closeListener);
};

this.awsRealTimeSocket.addEventListener('message', publishListener);
this.awsRealTimeSocket.addEventListener('error', errorListener);
this.awsRealTimeSocket.addEventListener('close', closeListener);

this.awsRealTimeSocket.send(serializedSubscriptionMessage);
} else {
reject(new Error('WebSocket is not connected'));
}
});
}
Expand Down Expand Up @@ -435,7 +465,7 @@ export abstract class AWSWebSocketProvider {
this.logger.debug({ err });
const message = String(err.message ?? '');
// Resolving to give the state observer time to propogate the update
this._closeSocket();
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);

// Capture the error only when the network didn't cause disruption
if (
Expand Down Expand Up @@ -544,15 +574,20 @@ export abstract class AWSWebSocketProvider {
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
} else {
this.logger.debug('closing WebSocket...');

if (this.keepAliveTimeoutId) {
clearTimeout(this.keepAliveTimeoutId);
}
if (this.keepAliveAlertTimeoutId) {
clearTimeout(this.keepAliveAlertTimeoutId);
}
const tempSocket = this.awsRealTimeSocket;
// Cleaning callbacks to avoid race condition, socket still exists
tempSocket.onclose = null;
tempSocket.onerror = null;
tempSocket.close(1000);
this.awsRealTimeSocket = undefined;
this.socketStatus = SOCKET_STATUS.CLOSED;
this._closeSocket();
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
}
}

Expand All @@ -572,40 +607,13 @@ export abstract class AWSWebSocketProvider {
errorType: string;
};

private maintainKeepAlive() {
this.keepAliveTimestamp = Date.now();
}

private keepAliveHeartbeat(connectionTimeoutMs: number) {
const currentTime = Date.now();

// Check for missed KA message
if (
currentTime - this.keepAliveTimestamp >
DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT
) {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED);
} else {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE);
}

// Recognize we are disconnected if we haven't seen messages in the keep alive timeout period
if (currentTime - this.keepAliveTimestamp > connectionTimeoutMs) {
this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT);
}
}

private _handleIncomingSubscriptionMessage(message: MessageEvent) {
if (typeof message.data !== 'string') {
return;
}

const [isData, data] = this._handleSubscriptionData(message);
if (isData) {
this.maintainKeepAlive();

return;
}
if (isData) return;

const { type, id, payload } = data;

Expand Down Expand Up @@ -654,7 +662,16 @@ export abstract class AWSWebSocketProvider {
}

if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
this.maintainKeepAlive();
if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId);
if (this.keepAliveAlertTimeoutId)
clearTimeout(this.keepAliveAlertTimeoutId);
this.keepAliveTimeoutId = setTimeout(() => {
this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT);
}, this.keepAliveTimeout);
this.keepAliveAlertTimeoutId = setTimeout(() => {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED);
}, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT);
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE);

return;
}
Expand Down Expand Up @@ -699,21 +716,13 @@ export abstract class AWSWebSocketProvider {
this.logger.debug(`Disconnect error: ${msg}`);

if (this.awsRealTimeSocket) {
this._closeSocket();
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this.awsRealTimeSocket.close();
}

this.socketStatus = SOCKET_STATUS.CLOSED;
}

private _closeSocket() {
if (this.keepAliveHeartbeatIntervalId) {
clearInterval(this.keepAliveHeartbeatIntervalId);
this.keepAliveHeartbeatIntervalId = undefined;
}
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
}

private _timeoutStartSubscriptionAck(subscriptionId: string) {
const subscriptionObserver =
this.subscriptionObserverMap.get(subscriptionId);
Expand All @@ -729,7 +738,7 @@ export abstract class AWSWebSocketProvider {
subscriptionState: SUBSCRIPTION_STATUS.FAILED,
});

this._closeSocket();
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this.logger.debug(
'timeoutStartSubscription',
JSON.stringify({ query, variables }),
Expand Down Expand Up @@ -841,7 +850,6 @@ export abstract class AWSWebSocketProvider {
this.logger.debug(`WebSocket connection error`);
};
newSocket.onclose = () => {
this._closeSocket();
reject(new Error('Connection handshake error'));
};
newSocket.onopen = () => {
Expand Down Expand Up @@ -871,7 +879,6 @@ export abstract class AWSWebSocketProvider {

this.awsRealTimeSocket.onclose = event => {
this.logger.debug(`WebSocket closed ${event.reason}`);
this._closeSocket();
reject(new Error(JSON.stringify(event)));
};

Expand Down Expand Up @@ -935,11 +942,7 @@ export abstract class AWSWebSocketProvider {
return;
}

// Set up a keep alive heartbeat for this connection
this.keepAliveHeartbeatIntervalId = setInterval(() => {
this.keepAliveHeartbeat(connectionTimeoutMs);
}, DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT);

this.keepAliveTimeout = connectionTimeoutMs;
this.awsRealTimeSocket.onmessage =
this._handleIncomingSubscriptionMessage.bind(this);

Expand All @@ -950,7 +953,6 @@ export abstract class AWSWebSocketProvider {

this.awsRealTimeSocket.onclose = event => {
this.logger.debug(`WebSocket closed ${event.reason}`);
this._closeSocket();
this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED);
};
}
Expand Down
4 changes: 2 additions & 2 deletions packages/api-graphql/src/internals/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async function connect(
};

// WS publish is not enabled in the service yet. It will be a follow up feature
const _pub = async (
const pub = async (
event: DocumentType,
pubOptions?: EventsOptions,
): Promise<any> => {
Expand All @@ -94,7 +94,7 @@ async function connect(
return {
subscribe: sub,
close,
// publish: pub,
publish: pub,
};
}

Expand Down
2 changes: 2 additions & 0 deletions packages/api-graphql/src/internals/events/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { Subscription } from 'rxjs';
import type { GraphQLAuthMode } from '@aws-amplify/core/internals/utils';
import { DocumentType } from '@aws-amplify/core/internals/utils';

export interface SubscriptionObserver<T> {
next(value: T): void;
Expand Down Expand Up @@ -51,6 +52,7 @@ export interface EventsChannel {
*
*/
close(): void;
publish(event: DocumentType, pubOptions?: EventsOptions): Promise<any>;
}

export type ResolvedGraphQLAuthModes = Exclude<GraphQLAuthMode, 'identityPool'>;
Expand Down

0 comments on commit 88f2d19

Please sign in to comment.