Skip to content

Commit

Permalink
Save owners in batch to avoid slow save issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Falinor committed Nov 21, 2023
1 parent 7e2f239 commit ea712f4
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 41 deletions.
7 changes: 7 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
"npm-run-all": "^4.1.5",
"prettier": "2.8.0",
"supertest": "^6.2.4",
"tinybench": "^2.5.1",
"ts-jest": "^29.0.3",
"ts-node-dev": "^1.1.8",
"type-fest": "^3.5.0",
Expand Down
75 changes: 56 additions & 19 deletions scripts/import-datafoncier/ownerImporter.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
import Stream = Highland.Stream;
import highland from 'highland';
import { OwnerApi } from '../../server/models/OwnerApi';
import createDatafoncierOwnersRepository from './datafoncierOwnersRepository';
import { DatafoncierOwner, evaluate, tapAsync, toOwnerApi } from '../shared';
import ownerMatchRepository from '../../server/repositories/ownerMatchRepository';
import { DatafoncierOwner, evaluate, toOwnerApi } from '../shared';
import OwnerMatchRepository, {
OwnerMatchDBO,
} from '../../server/repositories/ownerMatchRepository';
import { isMatch } from '../shared/owner-processor/duplicates';
import ownerRepository from '../../server/repositories/ownerRepository';
import { logger } from '../../server/utils/logger';
import { isDefined } from '../../shared';
import OwnerRepository from '../../server/repositories/ownerRepository';
import Stream = Highland.Stream;
import ownerMatchRepository from '../../server/repositories/ownerMatchRepository';

export function ownerImporter(): Stream<OwnerApi> {
export function ownerImporter(
stream: Stream<DatafoncierOwner> = createDatafoncierOwnersRepository().stream()
) {
logger.info('Importing owners...');
return createDatafoncierOwnersRepository()
.stream()
.consume(tapAsync(processOwner))
.map(toOwnerApi)
return stream
.flatMap((dfOwner) => highland(processOwner(dfOwner)))
.batch(1_000)
.tap((owners) => {
logger.info(`Saving ${owners.length} owners...`);
})
.flatMap(save)
.errors((error) => {
logger.error(error);
});
}

interface Result {
match?: OwnerMatchDBO;
owner?: OwnerApi;
}

/**
* Link a DataFoncier owner to our owner.
* @param dfOwner
*/
export async function processOwner(dfOwner: DatafoncierOwner): Promise<void> {
export async function processOwner(dfOwner: DatafoncierOwner): Promise<Result> {
logger.debug(`Processing ${dfOwner.idpersonne}...`);
const dfOwnerApi = toOwnerApi(dfOwner);

Expand All @@ -33,18 +48,40 @@ export async function processOwner(dfOwner: DatafoncierOwner): Promise<void> {
if (!ownerMatch) {
const comparison = await evaluate(dfOwnerApi);
if (isMatch(comparison.score) && !comparison.needsReview) {
await ownerMatchRepository.save({
owner_id: comparison.duplicates[0].value.id,
idpersonne: dfOwner.idpersonne,
});
return {
match: {
owner_id: comparison.duplicates[0].value.id,
idpersonne: dfOwner.idpersonne,
},
};
} else {
await ownerRepository.save(dfOwnerApi);
await ownerMatchRepository.save({
owner_id: dfOwnerApi.id,
idpersonne: dfOwner.idpersonne,
});
return {
owner: dfOwnerApi,
match: {
owner_id: dfOwnerApi.id,
idpersonne: dfOwner.idpersonne,
},
};
}
}

return {};
}

function save(results: Result[]): Stream<void> {
async function saveResult(): Promise<void> {
const owners = results.map((result) => result.owner).filter(isDefined);
if (owners.length) {
await OwnerRepository.saveMany(owners);
}

const matches = results.map((result) => result.match).filter(isDefined);
if (matches.length) {
await OwnerMatchRepository.saveMany(matches);
}
}

return highland(saveResult());
}

export default ownerImporter;
74 changes: 53 additions & 21 deletions scripts/import-datafoncier/test/ownerImporter.test.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,71 @@
import { v4 as uuidv4 } from 'uuid';
import { genDatafoncierOwner } from '../../../server/test/testFixtures';
import { processOwner } from '../ownerImporter';
import { ownerImporter, processOwner } from '../ownerImporter';
import {
formatOwnerApi,
Owners,
} from '../../../server/repositories/ownerRepository';
import { OwnerMatches } from '../../../server/repositories/ownerMatchRepository';
import { OwnerApi } from '../../../server/models/OwnerApi';
import { toOwnerApi } from '../../shared';
import { DatafoncierOwner, toOwnerApi } from '../../shared';
import { OwnerEvents } from '../../../server/repositories/eventRepository';
import { HousingOwners } from '../../../server/repositories/housingOwnerRepository';
import highland from 'highland';
import { startTimer } from '../../shared/elapsed';
import { logger } from '../../../server/utils/logger';

describe('Import owners', () => {
describe('Benchmark', () => {
beforeEach(async () => {
await Promise.all([
HousingOwners().delete(),
OwnerEvents().delete(),
OwnerMatches().delete(),
]);
await Owners().delete();
});

it('should process a large amount of data', (done) => {
function* createGenerator(n: number) {
let i = 0;
while (i < n) {
yield genDatafoncierOwner();
i++;
}
}

const iterations = 5_000;
const generator = createGenerator(iterations);
const stream = highland<DatafoncierOwner>(generator);
startTimer((stopTimer) => {
stream.through(ownerImporter).done(() => {
const elapsed = stopTimer();
logger.info(`Done in ${elapsed}.`);
done();
});
});
// It should succeed within 2 minutes
}, 120_000 /* A specific timeout */);
});

describe('processOwner', () => {
const datafoncierOwner = genDatafoncierOwner();

describe('If there is no link between Datafoncier and ZLV owners', () => {
describe('If the DF owner cannot be matched with a ZLV owner', () => {
it('should create a new owner', async () => {
await processOwner(datafoncierOwner);
const actual = await processOwner(datafoncierOwner);

const actual = await Owners()
.where({ full_name: datafoncierOwner.ddenom })
.first();
expect(actual).toBeDefined();
expect(actual.owner).toBeDefined();
expect(actual.owner?.fullName).toBe(datafoncierOwner.ddenom);
});

it('should link the DF owner to the newly created owner', async () => {
await processOwner(datafoncierOwner);
const actual = await processOwner(datafoncierOwner);

const actual = await OwnerMatches()
.where({ idpersonne: datafoncierOwner.idpersonne })
.first();
expect(actual).toHaveProperty('owner_id', expect.any(String));
expect(actual.match).toBeDefined();
expect(actual.match?.idpersonne).toBe(datafoncierOwner.idpersonne);
expect(actual.match?.owner_id).toBeString();
});
});

Expand All @@ -45,15 +80,12 @@ describe('Import owners', () => {
});

it('should create the link', async () => {
await processOwner(datafoncierOwner);

const actual = await OwnerMatches()
.where({
idpersonne: datafoncierOwner.idpersonne,
owner_id: zlvOwner.id,
})
.first();
expect(actual).toBeDefined();
const actual = await processOwner(datafoncierOwner);

expect(actual.match).toStrictEqual({
idpersonne: datafoncierOwner.idpersonne,
owner_id: zlvOwner.id,
});
});
});
});
Expand Down
7 changes: 6 additions & 1 deletion server/repositories/ownerMatchRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ const findOne = async (opts: FindOneOptions): Promise<OwnerMatchDBO | null> => {
};

const save = async (ownerMatch: OwnerMatchDBO): Promise<void> => {
await OwnerMatches().insert(ownerMatch).onConflict().ignore();
await saveMany([ownerMatch]);
};

const saveMany = async (ownerMatches: OwnerMatchDBO[]): Promise<void> => {
await OwnerMatches().insert(ownerMatches).onConflict().ignore();
};

export interface OwnerMatchDBO {
Expand All @@ -31,6 +35,7 @@ const ownerMatchRepository = {
find,
findOne,
save,
saveMany,
};

export default ownerMatchRepository;

0 comments on commit ea712f4

Please sign in to comment.