Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking: Split conflict handler functionality #6524

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions src/plugin-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,37 +312,6 @@ export function wrapRxStorageInstance<RxDocType>(
tap(() => processingChangesCount$.next(processingChangesCount$.getValue() - 1))
);
},
conflictResultionTasks: () => {
return instance.conflictResultionTasks().pipe(
mergeMap(async (task) => {
const assumedMasterState = await fromStorage(task.input.assumedMasterState);
const newDocumentState = await fromStorage(task.input.newDocumentState);
const realMasterState = await fromStorage(task.input.realMasterState);
return {
id: task.id,
context: task.context,
input: {
assumedMasterState,
realMasterState,
newDocumentState
}
};
})
);
},
resolveConflictResultionTask: (taskSolution) => {
if (taskSolution.output.isEqual) {
return instance.resolveConflictResultionTask(taskSolution);
}
const useSolution = {
id: taskSolution.id,
output: {
isEqual: false,
documentData: taskSolution.output.documentData
}
};
return instance.resolveConflictResultionTask(useSolution);
}
};

return wrappedInstance;
Expand Down
39 changes: 15 additions & 24 deletions src/plugins/crdt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,31 +308,22 @@ export function getCRDTConflictHandler<RxDocType>(
const crdtField = crdtOptions.field;
const getCRDTValue = objectPathMonad<WithDeleted<RxDocType> | RxDocType, CRDTDocumentField<RxDocType>>(crdtField);

const conflictHandler: RxConflictHandler<RxDocType> = async (
i: RxConflictHandlerInput<RxDocType>,
_context: string
) => {
const newDocCrdt = getCRDTValue(i.newDocumentState);
const masterDocCrdt = getCRDTValue(i.realMasterState);

if (newDocCrdt.hash === masterDocCrdt.hash) {
return Promise.resolve({
isEqual: true
});
const conflictHandler: RxConflictHandler<RxDocType> = {
isEqual(a, b, ctx) {
return getCRDTValue(a).hash === getCRDTValue(b).hash;
},
async resolve(i) {
const newDocCrdt = getCRDTValue(i.newDocumentState);
const masterDocCrdt = getCRDTValue(i.realMasterState);
const mergedCrdt = await mergeCRDTFields(hashFunction, newDocCrdt, masterDocCrdt);
const mergedDoc = rebuildFromCRDT(
schema,
i.newDocumentState,
mergedCrdt
);
return mergedDoc;
}

const mergedCrdt = await mergeCRDTFields(hashFunction, newDocCrdt, masterDocCrdt);
const mergedDoc = rebuildFromCRDT(
schema,
i.newDocumentState,
mergedCrdt
);
return Promise.resolve({
isEqual: false,
documentData: mergedDoc
});
};

}
return conflictHandler;
}

Expand Down
5 changes: 1 addition & 4 deletions src/plugins/replication-couchdb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,7 @@ export function replicateCouchDB<RxDocType>(

if (
pushRow.assumedMasterState &&
(await conflictHandler({
realMasterState,
newDocumentState: pushRow.assumedMasterState
}, 'couchdb-push-1')).isEqual
conflictHandler.isEqual(realMasterState, pushRow.assumedMasterState, 'couchdb-push-1')
) {
remoteRevById.set(row.id, row.doc._rev);
nonConflictRows.push(pushRow);
Expand Down
8 changes: 0 additions & 8 deletions src/plugins/storage-denokv/rx-storage-instance-denokv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ import type {
RxStorageInstanceCreationParams,
EventBulk,
StringKeys,
RxConflictResultionTaskSolution,
RxStorageDefaultCheckpoint,
RxStorageCountResult,
RxConflictResultionTask,
PreparedQuery
} from '../../types/index.d.ts';
import { getPrimaryFieldOfPrimaryKey } from '../../rx-schema-helper.ts';
Expand Down Expand Up @@ -346,12 +344,6 @@ export class RxStorageInstanceDenoKV<RxDocType> implements RxStorageInstance<
await Promise.all(promises);
return this.close();
}
conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
return new Subject<any>().asObservable();
}
resolveConflictResultionTask(_taskSolution: RxConflictResultionTaskSolution<RxDocType>): Promise<void> {
return PROMISE_RESOLVE_VOID;
}
}


Expand Down
8 changes: 0 additions & 8 deletions src/plugins/storage-dexie/rx-storage-instance-dexie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import type {
RxStorageInstanceCreationParams,
EventBulk,
StringKeys,
RxConflictResultionTask,
RxConflictResultionTaskSolution,
RxStorageDefaultCheckpoint,
CategorizeBulkWriteRowsOutput,
RxStorageCountResult,
Expand Down Expand Up @@ -349,12 +347,6 @@ export class RxStorageInstanceDexie<RxDocType> implements RxStorageInstance<
})();
return this.closed;
}

conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
return new Subject();
}
async resolveConflictResultionTask(_taskSolution: RxConflictResultionTaskSolution<RxDocType>): Promise<void> { }

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import type {
EventBulk,
PreparedQuery,
RxAttachmentWriteData,
RxConflictResultionTask,
RxConflictResultionTaskSolution,
RxDocumentData,
RxJsonSchema,
RxStorageBulkWriteResponse,
Expand Down Expand Up @@ -321,14 +319,6 @@ export class RxStorageInstanceFoundationDB<RxDocType> implements RxStorageInstan

return noMoreUndeleted;
}

conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
return new Subject<any>().asObservable();
}
resolveConflictResultionTask(_taskSolution: RxConflictResultionTaskSolution<RxDocType>): Promise<void> {
return PROMISE_RESOLVE_VOID;
}

async close() {
if (this.closed) {
return this.closed;
Expand Down
10 changes: 0 additions & 10 deletions src/plugins/storage-lokijs/rx-storage-instance-loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import type {
EventBulk,
StringKeys,
DeepReadonly,
RxConflictResultionTask,
RxConflictResultionTaskSolution,
RxStorageDefaultCheckpoint,
RxStorageCountResult,
PreparedQuery
Expand Down Expand Up @@ -106,12 +104,10 @@ export class RxStorageInstanceLoki<RxDocType> implements RxStorageInstance<
findDocumentsById: this.findDocumentsById.bind(this),
collectionName: this.collectionName,
databaseName: this.databaseName,
conflictResultionTasks: this.conflictResultionTasks.bind(this),
getAttachmentData: this.getAttachmentData.bind(this),
internals: this.internals,
options: this.options,
remove: this.remove.bind(this),
resolveConflictResultionTask: this.resolveConflictResultionTask.bind(this),
schema: this.schema
};

Expand Down Expand Up @@ -375,12 +371,6 @@ export class RxStorageInstanceLoki<RxDocType> implements RxStorageInstance<
await localState.databaseState.saveQueue.run();
return this.close();
}

conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
return new Subject();
}
async resolveConflictResultionTask(_taskSolution: RxConflictResultionTaskSolution<RxDocType>): Promise<void> { }

}

export async function createLokiLocalState<RxDocType>(
Expand Down
7 changes: 0 additions & 7 deletions src/plugins/storage-memory/memory-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import type {
CategorizeBulkWriteRowsOutput,
EventBulk,
RxAttachmentWriteData,
RxConflictResultionTask,
RxDocumentData,
RxJsonSchema,
RxStorage,
Expand Down Expand Up @@ -81,12 +80,6 @@ export type MemoryStorageInternals<RxDocType> = {
ensurePersistenceTask?: CategorizeBulkWriteRowsOutput<RxDocType>;
ensurePersistenceIdlePromise?: Promise<void>;

/**
* To easier test the conflict resolution,
* the memory storage exposes the conflict resolution task subject
* so that we can inject own tasks during tests.
*/
conflictResultionTasks$: Subject<RxConflictResultionTask<RxDocType>>;
changes$: Subject<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, RxStorageDefaultCheckpoint>>;
};

Expand Down
10 changes: 0 additions & 10 deletions src/plugins/storage-memory/rx-storage-instance-memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import type {
EventBulk,
PreparedQuery,
QueryMatcher,
RxConflictResultionTask,
RxConflictResultionTaskSolution,
RxDocumentData,
RxJsonSchema,
RxStorageBulkWriteResponse,
Expand Down Expand Up @@ -476,13 +474,6 @@ export class RxStorageInstanceMemory<RxDocType> implements RxStorageInstance<
this.internals.refCount = this.internals.refCount - 1;
return PROMISE_RESOLVE_VOID;
}

conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
return this.internals.conflictResultionTasks$.asObservable();
}
resolveConflictResultionTask(_taskSolution: RxConflictResultionTaskSolution<RxDocType>): Promise<void> {
return PROMISE_RESOLVE_VOID;
}
}

export function createMemoryStorageInstance<RxDocType>(
Expand All @@ -506,7 +497,6 @@ export function createMemoryStorageInstance<RxDocType>(
documents: new Map(),
attachments: params.schema.attachments ? new Map() : undefined as any,
byIndex: {},
conflictResultionTasks$: new Subject(),
changes$: new Subject()
};
addIndexesToInternalsState(internals, params.schema);
Expand Down
7 changes: 0 additions & 7 deletions src/plugins/storage-mongodb/rx-storage-instance-mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import type {
BulkWriteRow,
EventBulk,
PreparedQuery,
RxConflictResultionTask,
RxConflictResultionTaskSolution,
RxDocumentData,
RxJsonSchema,
RxStorageBulkWriteResponse,
Expand Down Expand Up @@ -445,11 +443,6 @@ export class RxStorageInstanceMongoDB<RxDocType> implements RxStorageInstance<
})();
return this.closed;
}

conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
return new Subject();
}
async resolveConflictResultionTask(_taskSolution: RxConflictResultionTaskSolution<RxDocType>): Promise<void> { }
}

export function createMongoDBStorageInstance<RxDocType>(
Expand Down
12 changes: 0 additions & 12 deletions src/plugins/storage-remote/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,6 @@ export function exposeRxStorageRemote(settings: RxStorageRemoteExposeSettings):
settings.send(message);
})
);
subs.push(
storageInstance.conflictResultionTasks().subscribe(conflicts => {
const message: MessageFromRemote = {
connectionId,
answerTo: 'conflictResultionTasks',
method: 'conflictResultionTasks',
return: conflicts
};
settings.send(message);
})
);


let connectionClosed = false;
function closeThisConnection() {
Expand Down
12 changes: 0 additions & 12 deletions src/plugins/storage-remote/rx-storage-remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import {
import type {
BulkWriteRow,
EventBulk,
RxConflictResultionTask,
RxConflictResultionTaskSolution,
RxDocumentData,
RxJsonSchema,
RxStorage,
Expand Down Expand Up @@ -167,7 +165,6 @@ function getMessageReturn(

export class RxStorageInstanceRemote<RxDocType> implements RxStorageInstance<RxDocType, RxStorageRemoteInternals, any, any> {
private changes$: Subject<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, any>> = new Subject();
private conflicts$: Subject<RxConflictResultionTask<RxDocType>> = new Subject();
private subs: Subscription[] = [];

private closed?: Promise<void>;
Expand All @@ -189,9 +186,6 @@ export class RxStorageInstanceRemote<RxDocType> implements RxStorageInstance<RxD
if (msg.method === 'changeStream') {
this.changes$.next(getMessageReturn(msg));
}
if (msg.method === 'conflictResultionTasks') {
this.conflicts$.next(msg.return);
}
})
);
}
Expand Down Expand Up @@ -280,12 +274,6 @@ export class RxStorageInstanceRemote<RxDocType> implements RxStorageInstance<RxD
})();
return this.closed;
}
conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
return this.conflicts$;
}
async resolveConflictResultionTask(taskSolution: RxConflictResultionTaskSolution<RxDocType>): Promise<void> {
await this.requestRemote('resolveConflictResultionTask', [taskSolution]);
}
}

export function getRxStorageRemote(settings: RxStorageRemoteSettings): RxStorageRemote {
Expand Down
Loading
Loading