Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Production checks] Gdrive GC #2505

Merged
merged 10 commits into from
Nov 13, 2023
Merged
72 changes: 72 additions & 0 deletions front/production_checks/checks/managed_data_source_gdrive_gc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { QueryTypes, Sequelize } from "sequelize";

import { getCoreDocuments } from "@app/production_checks/lib/managed_ds";
import { CheckFunction } from "@app/production_checks/types/check";

const {
CONNECTORS_DATABASE_READ_REPLICA_URI,
FRONT_DATABASE_READ_REPLICA_URI,
} = process.env;

export const managedDataSourceGCGdriveCheck: CheckFunction = async (
checkName,
reportSuccess,
reportFailure
) => {
const connectorsSequelize = new Sequelize(
CONNECTORS_DATABASE_READ_REPLICA_URI as string,
{
logging: false,
}
);
const frontSequelize = new Sequelize(
FRONT_DATABASE_READ_REPLICA_URI as string,
{
logging: false,
}
);
const GdriveDataSources: { id: number; connectorId: string }[] =
await frontSequelize.query(
`SELECT id, "connectorId" FROM data_sources WHERE "connectorProvider" = 'google_drive'`,
{ type: QueryTypes.SELECT }
);

for (const ds of GdriveDataSources) {
const coreDocumentsRes = await getCoreDocuments(ds.id);
if (coreDocumentsRes.isErr()) {
reportFailure(
{ frontDataSourceId: ds.id },
"Could not get core documents"
);
continue;
}
const coreDocuments = coreDocumentsRes.value;

const connectorDocuments: { id: number; coreDocumentId: string }[] =
await connectorsSequelize.query(
'SELECT id, "dustFileId" as "coreDocumentId" FROM google_drive_files WHERE "connectorId" = :connectorId',
{
replacements: {
connectorId: ds.connectorId,
},
type: QueryTypes.SELECT,
}
);

const coreDocumentIds = coreDocuments.map((d) => d.document_id);
const connectorDocumentIds = new Set(
connectorDocuments.map((d) => d.coreDocumentId)
);
const notDeleted = coreDocumentIds.filter(
(coreId) => !connectorDocumentIds.has(coreId)
);
if (notDeleted.length > 0) {
reportFailure(
{ notDeleted },
"Google Drive documents not properly Garbage collected"
);
} else {
reportSuccess({});
}
}
};
14 changes: 0 additions & 14 deletions front/production_checks/checks/super_check.ts

This file was deleted.

77 changes: 77 additions & 0 deletions front/production_checks/lib/managed_ds.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { QueryTypes, Sequelize } from "sequelize";

import { Err, Ok, Result } from "@app/lib/result";
const { CORE_DATABASE_READ_REPLICA_URI, FRONT_DATABASE_READ_REPLICA_URI } =
process.env;

export type CoreDSDocument = {
id: number;
document_id: string;
parents: string[];
};

export async function getCoreDocuments(
frontDataSourceId: number
): Promise<Result<CoreDSDocument[], Error>> {
const core_sequelize = new Sequelize(
CORE_DATABASE_READ_REPLICA_URI as string,
{
logging: false,
}
);
const front_sequelize = new Sequelize(
FRONT_DATABASE_READ_REPLICA_URI as string,
{
logging: false,
}
);

const managedDsData = await front_sequelize.query(
'SELECT id, "connectorId", "connectorProvider", "dustAPIProjectId" \
FROM data_sources WHERE id = :frontDataSourceId',
{
type: QueryTypes.SELECT,
replacements: {
frontDataSourceId: frontDataSourceId,
},
}
);
const managedDs = managedDsData as {
id: number;
dustAPIProjectId: string;
}[];
if (!managedDs.length) {
return new Err(
new Error(`Front data source not found for id ${frontDataSourceId}`)
);
}
const ds = managedDs[0];
const coreDsData = await core_sequelize.query(
`SELECT id FROM data_sources WHERE "project" = :dustAPIProjectId`,
{
replacements: {
dustAPIProjectId: ds.dustAPIProjectId,
},
type: QueryTypes.SELECT,
}
);
const coreDs = coreDsData as { id: number }[];
if (coreDs.length === 0) {
return new Err(
new Error(`Core data source not found for front datasource ${ds.id}`)
);
}
const coreDocumentsData = await core_sequelize.query(
`SELECT id, document_id, parents FROM data_sources_documents WHERE "data_source" = :coreDsId AND status = 'latest'`,
{
replacements: {
coreDsId: coreDs[0].id,
},
type: QueryTypes.SELECT,
}
);

const coreDocuments = coreDocumentsData as CoreDSDocument[];

return new Ok(coreDocuments);
}
11 changes: 7 additions & 4 deletions front/production_checks/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { Context } from "@temporalio/activity";
import { v4 as uuidv4 } from "uuid";

import mainLogger from "@app/logger/logger";
import { mySuperCheck } from "@app/production_checks/checks/super_check";
import { managedDataSourceGCGdriveCheck } from "@app/production_checks/checks/managed_data_source_gdrive_gc";
import { Check } from "@app/production_checks/types/check";

export async function runAllChecksActivity() {
const checks: Check[] = [
{
name: "mySuperCheck",
check: mySuperCheck,
name: "managed_data_source_gdrive_gc",
check: managedDataSourceGCGdriveCheck,
},
];
await runAllChecks(checks);
Expand All @@ -30,7 +30,10 @@ async function runAllChecks(checks: Check[]) {
logger.info({ reportPayload }, "Check succeeded");
};
const reportFailure = (reportPayload: unknown, message: string) => {
logger.error({ reportPayload, errorMessage: message }, "Check failed");
logger.error(
{ reportPayload, errorMessage: message },
"Production check failed"
);
};
Context.current().heartbeat({
type: "start",
Expand Down