Skip to content

Commit

Permalink
Merge pull request #39024 from Expensify/Rory-GenericConflictingRequests
Browse files Browse the repository at this point in the history
Create generic network layer logic for conflicting write requests
  • Loading branch information
yuwenmemon authored Mar 28, 2024
2 parents 7603719 + be315d6 commit 3f3869a
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 110 deletions.
10 changes: 9 additions & 1 deletion src/libs/API/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as Pusher from '@libs/Pusher/pusher';
import * as Request from '@libs/Request';
import CONST from '@src/CONST';
import type OnyxRequest from '@src/types/onyx/Request';
import type {RequestConflictResolver} from '@src/types/onyx/Request';
import type Response from '@src/types/onyx/Response';
import pkg from '../../../package.json';
import type {ApiRequest, ApiRequestCommandParameters, ReadCommand, SideEffectRequestCommand, WriteCommand} from './types';
Expand Down Expand Up @@ -51,8 +52,14 @@ type OnyxData = {
* @param [onyxData.successData] - Onyx instructions that will be passed to Onyx.update() when the response has jsonCode === 200.
* @param [onyxData.failureData] - Onyx instructions that will be passed to Onyx.update() when the response has jsonCode !== 200.
* @param [onyxData.finallyData] - Onyx instructions that will be passed to Onyx.update() when the response has jsonCode === 200 or jsonCode !== 200.
* @param [conflictResolver] - callbacks used in special cases to detect and handle conflicting requests in the sequential queue
*/
function write<TCommand extends WriteCommand>(command: TCommand, apiCommandParameters: ApiRequestCommandParameters[TCommand], onyxData: OnyxData = {}) {
function write<TCommand extends WriteCommand>(
command: TCommand,
apiCommandParameters: ApiRequestCommandParameters[TCommand],
onyxData: OnyxData = {},
conflictResolver: RequestConflictResolver = {},
) {
Log.info('Called API write', false, {command, ...apiCommandParameters});
const {optimisticData, ...onyxDataWithoutOptimisticData} = onyxData;

Expand Down Expand Up @@ -83,6 +90,7 @@ function write<TCommand extends WriteCommand>(command: TCommand, apiCommandParam
canCancel: true,
},
...onyxDataWithoutOptimisticData,
...conflictResolver,
};

// Write commands can be saved and retried, so push it to the SequentialQueue
Expand Down
1 change: 0 additions & 1 deletion src/libs/API/parameters/OpenReportParams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ type OpenReportParams = {
shouldRetry?: boolean;
createdReportActionID?: string;
clientLastReadTime?: string;
idempotencyKey?: string;
groupChatAdminLogins?: string;
reportName?: string;
chatType?: string;
Expand Down
1 change: 0 additions & 1 deletion src/libs/API/parameters/ReconnectAppParams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ type ReconnectAppParams = {
mostRecentReportActionLastModified?: string;
updateIDFrom?: number;
policyIDList: string[];
idempotencyKey?: string;
};

export default ReconnectAppParams;
46 changes: 0 additions & 46 deletions src/libs/Network/SequentialQueue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import Onyx from 'react-native-onyx';
import * as ActiveClientManager from '@libs/ActiveClientManager';
import {WRITE_COMMANDS} from '@libs/API/types';
import * as Request from '@libs/Request';
import * as RequestThrottle from '@libs/RequestThrottle';
import * as PersistedRequests from '@userActions/PersistedRequests';
Expand Down Expand Up @@ -46,47 +45,6 @@ function flushOnyxUpdatesQueue() {
QueuedOnyxUpdates.flushQueue();
}

/**
* Identifies and removes conflicting requests from the queue
*/
function reconcileRequests(persistedRequests: OnyxRequest[], commands: string[]) {
const requestsByActionID: Record<string, OnyxRequest[]> = {};

// Group requests by reportActionID
persistedRequests.forEach((request) => {
const {data} = request;
const reportActionID = data?.reportActionID as string;
if (reportActionID) {
if (!requestsByActionID[reportActionID]) {
requestsByActionID[reportActionID] = [];
}
requestsByActionID[reportActionID].push(request);
}
});

// Process requests with conflicting actions
Object.values(requestsByActionID).forEach((requests) => {
const conflictingRequests: OnyxRequest[] = [];
commands.forEach((command) => {
const conflictingRequest = requests.find((request) => request.command === command);
if (conflictingRequest) {
conflictingRequests.push(conflictingRequest);
}
});

if (conflictingRequests.length > 1) {
// Remove all requests as they cancel each other out
conflictingRequests.forEach((request) => {
// Perform action: Remove request from persisted requests
const index = persistedRequests.findIndex((req) => req === request);
if (index !== -1) {
persistedRequests.splice(index, 1);
}
});
}
});
}

/**
* Process any persisted requests, when online, one at a time until the queue is empty.
*
Expand All @@ -106,10 +64,6 @@ function process(): Promise<void> {
return Promise.resolve();
}

// Remove conflicting requests from the queue to avoid processing them
const commands = [WRITE_COMMANDS.ADD_COMMENT, WRITE_COMMANDS.DELETE_COMMENT];
reconcileRequests(persistedRequests, commands);

const requestToProcess = persistedRequests[0];

// Set the current request to a promise awaiting its processing so that getCurrentRequest can be used to take some action after the current request has processed.
Expand Down
3 changes: 0 additions & 3 deletions src/libs/Network/enhanceParameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,5 @@ export default function enhanceParameters(command: string, parameters: Record<st

finalParameters.isFromDevEnv = Environment.isDevelopment();

// idempotencyKey declared in JS is front-end-only. We delete it here so it doesn't interfere with idempotency in other layers.
delete finalParameters.idempotencyKey;

return finalParameters;
}
9 changes: 4 additions & 5 deletions src/libs/actions/App.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,7 @@ function openApp() {
function reconnectApp(updateIDFrom: OnyxEntry<number> = 0) {
console.debug(`[OnyxUpdates] App reconnecting with updateIDFrom: ${updateIDFrom}`);
getPolicyParamsForOpenOrReconnect().then((policyParams) => {
const params: ReconnectAppParams = {
...policyParams,
idempotencyKey: `${WRITE_COMMANDS.RECONNECT_APP}`,
};
const params: ReconnectAppParams = policyParams;

// When the app reconnects we do a fast "sync" of the LHN and only return chats that have new messages. We achieve this by sending the most recent reportActionID.
// we have locally. And then only update the user about chats with messages that have occurred after that reportActionID.
Expand All @@ -242,7 +239,9 @@ function reconnectApp(updateIDFrom: OnyxEntry<number> = 0) {
params.updateIDFrom = updateIDFrom;
}

API.write(WRITE_COMMANDS.RECONNECT_APP, params, getOnyxDataForOpenOrReconnect());
API.write(WRITE_COMMANDS.RECONNECT_APP, params, getOnyxDataForOpenOrReconnect(), {
getConflictingRequests: (persistedRequests) => persistedRequests.filter((request) => request?.command === WRITE_COMMANDS.RECONNECT_APP),
});
});
}

Expand Down
39 changes: 30 additions & 9 deletions src/libs/actions/PersistedRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,38 @@ function clear() {
}

function save(requestToPersist: Request) {
const requests = [...persistedRequests];
const existingRequestIndex = requests.findIndex((request) => request.data?.idempotencyKey && request.data?.idempotencyKey === requestToPersist.data?.idempotencyKey);
if (existingRequestIndex > -1) {
// Merge the new request into the existing one, keeping its place in the queue
requests.splice(existingRequestIndex, 1, requestToPersist);
} else {
// If not, push the new request to the end of the queue
requests.push(requestToPersist);
const requests = [...persistedRequests, requestToPersist];

// identify and handle any existing requests that conflict with the new one
const {getConflictingRequests, handleConflictingRequest, shouldIncludeCurrentRequest} = requestToPersist;
if (getConflictingRequests) {
// Get all the requests, potentially including the one we're adding, which will always be at the end of the array
const potentiallyConflictingRequests = shouldIncludeCurrentRequest ? requests : requests.slice(0, requests.length - 1);

// Identify conflicting requests according to logic bound to the new request
const conflictingRequests = getConflictingRequests(potentiallyConflictingRequests);

conflictingRequests.forEach((conflictingRequest) => {
// delete the conflicting request
const index = requests.findIndex((req) => req === conflictingRequest);
if (index !== -1) {
requests.splice(index, 1);
}

// Allow the new request to perform any additional cleanup for a cancelled request
handleConflictingRequest?.(conflictingRequest);
});
}
persistedRequests = requests;

// Don't try to serialize conflict resolution functions
persistedRequests = requests.map((request) => {
delete request.getConflictingRequests;
delete request.handleConflictingRequest;
delete request.shouldIncludeCurrentRequest;
return request;
});

// Save the updated set of requests
Onyx.set(ONYXKEYS.PERSISTED_REQUESTS, requests);
}

Expand Down
41 changes: 35 additions & 6 deletions src/libs/actions/Report.ts
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,6 @@ function openReport(
emailList: participantLoginList ? participantLoginList.join(',') : '',
accountIDList: participantAccountIDList ? participantAccountIDList.join(',') : '',
parentReportActionID,
idempotencyKey: `${SIDE_EFFECT_REQUEST_COMMANDS.OPEN_REPORT}_${reportID}`,
};

if (ReportUtils.isGroupChat(newReportObject)) {
Expand All @@ -664,7 +663,8 @@ function openReport(
}

// If we are creating a new report, we need to add the optimistic report data and a report action
if (!isEmptyObject(newReportObject)) {
const isCreatingNewReport = !isEmptyObject(newReportObject);
if (isCreatingNewReport) {
// Change the method to set for new reports because it doesn't exist yet, is faster,
// and we need the data to be available when we navigate to the chat page
optimisticData[0].onyxMethod = Onyx.METHOD.SET;
Expand Down Expand Up @@ -748,7 +748,6 @@ function openReport(

// Add the createdReportActionID parameter to the API call
parameters.createdReportActionID = optimisticCreatedAction.reportActionID;
parameters.idempotencyKey = `${parameters.idempotencyKey}_NewReport_${optimisticCreatedAction.reportActionID}`;

// If we are creating a thread, ensure the report action has childReportID property added
if (newReportObject.parentReportID && parentReportActionID) {
Expand All @@ -774,7 +773,19 @@ function openReport(
});
} else {
// eslint-disable-next-line rulesdir/no-multiple-api-calls
API.write(WRITE_COMMANDS.OPEN_REPORT, parameters, {optimisticData, successData, failureData});
API.write(
WRITE_COMMANDS.OPEN_REPORT,
parameters,
{optimisticData, successData, failureData},
{
getConflictingRequests: (persistedRequests) =>
// requests conflict only if:
// 1. they are OpenReport commands
// 2. they have the same reportID
// 3. they are not creating a report - all calls to OpenReport that create a report will be unique and have a unique createdReportActionID
persistedRequests.filter((request) => request.command === WRITE_COMMANDS.OPEN_REPORT && request.data?.reportID === reportID && !request.data?.createdReportActionID),
},
);
}
}

Expand Down Expand Up @@ -1188,14 +1199,15 @@ function deleteReportComment(reportID: string, reportAction: ReportAction) {
return;
}

const isDeletedParentAction = ReportActionsUtils.isThreadParentMessage(reportAction, reportID);
const deletedMessage: Message[] = [
{
translationKey: '',
type: 'COMMENT',
html: '',
text: '',
isEdited: true,
isDeletedParentAction: ReportActionsUtils.isThreadParentMessage(reportAction, reportID),
isDeletedParentAction,
},
];
const optimisticReportActions: NullishDeep<ReportActions> = {
Expand Down Expand Up @@ -1292,7 +1304,24 @@ function deleteReportComment(reportID: string, reportAction: ReportAction) {
};

CachedPDFPaths.clearByKey(reportActionID);
API.write(WRITE_COMMANDS.DELETE_COMMENT, parameters, {optimisticData, successData, failureData});

API.write(
WRITE_COMMANDS.DELETE_COMMENT,
parameters,
{optimisticData, successData, failureData},
{
getConflictingRequests: (persistedRequests) => {
const conflictingCommands = (
isDeletedParentAction
? [WRITE_COMMANDS.UPDATE_COMMENT]
: [WRITE_COMMANDS.ADD_COMMENT, WRITE_COMMANDS.ADD_ATTACHMENT, WRITE_COMMANDS.UPDATE_COMMENT, WRITE_COMMANDS.DELETE_COMMENT]
) as string[];
return persistedRequests.filter((request) => conflictingCommands.includes(request.command) && request.data?.reportActionID === reportActionID);
},
handleConflictingRequest: () => Onyx.update(successData),
shouldIncludeCurrentRequest: !isDeletedParentAction,
},
);
}

/**
Expand Down
35 changes: 31 additions & 4 deletions src/types/onyx/Request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,40 @@ type RequestData = {
successData?: OnyxUpdate[];
failureData?: OnyxUpdate[];
finallyData?: OnyxUpdate[];
idempotencyKey?: string;

getConflictingRequests?: (persistedRequests: Request[]) => Request[];
handleConflictingRequest?: (persistedRequest: Request) => unknown;
resolve?: (value: Response) => void;
reject?: (value?: unknown) => void;
};

type Request = RequestData & OnyxData;
type RequestConflictResolver = {
/**
* A callback that's provided with all the currently serialized functions in the sequential queue.
* Should return a subset of the requests passed in that conflict with the new request.
* Any conflicting requests will be cancelled and removed from the queue.
*
* @example - In ReconnectApp, you'd only want to have one instance of that command serialized to run on reconnect. The callback for that might look like this:
* (persistedRequests) => persistedRequests.filter((request) => request.command === 'ReconnectApp')
* */
getConflictingRequests?: (persistedRequests: Request[]) => Request[];

/**
* Should the requests provided to getConflictingRequests include the new request?
* This is useful if the new request and an existing request cancel eachother out completely.
*
* @example - In DeleteComment, if you're deleting an optimistic comment, you'd want to cancel the optimistic AddComment call AND the DeleteComment call.
* */
shouldIncludeCurrentRequest?: boolean;

/**
* Callback to handle a single conflicting request.
* This is useful if you need to clean up some optimistic data for a request that was queue but never sent.
* In these cases the optimisticData will be applied immediately, but the successData, failureData, and/or finallyData will never be applied unless you do it manually in this callback.
*/
handleConflictingRequest?: (persistedRequest: Request) => unknown;
};

type Request = RequestData & OnyxData & RequestConflictResolver;

export default Request;
export type {OnyxData, RequestType};
export type {OnyxData, RequestType, RequestConflictResolver};
49 changes: 15 additions & 34 deletions tests/unit/PersistedRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ import type Request from '../../src/types/onyx/Request';

const request: Request = {
command: 'OpenReport',
data: {
idempotencyKey: 'OpenReport_1',
},
successData: [{key: 'reportMetadata_1', onyxMethod: 'merge', value: {}}],
failureData: [{key: 'reportMetadata_2', onyxMethod: 'merge', value: {}}],
};
Expand All @@ -20,44 +17,28 @@ afterEach(() => {
});

describe('PersistedRequests', () => {
it('save a new request with an idempotency key which currently exists in the PersistedRequests array', () => {
it('save a request without conflicts', () => {
PersistedRequests.save(request);
expect(PersistedRequests.getAll().length).toBe(1);
expect(PersistedRequests.getAll().length).toBe(2);
});

it('save a new request with a new idempotency key', () => {
it('save a new request with conflict resolution', () => {
const handleConflictingRequest = jest.fn();
const newRequest = {
command: 'OpenReport',
data: {
idempotencyKey: 'OpenReport_2',
},
command: 'ThingA',
getConflictingRequests: (requests: Request[]) => requests,
handleConflictingRequest,
};
PersistedRequests.save(newRequest);
expect(PersistedRequests.getAll().length).toBe(2);
});

it('replace a request existing in the PersistedRequests array with a new one', () => {
const newRequest: Request = {
command: 'OpenReport',
data: {
idempotencyKey: 'OpenReport_1',
},
successData: [{key: 'reportMetadata_3', onyxMethod: 'merge', value: {}}],
failureData: [{key: 'reportMetadata_4', onyxMethod: 'merge', value: {}}],
const secondRequest = {
command: 'ThingB',
getConflictingRequests: (requests: Request[]) => requests,
shouldIncludeCurrentRequest: true,
};

PersistedRequests.save(newRequest);

const persistedRequests = PersistedRequests.getAll();

expect(persistedRequests.length).toBe(1);

const mergedRequest = persistedRequests[0];

expect(mergedRequest.successData?.length).toBe(1);
expect(mergedRequest.failureData?.length).toBe(1);
expect(mergedRequest.successData?.[0]?.key).toBe('reportMetadata_3');
expect(mergedRequest.failureData?.[0]?.key).toBe('reportMetadata_4');
PersistedRequests.save(secondRequest);
expect(PersistedRequests.getAll().length).toBe(1);
expect(handleConflictingRequest).toHaveBeenCalledWith(request);
expect(handleConflictingRequest).toHaveBeenCalledTimes(1);
});

it('remove a request from the PersistedRequests array', () => {
Expand Down

0 comments on commit 3f3869a

Please sign in to comment.