Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record async migration execution time #615

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/migrator/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import {
filterAndSortPendingMigrations,
MigrationStatus,
BaseAsyncMigration,
migrationMetricsEmitter,
MigrationMetricsEventName,
} from './utils';

export const run = async (tx: Tx, model: ApiRootModel): Promise<void> => {
Expand Down Expand Up @@ -122,7 +124,7 @@ const $run = async (
let asyncRunnerMigratorFn: (tx: Tx) => Promise<number>;
let initMigrationState: InitialMigrationStatus = {
migration_key: key,
start_time: new Date(),
last_execution_time_ms: 0,
last_run_time: new Date(),
run_count: 0,
migrated_row_count: 0,
Expand Down Expand Up @@ -212,7 +214,7 @@ const $run = async (
);
return false;
}
// sync on the last execution time between instances
// sync on the execution time between instances
// precondition: All running instances are running on the same time/block
// skip execution
if (migrationState.last_run_time) {
Expand All @@ -229,11 +231,15 @@ const $run = async (
try {
// here a separate transaction is needed as this migration may fail
// when it fails it would break the transaction for managing the migration status
const executionStartTimeMS = Date.now();
const migratedRows = await sbvrUtils.db.transaction(
async (migrationTx) => {
return (await asyncRunnerMigratorFn?.(migrationTx)) ?? 0;
},
);
migrationState.last_execution_time_ms =
Date.now() - executionStartTimeMS;

migrationState.migrated_row_count += migratedRows;
if (migratedRows === 0) {
// when all rows have been catched up once we only catch up less frequently
Expand All @@ -250,7 +256,7 @@ const $run = async (
if (err instanceof Error) {
if (
migrationState.error_count %
initMigrationState.errorThreshold ===
initMigrationState.errorThreshold ===
0
) {
(sbvrUtils.api.migrations?.logger.error ?? console.error)(
Expand All @@ -268,6 +274,12 @@ const $run = async (
// either success or error release the lock
migrationState.last_run_time = new Date();
migrationState.run_count += 1;
// just emit
migrationMetricsEmitter.emit(
MigrationMetricsEventName.async_migration_status,
migrationState,
);

await updateMigrationStatus(tx, migrationState);
}
return migrationState;
Expand Down
8 changes: 4 additions & 4 deletions src/migrator/migrations.sbvr
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ Fact Type: migration lock has model name

Term: migration key
Concept Type: Short Text (Type)
Term: start time
Concept Type: Date Time (Type)
Term: last execution time ms
Concept Type: Integer (Type)
Term: last run time
Concept Type: Date Time (Type)
Term: run count
Expand All @@ -45,8 +45,8 @@ Term: migration status
Fact Type: migration status has migration key
Necessity: each migration status has exactly one migration key

Fact Type: migration status has start time
Necessity: each migration status has at most one start time
Fact Type: migration status has last execution time ms
Necessity: each migration status has at most one last execution time ms

Fact Type: migration status has last run time
Necessity: each migration status has at most one last run time
Expand Down
17 changes: 17 additions & 0 deletions src/migrator/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
RunnableMigrations,
filterAndSortPendingMigrations,
getRunnableSyncMigrations,
migrationMetricsEmitter,
MigrationMetricsEventName,
} from './utils';
import type { Tx } from '../database-layer/db';
import type { Config, Model } from '../config-loader/config-loader';
Expand Down Expand Up @@ -122,13 +124,24 @@ const executeMigration = async (
`Running migration ${JSON.stringify(key)}`,
);

const migrationStartTimeMs = Date.now();

if (typeof migration === 'function') {
await migration(tx, sbvrUtils);
} else if (typeof migration === 'string') {
await tx.executeSql(migration);
} else {
throw new MigrationError(`Invalid migration type: ${typeof migration}`);
}

// follow the same interface for migration_key and last_execution_time_ms as migration status
migrationMetricsEmitter.emit(
MigrationMetricsEventName.sync_migration_status,
{
migration_key: key,
last_execution_time_ms: Date.now() - migrationStartTimeMs,
},
);
};

export const config: Config = {
Expand All @@ -146,6 +159,10 @@ export const config: Config = {
ALTER TABLE "migration lock"
ADD COLUMN IF NOT EXISTS "modified at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL;
`,
'14.55.0-last-execution-time': `
ALTER TABLE "migration status"
ADD COLUMN IF NOT EXISTS "last execution time ms" INTEGER NULL;
Comment on lines +163 to +164
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would also need to drop the start time column, and that potentially makes it a breaking change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Page-
I was unsure how to handle it. Basically PineJS doesn't know about old instances that reference the start time. My non favourite solution is to keep the column as is. My preferred solution would be a 2 staged deployment.

Basically it's not breaking the code it's breaking old-still running instances which require start time to be existing.
How was this handled in the past?

`,
},
},
],
Expand Down
22 changes: 16 additions & 6 deletions src/migrator/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import { TypedError } from 'typed-error';
import { migrator as migratorEnv } from '../config-loader/env';
export { migrator as migratorEnv } from '../config-loader/env';
import { delay } from '../sbvr-api/control-flow';
import { EventEmitter } from 'eventemitter3';

export const migrationMetricsEmitter = new EventEmitter();

export enum MigrationMetricsEventName {
'sync_migration_status' = 'sync_migration_status',
'async_migration_status' = 'async_migration_status',
}

// tslint:disable-next-line:no-var-requires
export const modelText = require('./migrations.sbvr');
Expand Down Expand Up @@ -87,7 +95,7 @@ export class MigrationError extends TypedError {}

export type MigrationStatus = {
migration_key: string;
start_time: Date;
last_execution_time_ms: number;
last_run_time: Date | null;
run_count: number;
migrated_row_count: number;
Expand Down Expand Up @@ -283,13 +291,13 @@ export const initMigrationStatus = async (
try {
return await tx.executeSql(
binds`
INSERT INTO "migration status" ("migration key", "start time", "is backing off", "run count")
INSERT INTO "migration status" ("migration key", "last execution time ms", "is backing off", "run count")
SELECT ${1}, ${2}, ${3}, ${4}
WHERE NOT EXISTS (SELECT 1 FROM "migration status" WHERE "migration key" = ${5})
`,
[
migrationStatus['migration_key'],
migrationStatus['start_time'],
migrationStatus['last_execution_time_ms'],
migrationStatus['is_backing_off'] ? 1 : 0,
migrationStatus['run_count'],
migrationStatus['migration_key'],
Expand All @@ -316,15 +324,17 @@ SET
"migrated row count" = ${3},
"error count" = ${4},
"converged time" = ${5},
"is backing off" = ${6}
WHERE "migration status"."migration key" = ${7};`,
"is backing off" = ${6},
"last execution time ms" = ${7}
WHERE "migration status"."migration key" = ${8};`,
[
migrationStatus['run_count'],
migrationStatus['last_run_time'],
migrationStatus['migrated_row_count'],
migrationStatus['error_count'],
migrationStatus['converged_time'],
migrationStatus['is_backing_off'] ? 1 : 0,
migrationStatus['last_execution_time_ms'],
migrationStatus['migration_key'],
],
);
Expand Down Expand Up @@ -355,7 +365,7 @@ LIMIT 1;`,

return {
migration_key: data['migration key'],
start_time: data['start time'],
last_execution_time_ms: data['last execution time ms'],
last_run_time: data['last run time'],
run_count: data['run count'],
migrated_row_count: data['migrated row count'],
Expand Down
2 changes: 1 addition & 1 deletion src/server-glue/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export * as env from '../config-loader/env';
export * as types from '../sbvr-api/common-types';
export * as hooks from '../sbvr-api/hooks';
export type { configLoader as ConfigLoader };
export type { migratorUtils as Migrator };
export { migratorUtils as Migrator };

let envDatabaseOptions: dbModule.DatabaseOptions<string>;
if (dbModule.engines.websql != null) {
Expand Down
25 changes: 24 additions & 1 deletion test/03-async-migrator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,29 @@ describe('03 Async Migrations', async function () {
expect(result[0]?.migrated_row_count - firstRowsMigrated).to.equal(1);
expect(Date.now().valueOf() - startTime).to.be.greaterThan(4000); // backOff time from migrator
});

it('should record last execution time ms of the last migration run', async function () {
let result: MigrationStatus[] = [];

const startTime = Date.now().valueOf();
// Wait until all migrations have run at least once
while (result.length === 0 || !result.every((row) => row.run_count > 0)) {
result = await getMigrationStatus();
}

const checkMigration0003 = result.find(
(row) => row.migration_key === '0003',
);
expect(checkMigration0003?.last_execution_time_ms).to.be.greaterThan(500); // should be reported greater than 500 ms (from the async migration function)
expect(checkMigration0003?.last_execution_time_ms).to.be.lessThan(2 * 500); // should not exceed twice the last execution time ms defined in the async migration function

/**
* Here it may need to be checked if the event for the async_migration_status metric was emitted.
* As the pine instance is running in a childprocess the EventEmitter is decoupled from the test instance.
* The EventEmitter exported from the migration utilities cannot be used for registering a listener, as it would
* instantiate a separate EventEmitter in this process, which is not linked to the child-process in which pine actually runs.
*/
});
});

describe('Init sync and async migrations for new model', function () {
Expand All @@ -226,7 +249,7 @@ describe('03 Async Migrations', async function () {
.to.be.an('array');
expect(res.body.d[0]?.executed_migrations)
.to.be.an('array')
.to.have.all.members(['0001', '0002']);
.to.have.all.members(['0001', '0002', '0003']);
});
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { AsyncMigration } from '../../../../../src/migrator/utils';

const migration: AsyncMigration = {
asyncFn: async (tx, options) => {
options;
return await tx.executeSql(`SELECT pg_sleep(0.5);`);
},
asyncBatchSize: 1,
syncFn: async (tx) => {
await tx.executeSql(`SELECT pg_sleep(0.5);`);
},
delayMS: 1000,
backoffDelayMS: 4000,
errorThreshold: 15,
finalize: false,
};

export default migration;