Skip to content

Commit

Permalink
Document Tracker Notification workflow - skeletton + activity logic
Browse files Browse the repository at this point in the history
  • Loading branch information
PopDaph committed Dec 16, 2024
1 parent cd5bfcb commit 5d60de6
Show file tree
Hide file tree
Showing 14 changed files with 412 additions and 16 deletions.
5 changes: 5 additions & 0 deletions front/lib/models/doc_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ export class TrackerConfigurationModel extends SoftDeletableModel<TrackerConfigu
declare workspace: NonAttribute<Workspace>;
declare space: NonAttribute<SpaceModel>;
declare user: NonAttribute<UserModel> | null;
declare dataSourceConfigurations: NonAttribute<
TrackerDataSourceConfigurationModel[]
>;
declare generations: NonAttribute<TrackerGenerationModel[]>;
}

TrackerConfigurationModel.init(
Expand Down Expand Up @@ -266,6 +270,7 @@ TrackerGenerationModel.init(

TrackerConfigurationModel.hasMany(TrackerGenerationModel, {
foreignKey: { allowNull: false },
as: "generations",
onDelete: "RESTRICT",
});
TrackerGenerationModel.belongsTo(TrackerConfigurationModel, {
Expand Down
2 changes: 2 additions & 0 deletions front/lib/resources/resource_with_space.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export abstract class ResourceWithSpace<
} & { model: ModelStaticSoftDeletable<M> },
auth: Authenticator,
{
attributes,
includes,
limit,
order,
Expand All @@ -80,6 +81,7 @@ export abstract class ResourceWithSpace<
];

const blobs = await this.model.findAll({
attributes,
where: where as WhereOptions<M>,
include: includeClauses,
limit,
Expand Down
100 changes: 97 additions & 3 deletions front/lib/resources/tracker_resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type {
Result,
TrackerConfigurationType,
TrackerDataSourceConfigurationType,
TrackerGenerationToProcess,
TrackerIdWorkspaceId,
} from "@dust-tt/types";
import { Err, Ok, removeNulls } from "@dust-tt/types";
import assert from "assert";
Expand All @@ -12,7 +14,9 @@ import type { Authenticator } from "@app/lib/auth";
import {
TrackerConfigurationModel,
TrackerDataSourceConfigurationModel,
TrackerGenerationModel,
} from "@app/lib/models/doc_tracker";
import { Workspace } from "@app/lib/models/workspace";
import { DataSourceViewResource } from "@app/lib/resources/data_source_view_resource";
import { ResourceWithSpace } from "@app/lib/resources/resource_with_space";
import type { SpaceResource } from "@app/lib/resources/space_resource";
Expand All @@ -32,6 +36,7 @@ export class TrackerConfigurationResource extends ResourceWithSpace<TrackerConfi
TrackerConfigurationModel;

readonly dataSourceConfigurations: TrackerDataSourceConfigurationModel[];
readonly generations: TrackerGenerationToProcess[];

constructor(
model: ModelStatic<TrackerConfigurationModel>,
Expand All @@ -42,6 +47,7 @@ export class TrackerConfigurationResource extends ResourceWithSpace<TrackerConfi
) {
super(TrackerConfigurationResource.model, blob, space);
this.dataSourceConfigurations = blob.dataSourceConfigurations;
this.generations = [];
}

static async makeNew(
Expand Down Expand Up @@ -219,16 +225,17 @@ export class TrackerConfigurationResource extends ResourceWithSpace<TrackerConfi
const trackers = await this.baseFetchWithAuthorization(auth, {
...options,
includes: [
...(options?.includes || []),
{
model: TrackerDataSourceConfigurationModel,
as: "dataSourceConfigurations",
},
],
});

// This is what enforces the accessibility to an app.
// This is what enforces the accessibility to a Tracker.
return trackers.filter(
(tracker) => auth.isBuilder() || tracker.canRead(auth)
(tracker) => auth.isAdmin() || tracker.canRead(auth)
);
}

Expand All @@ -253,6 +260,19 @@ export class TrackerConfigurationResource extends ResourceWithSpace<TrackerConfi
return tracker ?? null;
}

static async fetchFromModelId(
auth: Authenticator,
id: ModelId
): Promise<TrackerConfigurationResource | null> {
const tracker = await this.baseFetch(auth, {
where: {
id,
},
});

return tracker[0] ?? null;
}

static async listBySpace(
auth: Authenticator,
space: SpaceResource
Expand All @@ -264,6 +284,67 @@ export class TrackerConfigurationResource extends ResourceWithSpace<TrackerConfi
});
}

static async fetchWithGenerationsToConsume(
auth: Authenticator,
id: ModelId
): Promise<TrackerConfigurationType | null> {
const tracker = await this.baseFetch(auth, {
where: {
id,
status: "active",
},
includes: [
{
// @ts-expect-error @todo(DOC_TRACKER) Fix to remove the ts-expect-error.
model: TrackerGenerationModel,
// @ts-expect-error @todo(DOC_TRACKER) Fix to remove the ts-expect-error.
as: "generations",
where: {
consumedAt: null,
},
},
],
});

return tracker ? tracker[0].toJSON() : null;
}

// Internal method for fetching trackers without any authorization checks.
// Not intended for use outside of the Tracker workflow.
// Fetches the active trackers that have generations to consume.
static async internalFetchActivetrackersToProcessByNotificationWorkflow(): Promise<
TrackerIdWorkspaceId[]
> {
const trackers = await TrackerConfigurationResource.model.findAll({
attributes: ["id"],
where: {
status: "active",
},
include: [
{
model: Workspace,
attributes: ["sId"],
},
{
model: TrackerGenerationModel,
as: "generations",
where: {
consumedAt: null,
},
},
],
});

const filteredTrackers = trackers.filter(
(tracker) => tracker.generations.length
);

return filteredTrackers.map((tracker) => ({
trackerId: tracker.id,
workspaceId: tracker.workspace.sId,
}));
}

// Deletion.

protected async hardDelete(
Expand Down Expand Up @@ -322,7 +403,7 @@ export class TrackerConfigurationResource extends ResourceWithSpace<TrackerConfi
},
});

return {
const tracker: TrackerConfigurationType = {
id: this.id,
sId: this.sId,
name: this.name,
Expand All @@ -342,5 +423,18 @@ export class TrackerConfigurationResource extends ResourceWithSpace<TrackerConfi
.filter((dsc) => dsc.scope === "watched")
.map(dataSourceToJSON),
};

if (this.generations.length) {
tracker.generations = this.generations.map((g) => {
return {
id: g.id,
content: g.content,
thinking: g.thinking,
documentId: g.documentId,
};
});
}

return tracker;
}
}
1 change: 1 addition & 0 deletions front/lib/resources/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export type TypedIncludeable<M> = {
}[NonAttributeKeys<M>];

export type ResourceFindOptions<M extends Model> = {
attributes?: FindOptions<M>["attributes"];
includes?: TypedIncludeable<M>[];
limit?: number;
order?: FindOptions<M>["order"];
Expand Down
7 changes: 6 additions & 1 deletion front/start_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import { hideBin } from "yargs/helpers";

import logger from "@app/logger/logger";
import { runPokeWorker } from "@app/poke/temporal/worker";
import { runDocumentTrackerWorker } from "@app/temporal/document_tracker/worker";
import {
runDocumentTrackerWorker,
runTrackerNotificationWorker,
} from "@app/temporal/document_tracker/worker";
import { runHardDeleteWorker } from "@app/temporal/hard_delete/worker";
import { runLabsWorker } from "@app/temporal/labs/worker";
import { runMentionsCountWorker } from "@app/temporal/mentions_count_queue/worker";
Expand All @@ -24,6 +27,7 @@ type WorkerName =
| "permissions_queue"
| "poke"
| "document_tracker"
| "tracker_notification"
| "production_checks"
| "scrub_workspace_queue"
| "update_workspace_usage"
Expand All @@ -37,6 +41,7 @@ const workerFunctions: Record<WorkerName, () => Promise<void>> = {
permissions_queue: runPermissionsWorker,
poke: runPokeWorker,
document_tracker: runDocumentTrackerWorker,
tracker_notification: runTrackerNotificationWorker,
production_checks: runProductionChecksWorker,
scrub_workspace_queue: runScrubWorkspaceQueueWorker,
update_workspace_usage: runUpdateWorkspaceUsageWorker,
Expand Down
63 changes: 63 additions & 0 deletions front/temporal/document_tracker/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type {
CoreAPIDataSource,
CoreAPIDocument,
Result,
TrackerIdWorkspaceId,
} from "@dust-tt/types";
import { CoreAPI, Err, Ok } from "@dust-tt/types";

Expand All @@ -11,6 +12,7 @@ import { Authenticator } from "@app/lib/auth";
import { documentTrackerSuggestChanges } from "@app/lib/document_upsert_hooks/hooks/document_tracker/lib";
import { Workspace } from "@app/lib/models/workspace";
import { DataSourceResource } from "@app/lib/resources/data_source_resource";
import { TrackerConfigurationResource } from "@app/lib/resources/tracker_resource";
import { withRetries } from "@app/lib/utils/retries";
import logger from "@app/logger/logger";

Expand Down Expand Up @@ -120,3 +122,64 @@ async function getDataSourceDocument({
}
return new Ok(docText.value);
}

/**
* Tracker Notification Workflow:
* Activity to get tracker ids to notify activity.
* They are the active trackers that have generations to consume.
* @returns TrackerIdWorkspaceId[]
*/
export const getTrackerIdsToNotifyActivity = async (): Promise<
TrackerIdWorkspaceId[]
> => {
return TrackerConfigurationResource.internalFetchActivetrackersToProcessByNotificationWorkflow();
};

/**
* Tracker Notification Workflow:
* Activity to process tracker notification workflow activity.
* Consumes generations for the tracker: fetches the generations, send notifications and consume them.
*/
export const processTrackerNotificationWorkflowActivity = async ({
trackerId,
workspaceId,
currentSyncMs,
}: {
trackerId: number;
workspaceId: string;
currentSyncMs: number;
}) => {
const auth = await Authenticator.internalAdminForWorkspace(workspaceId);
const tracker =
await TrackerConfigurationResource.fetchWithGenerationsToConsume(
auth,
trackerId
);
if (!tracker) {
logger.error(
{
trackerId,
},
"[Tracker] Tracker not found."
);
return;
}

const generations = tracker.generations;
if (!generations?.length) {
return;
}

if (!tracker.recipients?.length) {
logger.error(
{
trackerId: tracker.id,
},
"[Tracker] No recipients found for tracker. Should not be possible from the UI."
);
return;
}

// @todo Implement the notification logic here.
void currentSyncMs;
};
2 changes: 2 additions & 0 deletions front/temporal/document_tracker/admin/cli.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/sh
npx tsx temporal/document_tracker/admin/cli.ts "$@"
50 changes: 50 additions & 0 deletions front/temporal/document_tracker/admin/cli.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import parseArgs from "minimist";

import {
launchTrackerNotificationWorkflow,
stopTrackerNotificationWorkflow,
} from "@app/temporal/document_tracker/client";

const main = async () => {
const argv = parseArgs(process.argv.slice(2));

const [command] = argv._;

console.log(`Running command: ${command}`);

switch (command) {
case "start":
await launchTrackerNotificationWorkflow();
return;
case "stop":
await stopTrackerNotificationWorkflow();
return;
case "notify":
const workspaceId = argv.workspaceId;
const trackerId = argv.trackerId;
if (!workspaceId || !trackerId) {
console.error("workspaceId and trackerId are required");
return;
}
await launchTrackerNotificationWorkflow([
{
workspaceId,
trackerId,
},
]);
return;
default:
return;
}
};

main()
.then(() => {
console.error("\x1b[32m%s\x1b[0m", `Done`);
process.exit(0);
})
.catch((err) => {
console.error("\x1b[31m%s\x1b[0m", `Error: ${err.message}`);
console.log(err);
process.exit(1);
});
Loading

0 comments on commit 5d60de6

Please sign in to comment.