Skip to content

Commit

Permalink
[Production checks] Gdrive GC (#2505)
Browse files Browse the repository at this point in the history
* [Production checks] Gdrive GC

* Clean up after self review

* Adding Slack

* Search Gdrive files by connectorId

* Using read only database

* Delete show case check

* Adjusting comment for failed check to "Production check failed"

* npm run format

* Simplified version. One file per check (Gdrive only)

* Clean up + tested
  • Loading branch information
lasryaric authored Nov 13, 2023
1 parent 8d0af84 commit 1389379
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 18 deletions.
72 changes: 72 additions & 0 deletions front/production_checks/checks/managed_data_source_gdrive_gc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { QueryTypes, Sequelize } from "sequelize";

import { getCoreDocuments } from "@app/production_checks/lib/managed_ds";
import { CheckFunction } from "@app/production_checks/types/check";

// Read-replica connection URIs taken from the process environment. Either one
// is `undefined` when the corresponding variable is not set.
const CONNECTORS_DATABASE_READ_REPLICA_URI =
  process.env.CONNECTORS_DATABASE_READ_REPLICA_URI;
const FRONT_DATABASE_READ_REPLICA_URI =
  process.env.FRONT_DATABASE_READ_REPLICA_URI;

/**
 * Production check for Google Drive managed data sources.
 *
 * For every front data source whose `connectorProvider` is `google_drive`, the
 * check compares the document ids returned by `getCoreDocuments` with the
 * `dustFileId` values stored in the connectors `google_drive_files` table for
 * the same connector. Any core document id that has no matching connector row
 * is reported as not garbage collected.
 *
 * @param checkName - Name of the check (unused here).
 * @param reportSuccess - Called once per data source that has no leftover
 *   core document.
 * @param reportFailure - Called once per data source whose core documents
 *   could not be fetched, or that has leftover core documents.
 * @throws Error when one of the read-replica URIs is not configured.
 */
export const managedDataSourceGCGdriveCheck: CheckFunction = async (
  checkName,
  reportSuccess,
  reportFailure
) => {
  // Fail with an explicit message instead of handing `undefined` to Sequelize.
  if (
    !CONNECTORS_DATABASE_READ_REPLICA_URI ||
    !FRONT_DATABASE_READ_REPLICA_URI
  ) {
    throw new Error(
      "CONNECTORS_DATABASE_READ_REPLICA_URI and FRONT_DATABASE_READ_REPLICA_URI must be set"
    );
  }

  const connectorsSequelize = new Sequelize(
    CONNECTORS_DATABASE_READ_REPLICA_URI,
    {
      logging: false,
    }
  );
  const frontSequelize = new Sequelize(FRONT_DATABASE_READ_REPLICA_URI, {
    logging: false,
  });

  try {
    const gdriveDataSources: { id: number; connectorId: string }[] =
      await frontSequelize.query(
        `SELECT id, "connectorId" FROM data_sources WHERE "connectorProvider" = 'google_drive'`,
        { type: QueryTypes.SELECT }
      );

    for (const ds of gdriveDataSources) {
      const coreDocumentsRes = await getCoreDocuments(ds.id);
      if (coreDocumentsRes.isErr()) {
        reportFailure(
          { frontDataSourceId: ds.id },
          "Could not get core documents"
        );
        // Keep checking the remaining data sources.
        continue;
      }
      const coreDocuments = coreDocumentsRes.value;

      const connectorDocuments: { id: number; coreDocumentId: string }[] =
        await connectorsSequelize.query(
          'SELECT id, "dustFileId" as "coreDocumentId" FROM google_drive_files WHERE "connectorId" = :connectorId',
          {
            replacements: {
              connectorId: ds.connectorId,
            },
            type: QueryTypes.SELECT,
          }
        );

      // Set lookup keeps the comparison linear in the number of documents.
      const connectorDocumentIds = new Set(
        connectorDocuments.map((d) => d.coreDocumentId)
      );
      const notDeleted = coreDocuments
        .map((d) => d.document_id)
        .filter((coreId) => !connectorDocumentIds.has(coreId));

      if (notDeleted.length > 0) {
        // Include the data source identifiers so the report can be traced
        // back to the offending data source.
        reportFailure(
          {
            frontDataSourceId: ds.id,
            connectorId: ds.connectorId,
            notDeleted,
          },
          "Google Drive documents not properly Garbage collected"
        );
      } else {
        reportSuccess({});
      }
    }
  } finally {
    // Release both connection pools on every exit path; both closes are
    // started before either is awaited.
    await Promise.all([connectorsSequelize.close(), frontSequelize.close()]);
  }
};
14 changes: 0 additions & 14 deletions front/production_checks/checks/super_check.ts

This file was deleted.

77 changes: 77 additions & 0 deletions front/production_checks/lib/managed_ds.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { QueryTypes, Sequelize } from "sequelize";

import { Err, Ok, Result } from "@app/lib/result";
// Read-replica connection URIs taken from the process environment. Either one
// is `undefined` when the corresponding variable is not set.
const CORE_DATABASE_READ_REPLICA_URI =
  process.env.CORE_DATABASE_READ_REPLICA_URI;
const FRONT_DATABASE_READ_REPLICA_URI =
  process.env.FRONT_DATABASE_READ_REPLICA_URI;

/**
 * Shape of one row that `getCoreDocuments` reads from the core
 * `data_sources_documents` table.
 */
export type CoreDSDocument = {
  /** Value of the `id` column. */
  id: number;
  /** Value of the `document_id` column. */
  document_id: string;
  /** Value of the `parents` column. */
  parents: Array<string>;
};

/**
 * Fetches the `latest` documents stored in core for a front data source.
 *
 * The front data source row is looked up by id to obtain its
 * `dustAPIProjectId`, the core data source is then looked up by that project,
 * and finally the rows of `data_sources_documents` with status `latest` are
 * returned for that core data source.
 *
 * @param frontDataSourceId - Id of the row in the front `data_sources` table.
 * @returns `Ok` with the core documents, or `Err` when either the front data
 *   source or the matching core data source cannot be found.
 * @throws Error when one of the read-replica URIs is not configured.
 */
export async function getCoreDocuments(
  frontDataSourceId: number
): Promise<Result<CoreDSDocument[], Error>> {
  // Fail with an explicit message instead of handing `undefined` to Sequelize.
  if (!CORE_DATABASE_READ_REPLICA_URI || !FRONT_DATABASE_READ_REPLICA_URI) {
    throw new Error(
      "CORE_DATABASE_READ_REPLICA_URI and FRONT_DATABASE_READ_REPLICA_URI must be set"
    );
  }

  const coreSequelize = new Sequelize(CORE_DATABASE_READ_REPLICA_URI, {
    logging: false,
  });
  const frontSequelize = new Sequelize(FRONT_DATABASE_READ_REPLICA_URI, {
    logging: false,
  });

  try {
    const managedDsData = await frontSequelize.query(
      `SELECT id, "connectorId", "connectorProvider", "dustAPIProjectId" FROM data_sources WHERE id = :frontDataSourceId`,
      {
        type: QueryTypes.SELECT,
        replacements: {
          frontDataSourceId: frontDataSourceId,
        },
      }
    );
    const managedDs = managedDsData as {
      id: number;
      dustAPIProjectId: string;
    }[];
    if (managedDs.length === 0) {
      return new Err(
        new Error(`Front data source not found for id ${frontDataSourceId}`)
      );
    }
    const ds = managedDs[0];

    // NOTE(review): the lookup matches on project only and uses the first row;
    // confirm that a project maps to exactly one core data source.
    const coreDsData = await coreSequelize.query(
      `SELECT id FROM data_sources WHERE "project" = :dustAPIProjectId`,
      {
        replacements: {
          dustAPIProjectId: ds.dustAPIProjectId,
        },
        type: QueryTypes.SELECT,
      }
    );
    const coreDs = coreDsData as { id: number }[];
    if (coreDs.length === 0) {
      return new Err(
        new Error(`Core data source not found for front datasource ${ds.id}`)
      );
    }

    const coreDocumentsData = await coreSequelize.query(
      `SELECT id, document_id, parents FROM data_sources_documents WHERE "data_source" = :coreDsId AND status = 'latest'`,
      {
        replacements: {
          coreDsId: coreDs[0].id,
        },
        type: QueryTypes.SELECT,
      }
    );

    return new Ok(coreDocumentsData as CoreDSDocument[]);
  } finally {
    // This function is called once per data source, so the pools it opens
    // must be released on every exit path (early Err returns included).
    await Promise.all([coreSequelize.close(), frontSequelize.close()]);
  }
}
11 changes: 7 additions & 4 deletions front/production_checks/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { Context } from "@temporalio/activity";
import { v4 as uuidv4 } from "uuid";

import mainLogger from "@app/logger/logger";
import { mySuperCheck } from "@app/production_checks/checks/super_check";
import { managedDataSourceGCGdriveCheck } from "@app/production_checks/checks/managed_data_source_gdrive_gc";
import { Check } from "@app/production_checks/types/check";

export async function runAllChecksActivity() {
const checks: Check[] = [
{
name: "mySuperCheck",
check: mySuperCheck,
name: "managed_data_source_gdrive_gc",
check: managedDataSourceGCGdriveCheck,
},
];
await runAllChecks(checks);
Expand All @@ -30,7 +30,10 @@ async function runAllChecks(checks: Check[]) {
logger.info({ reportPayload }, "Check succeeded");
};
const reportFailure = (reportPayload: unknown, message: string) => {
logger.error({ reportPayload, errorMessage: message }, "Check failed");
logger.error(
{ reportPayload, errorMessage: message },
"Production check failed"
);
};
Context.current().heartbeat({
type: "start",
Expand Down

0 comments on commit 1389379

Please sign in to comment.