Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reset to snapshot after successful sync #1221

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions packages/signaldb/__tests__/sync/SyncManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,42 @@ it('should handle error in remote changes with data', async () => {
expect(onError).toHaveBeenCalledWith({ name: 'test' }, new Error('Pull failed'))
})

it('should sync second time if there were changes during sync', async () => {
const mockPull = vi.fn<() => Promise<LoadResponse<TestItem>>>().mockResolvedValue({
items: [{ id: '1', name: 'Test Item' }],
})

const mockPush = vi.fn<(options: any, pushParams: any) => Promise<void>>()
.mockResolvedValue()

const onRemoteChangeHandler = vi.fn<(data?: LoadResponse<TestItem>) => void | Promise<void>>()
const onError = vi.fn()
const syncManager = new SyncManager({
onError,
persistenceAdapter: () => memoryPersistenceAdapter([]),
pull: mockPull,
push: mockPush,
registerRemoteChange: (_options, onRemoteChange) => {
onRemoteChangeHandler.mockImplementation(onRemoteChange)
},
})

const mockCollection = new Collection<TestItem, string, any>()

syncManager.addCollection(mockCollection, { name: 'test' })

// Simulate a remote change
const promise = onRemoteChangeHandler({ items: [{ id: '2', name: 'Remote Item' }] })
mockCollection.insert({ id: '1', name: 'Test Item' })
await expect(promise).resolves.not.toThrow()
expect(onError).toHaveBeenCalledTimes(0)

// wait to next tick
await new Promise((resolve) => { setTimeout(resolve, 0) })

expect(mockCollection.find().fetch()).toEqual([{ id: '1', name: 'Test Item' }])
})

it('should sync after a empty remote change was received', async () => {
const mockPull = vi.fn<() => Promise<LoadResponse<TestItem>>>().mockResolvedValue({
items: [{ id: '1', name: 'Test Item' }],
Expand Down Expand Up @@ -818,3 +854,62 @@ it('should not leave any remote changes after successful pull', async () => {
// @ts-expect-error - private property
expect(syncManager.remoteChanges.length).toBe(0)
})

it('should reset if syncmanager snapshot and collection are not in sync', async () => {
const mockPull = vi.fn<() => Promise<LoadResponse<TestItem>>>().mockResolvedValue({
items: [
{ id: '1', name: 'Test Item' },
{ id: '2', name: 'Test Item 2' },
],
})

const mockPush = vi.fn<(options: any, pushParams: any) => Promise<void>>()
.mockResolvedValue()
const onError = vi.fn()

const syncManager = new SyncManager({
onError,
persistenceAdapter: () => memoryPersistenceAdapter([]),
pull: mockPull,
push: mockPush,
})

const mockCollection = new Collection<TestItem, string, any>({
memory: [
{ id: '1', name: 'Test Item' },
{ id: 'x', name: 'Test Item 3' },
],
})

syncManager.addCollection(mockCollection, { name: 'test' })

await syncManager.sync('test')
expect(mockCollection.find().fetch()).toEqual([
{ id: '1', name: 'Test Item' },
{ id: '2', name: 'Test Item 2' },
])

// @ts-expect-error - private property
syncManager.snapshots.updateOne({ collectionName: 'test' }, {
// monkey patch the snapshot and add one item
$set: {
items: [
{ id: '1', name: 'Test Item' },
{ id: '2', name: 'Test Item 2' },
{ id: 'xxx', name: 'Test Item xxx' },
],
},
})

await syncManager.sync('test')
expect(mockCollection.find().fetch()).toEqual([
{ id: '1', name: 'Test Item' },
{ id: '2', name: 'Test Item 2' },
])

expect(onError).not.toHaveBeenCalled()
expect(mockPull).toHaveBeenCalled()

// @ts-expect-error - private property
expect(syncManager.remoteChanges.length).toBe(0)
})
14 changes: 14 additions & 0 deletions packages/signaldb/__tests__/sync/getSnapshot.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,17 @@ it('should handle undefined lastSnapshot and apply changes', () => {
const result = getSnapshot(lastSnapshot, data)
expect(result).toEqual([{ id: 3, name: 'Item 3' }])
})

it('should upsert changes', () => {
const lastSnapshot: TestItem[] = [{ id: 2, name: 'Item 2' }]
const data: LoadResponse<TestItem> = {
changes: {
added: [{ id: 2, name: 'Item 23' }],
modified: [{ id: 3, name: 'Item 3' }],
removed: [],
},
}

const result = getSnapshot(lastSnapshot, data)
expect(result).toEqual([{ id: 2, name: 'Item 23' }, { id: 3, name: 'Item 3' }])
})
15 changes: 13 additions & 2 deletions packages/signaldb/src/SyncManager/getSnapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,21 @@ export default function getSnapshot<ItemType extends BaseItem<IdType>, IdType>(
if (data.items != null) return data.items

const items = lastSnapshot || []
data.changes.added.forEach(item => items.push(item))
data.changes.added.forEach((item) => {
const index = items.findIndex(i => i.id === item.id)
if (index !== -1) {
items[index] = item
} else {
items.push(item)
}
})
data.changes.modified.forEach((item) => {
const index = items.findIndex(i => i.id === item.id)
if (index !== -1) items[index] = item
if (index !== -1) {
items[index] = item
} else {
items.push(item)
}
})
data.changes.removed.forEach((item) => {
const index = items.findIndex(i => i.id === item.id)
Expand Down
52 changes: 46 additions & 6 deletions packages/signaldb/src/SyncManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ export default class SyncManager<
collection.updateOne({ id: itemId } as Record<string, any>, modifier)
},
remove: (itemId) => {
if (!collection.findOne({ id: itemId } as Record<string, any>)) return
this.remoteChanges.push({
collectionName: name,
type: 'remove',
Expand Down Expand Up @@ -538,12 +539,51 @@ export default class SyncManager<

// delay sync operation update to next tick to allow other tasks to run first
await new Promise((resolve) => { setTimeout(resolve, 0) })

const hasChanges = this.changes.find({
collectionName: name,
}).count() > 0

if (hasChanges) {
// check if there are unsynced changes to push
// and sync again if there are any
await this.sync(name, {
force: true,
onlyWithChanges: true,
})
return
}

// if there are no unsynced changes apply the last snapshot
// to make sure that collection and snapshot are in sync

// find all items that are not in the snapshot
const nonExistingItemIds = collection.find({
id: { $nin: snapshot.map(item => item.id) } as any,
}).map(item => item.id) as IdType[]

// find all items that are in the snapshot but not in the collection
const existingItemIds = new Set(collection.find({
id: { $in: snapshot.map(item => item.id) } as any,
}).map(item => item.id) as IdType[])

collection.batch(() => {
// update all items that are in the snapshot
snapshot.forEach((item) => {
const itemExists = existingItemIds.has(item.id)
/* istanbul ignore else -- @preserve */
if (itemExists) {
collection.updateOne({ id: item.id as any }, { $set: item })
} else { // this case should never happen
collection.insert(item)
}
})

// remove all items that are not in the snapshot
nonExistingItemIds.forEach((id) => {
collection.removeOne({ id: id as any })
})
})
})
// check if there are unsynced changes to push
// after the sync was finished successfully
.then(() => this.sync(name, {
force: true,
onlyWithChanges: true,
}))
}
}
Loading