From db7c1f8ee16c32f0de5af68afbfc7a0afa68464c Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Thu, 18 Jan 2024 15:11:43 +0200 Subject: [PATCH] fix(database): high-availability issues especially on SQL dbs (#899) * refactor(database): propagate instanceSync to adapter schema creation fix(database): mongoose will no longer try to create indexes during instance syncs fix(datbase): sequelize will no longer try to sync schemas during instance syncs * feat(database): add basic replica recognition to stop system schema syncs from replicas fix(database): manually modify "sync" variable of sequelize models when on instance syncs --- modules/database/src/Database.ts | 15 ++-- .../database/src/adapters/DatabaseAdapter.ts | 13 ++- .../src/adapters/mongoose-adapter/index.ts | 3 +- .../src/adapters/sequelize-adapter/index.ts | 81 +++---------------- 4 files changed, 29 insertions(+), 83 deletions(-) diff --git a/modules/database/src/Database.ts b/modules/database/src/Database.ts index 6f468d07a..f53b63ebf 100644 --- a/modules/database/src/Database.ts +++ b/modules/database/src/Database.ts @@ -105,20 +105,23 @@ export default class DatabaseModule extends ManagedModule { } async onServerStart() { - await this._activeAdapter.registerSystemSchema(models.DeclaredSchema); - await this._activeAdapter.registerSystemSchema(models.MigratedSchemas); + const isReplica = this.grpcSdk.isAvailable('database'); + await this._activeAdapter.registerSystemSchema(models.DeclaredSchema, isReplica); + await this._activeAdapter.registerSystemSchema(models.MigratedSchemas, isReplica); let modelPromises = Object.values(models).flatMap((model: ConduitSchema) => { if (['_DeclaredSchema', 'MigratedSchemas'].includes(model.name)) return []; - return this._activeAdapter.registerSystemSchema(model); + return this._activeAdapter.registerSystemSchema(model, isReplica); }); await Promise.all(modelPromises); await this._activeAdapter.retrieveForeignSchemas(); await this._activeAdapter.recoverSchemasFromDatabase(); await this._activeAdapter.recoverViewsFromDatabase(); - await runMigrations(this._activeAdapter); + if (!isReplica) { + await runMigrations(this._activeAdapter); + } modelPromises = Object.values(models).flatMap((model: ConduitSchema) => { - return this._activeAdapter.registerSystemSchema(model).then(() => { - if (this._activeAdapter.getDatabaseType() !== 'MongoDB') { + return this._activeAdapter.registerSystemSchema(model, isReplica).then(() => { + if (this._activeAdapter.getDatabaseType() !== 'MongoDB' && !isReplica) { return this._activeAdapter.syncSchema(model.name); } }); diff --git a/modules/database/src/adapters/DatabaseAdapter.ts b/modules/database/src/adapters/DatabaseAdapter.ts index 76bdfdde2..02c53a709 100644 --- a/modules/database/src/adapters/DatabaseAdapter.ts +++ b/modules/database/src/adapters/DatabaseAdapter.ts @@ -44,9 +44,9 @@ export abstract class DatabaseAdapter { this.legacyDeployment = await this.hasLegacyCollections(); } - async registerSystemSchema(schema: ConduitSchema) { + async registerSystemSchema(schema: ConduitSchema, isReplica: boolean) { // @dirty-type-cast - await this.createSchemaFromAdapter(schema); + await this.createSchemaFromAdapter(schema, false, false, isReplica); this._systemSchemas.add(schema.name); } @@ -85,7 +85,11 @@ export abstract class DatabaseAdapter { await this.addExtensionsFromSchemaModel(schema, gRPC); stitchSchema(schema as ConduitDatabaseSchema); // @dirty-type-cast const schemaUpdate = this.registeredSchemas.has(schema.name); - const createdSchema = await this._createSchemaFromAdapter(schema, !instanceSync); + const createdSchema = await this._createSchemaFromAdapter( + schema, + !instanceSync, + instanceSync, + ); this.hashSchemaFields(schema as ConduitDatabaseSchema); // @dirty-type-cast if (!instanceSync && !schemaUpdate) { ConduitGrpcSdk.Metrics?.increment('registered_schemas_total', 1, { @@ -278,7 +282,7 @@ export abstract class DatabaseAdapter { model, !!model.modelOptions.conduit?.imported, true, - false, + true, ); }); @@ -377,6 +381,7 @@ export abstract class DatabaseAdapter { protected abstract _createSchemaFromAdapter( schema: ConduitSchema, saveToDb: boolean, + instanceSync: boolean, ): Promise; protected async saveSchemaToDatabase(schema: ConduitSchema) { diff --git a/modules/database/src/adapters/mongoose-adapter/index.ts b/modules/database/src/adapters/mongoose-adapter/index.ts index 4f0360af3..66eaa3856 100644 --- a/modules/database/src/adapters/mongoose-adapter/index.ts +++ b/modules/database/src/adapters/mongoose-adapter/index.ts @@ -391,6 +391,7 @@ export class MongooseAdapter extends DatabaseAdapter { protected async _createSchemaFromAdapter( schema: ConduitDatabaseSchema, saveToDb: boolean = true, + isInstanceSync: boolean = false, ): Promise { let compiledSchema = JSON.parse(JSON.stringify(schema)); validateFieldConstraints(compiledSchema, 'mongodb'); @@ -424,7 +425,7 @@ export class MongooseAdapter extends DatabaseAdapter { await this.saveSchemaToDatabase(schema); } - if (indexes) { + if (indexes && !isInstanceSync) { await this.createIndexes(schema.name, indexes, schema.ownerModule); } return this.models[schema.name]; diff --git a/modules/database/src/adapters/sequelize-adapter/index.ts b/modules/database/src/adapters/sequelize-adapter/index.ts index 5790db1f1..4ae2974e4 100644 --- a/modules/database/src/adapters/sequelize-adapter/index.ts +++ b/modules/database/src/adapters/sequelize-adapter/index.ts @@ -28,7 +28,7 @@ import { } from '../../interfaces'; import { sqlSchemaConverter } from './sql-adapter/SqlSchemaConverter'; import { pgSchemaConverter } from './postgres-adapter/PgSchemaConverter'; -import { isNil, merge } from 'lodash'; +import { isNil } from 'lodash'; const sqlSchemaName = process.env.SQL_SCHEMA ?? 'public'; @@ -203,77 +203,10 @@ export abstract class SequelizeAdapter extends DatabaseAdapter : schema.name; } - private async processExtractedSchemas( - schema: ConduitDatabaseSchema, - extractedSchemas: Indexable, - associatedSchemas: { [key: string]: SequelizeSchema | SequelizeSchema[] }, - ) { - for (const extractedSchema in extractedSchemas) { - const modelOptions = merge({}, schema.modelOptions, { - conduit: { - cms: { - enabled: false, - crudOperations: { - read: { - enabled: false, - }, - create: { - enabled: false, - }, - delete: { - enabled: false, - }, - update: { - enabled: false, - }, - }, - }, - - permissions: { - extendable: false, - canCreate: false, - canModify: 'Nothing', - canDelete: false, - }, - }, - }); - let modeledSchema; - let isArray = false; - if (Array.isArray(extractedSchemas[extractedSchema])) { - isArray = true; - modeledSchema = new ConduitSchema( - `${schema.name}_${extractedSchema}`, - extractedSchemas[extractedSchema][0], - modelOptions, - `${schema.collectionName}_${extractedSchema}`, - ); - } else { - modeledSchema = new ConduitSchema( - `${schema.name}_${extractedSchema}`, - extractedSchemas[extractedSchema], - modelOptions, - `${schema.collectionName}_${extractedSchema}`, - ); - } - - modeledSchema.ownerModule = schema.ownerModule; - (modeledSchema as ConduitDatabaseSchema).compiledFields = modeledSchema.fields; - // check index compatibility - const sequelizeSchema = await this._createSchemaFromAdapter( - modeledSchema as ConduitDatabaseSchema, - false, - { - parentSchema: schema.name, - }, - ); - associatedSchemas[extractedSchema] = isArray ? [sequelizeSchema] : sequelizeSchema; - } - } - protected async _createSchemaFromAdapter( schema: ConduitDatabaseSchema, saveToDb: boolean = true, - options?: { parentSchema: string }, + isInstanceSync: boolean = false, ): Promise { for (const [key, value] of Object.entries(this.views)) { if (value.originalSchema.name === schema.name) { @@ -309,13 +242,17 @@ export abstract class SequelizeAdapter extends DatabaseAdapter objectPaths, ); - const noSync = this.models[schema.name].originalSchema.modelOptions.conduit!.noSync; + const noSync = + this.models[schema.name].originalSchema.modelOptions.conduit!.noSync || + isInstanceSync; // do not sync extracted schemas - if ((isNil(noSync) || !noSync) && !options) { + if (isNil(noSync) || !noSync) { await this.models[schema.name].sync(); + } else { + this.models[schema.name].synced = true; } // do not store extracted schemas to db - if (!options && saveToDb) { + if (saveToDb && !isInstanceSync) { await this.compareAndStoreMigratedSchema(schema); await this.saveSchemaToDatabase(schema); }