Skip to content

Commit

Permalink
fix(database): high-availability issues especially on SQL dbs (#899)
Browse files Browse the repository at this point in the history
* refactor(database): propagate instanceSync to adapter schema creation
fix(database): mongoose will no longer try to create indexes during instance syncs
fix(datbase): sequelize will no longer try to sync schemas during instance syncs

* feat(database): add basic replica recognition to stop system schema syncs from replicas
fix(database): manually modify "sync" variable of sequelize models when on instance syncs
  • Loading branch information
kkopanidis authored Jan 18, 2024
1 parent 87884d3 commit db7c1f8
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 83 deletions.
15 changes: 9 additions & 6 deletions modules/database/src/Database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,23 @@ export default class DatabaseModule extends ManagedModule<void> {
}

async onServerStart() {
await this._activeAdapter.registerSystemSchema(models.DeclaredSchema);
await this._activeAdapter.registerSystemSchema(models.MigratedSchemas);
const isReplica = this.grpcSdk.isAvailable('database');
await this._activeAdapter.registerSystemSchema(models.DeclaredSchema, isReplica);
await this._activeAdapter.registerSystemSchema(models.MigratedSchemas, isReplica);
let modelPromises = Object.values(models).flatMap((model: ConduitSchema) => {
if (['_DeclaredSchema', 'MigratedSchemas'].includes(model.name)) return [];
return this._activeAdapter.registerSystemSchema(model);
return this._activeAdapter.registerSystemSchema(model, isReplica);
});
await Promise.all(modelPromises);
await this._activeAdapter.retrieveForeignSchemas();
await this._activeAdapter.recoverSchemasFromDatabase();
await this._activeAdapter.recoverViewsFromDatabase();
await runMigrations(this._activeAdapter);
if (!isReplica) {
await runMigrations(this._activeAdapter);
}
modelPromises = Object.values(models).flatMap((model: ConduitSchema) => {
return this._activeAdapter.registerSystemSchema(model).then(() => {
if (this._activeAdapter.getDatabaseType() !== 'MongoDB') {
return this._activeAdapter.registerSystemSchema(model, isReplica).then(() => {
if (this._activeAdapter.getDatabaseType() !== 'MongoDB' && !isReplica) {
return this._activeAdapter.syncSchema(model.name);
}
});
Expand Down
13 changes: 9 additions & 4 deletions modules/database/src/adapters/DatabaseAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ export abstract class DatabaseAdapter<T extends Schema> {
this.legacyDeployment = await this.hasLegacyCollections();
}

async registerSystemSchema(schema: ConduitSchema) {
async registerSystemSchema(schema: ConduitSchema, isReplica: boolean) {
// @dirty-type-cast
await this.createSchemaFromAdapter(schema);
await this.createSchemaFromAdapter(schema, false, false, isReplica);
this._systemSchemas.add(schema.name);
}

Expand Down Expand Up @@ -85,7 +85,11 @@ export abstract class DatabaseAdapter<T extends Schema> {
await this.addExtensionsFromSchemaModel(schema, gRPC);
stitchSchema(schema as ConduitDatabaseSchema); // @dirty-type-cast
const schemaUpdate = this.registeredSchemas.has(schema.name);
const createdSchema = await this._createSchemaFromAdapter(schema, !instanceSync);
const createdSchema = await this._createSchemaFromAdapter(
schema,
!instanceSync,
instanceSync,
);
this.hashSchemaFields(schema as ConduitDatabaseSchema); // @dirty-type-cast
if (!instanceSync && !schemaUpdate) {
ConduitGrpcSdk.Metrics?.increment('registered_schemas_total', 1, {
Expand Down Expand Up @@ -278,7 +282,7 @@ export abstract class DatabaseAdapter<T extends Schema> {
model,
!!model.modelOptions.conduit?.imported,
true,
false,
true,
);
});

Expand Down Expand Up @@ -377,6 +381,7 @@ export abstract class DatabaseAdapter<T extends Schema> {
protected abstract _createSchemaFromAdapter(
schema: ConduitSchema,
saveToDb: boolean,
instanceSync: boolean,
): Promise<Schema>;

protected async saveSchemaToDatabase(schema: ConduitSchema) {
Expand Down
3 changes: 2 additions & 1 deletion modules/database/src/adapters/mongoose-adapter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ export class MongooseAdapter extends DatabaseAdapter<MongooseSchema> {
protected async _createSchemaFromAdapter(
schema: ConduitDatabaseSchema,
saveToDb: boolean = true,
isInstanceSync: boolean = false,
): Promise<MongooseSchema> {
let compiledSchema = JSON.parse(JSON.stringify(schema));
validateFieldConstraints(compiledSchema, 'mongodb');
Expand Down Expand Up @@ -424,7 +425,7 @@ export class MongooseAdapter extends DatabaseAdapter<MongooseSchema> {
await this.saveSchemaToDatabase(schema);
}

if (indexes) {
if (indexes && !isInstanceSync) {
await this.createIndexes(schema.name, indexes, schema.ownerModule);
}
return this.models[schema.name];
Expand Down
81 changes: 9 additions & 72 deletions modules/database/src/adapters/sequelize-adapter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
} from '../../interfaces';
import { sqlSchemaConverter } from './sql-adapter/SqlSchemaConverter';
import { pgSchemaConverter } from './postgres-adapter/PgSchemaConverter';
import { isNil, merge } from 'lodash';
import { isNil } from 'lodash';

const sqlSchemaName = process.env.SQL_SCHEMA ?? 'public';

Expand Down Expand Up @@ -203,77 +203,10 @@ export abstract class SequelizeAdapter extends DatabaseAdapter<SequelizeSchema>
: schema.name;
}

private async processExtractedSchemas(
schema: ConduitDatabaseSchema,
extractedSchemas: Indexable,
associatedSchemas: { [key: string]: SequelizeSchema | SequelizeSchema[] },
) {
for (const extractedSchema in extractedSchemas) {
const modelOptions = merge({}, schema.modelOptions, {
conduit: {
cms: {
enabled: false,
crudOperations: {
read: {
enabled: false,
},
create: {
enabled: false,
},
delete: {
enabled: false,
},
update: {
enabled: false,
},
},
},

permissions: {
extendable: false,
canCreate: false,
canModify: 'Nothing',
canDelete: false,
},
},
});
let modeledSchema;
let isArray = false;
if (Array.isArray(extractedSchemas[extractedSchema])) {
isArray = true;
modeledSchema = new ConduitSchema(
`${schema.name}_${extractedSchema}`,
extractedSchemas[extractedSchema][0],
modelOptions,
`${schema.collectionName}_${extractedSchema}`,
);
} else {
modeledSchema = new ConduitSchema(
`${schema.name}_${extractedSchema}`,
extractedSchemas[extractedSchema],
modelOptions,
`${schema.collectionName}_${extractedSchema}`,
);
}

modeledSchema.ownerModule = schema.ownerModule;
(modeledSchema as ConduitDatabaseSchema).compiledFields = modeledSchema.fields;
// check index compatibility
const sequelizeSchema = await this._createSchemaFromAdapter(
modeledSchema as ConduitDatabaseSchema,
false,
{
parentSchema: schema.name,
},
);
associatedSchemas[extractedSchema] = isArray ? [sequelizeSchema] : sequelizeSchema;
}
}

protected async _createSchemaFromAdapter(
schema: ConduitDatabaseSchema,
saveToDb: boolean = true,
options?: { parentSchema: string },
isInstanceSync: boolean = false,
): Promise<SequelizeSchema> {
for (const [key, value] of Object.entries(this.views)) {
if (value.originalSchema.name === schema.name) {
Expand Down Expand Up @@ -309,13 +242,17 @@ export abstract class SequelizeAdapter extends DatabaseAdapter<SequelizeSchema>
objectPaths,
);

const noSync = this.models[schema.name].originalSchema.modelOptions.conduit!.noSync;
const noSync =
this.models[schema.name].originalSchema.modelOptions.conduit!.noSync ||
isInstanceSync;
// do not sync extracted schemas
if ((isNil(noSync) || !noSync) && !options) {
if (isNil(noSync) || !noSync) {
await this.models[schema.name].sync();
} else {
this.models[schema.name].synced = true;
}
// do not store extracted schemas to db
if (!options && saveToDb) {
if (saveToDb && !isInstanceSync) {
await this.compareAndStoreMigratedSchema(schema);
await this.saveSchemaToDatabase(schema);
}
Expand Down

0 comments on commit db7c1f8

Please sign in to comment.