Skip to content

Commit

Permalink
Trackers: frequency is a cron string
Browse files Browse the repository at this point in the history
  • Loading branch information
PopDaph committed Dec 17, 2024
1 parent c3bb3ad commit 4b5eeb8
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 43 deletions.
18 changes: 11 additions & 7 deletions front/components/trackers/TrackerBuilder.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import type {
} from "@dust-tt/types";
import {
CLAUDE_3_5_SONNET_DEFAULT_MODEL_CONFIG,
TRACKER_FREQUENCY_TYPES,
TRACKER_FREQUENCIES,
} from "@dust-tt/types";
import { useRouter } from "next/router";
import { useMemo, useState } from "react";
Expand Down Expand Up @@ -68,7 +68,7 @@ export const TrackerBuilder = ({
descriptionError: null,
prompt: null,
promptError: null,
frequency: "daily",
frequency: TRACKER_FREQUENCIES[0].value,
frequencyError: null,
recipients: "",
recipientsError: null,
Expand Down Expand Up @@ -379,20 +379,24 @@ export const TrackerBuilder = ({
<DropdownMenu>
<DropdownMenuTrigger asChild>
<Button
label={tracker.frequency}
label={
TRACKER_FREQUENCIES.find(
(f) => f.value === tracker.frequency
)?.label || "Select Frequency"
}
variant="outline"
isSelect
/>
</DropdownMenuTrigger>
<DropdownMenuContent>
{TRACKER_FREQUENCY_TYPES.map((f) => (
{TRACKER_FREQUENCIES.map(({ label, value }) => (
<DropdownMenuItem
key={f}
label={f}
key={label}
label={label}
onClick={() => {
setTracker((t) => ({
...t,
frequency: f,
frequency: value,
}));
if (!edited) {
setEdited(true);
Expand Down
39 changes: 24 additions & 15 deletions front/lib/resources/tracker_resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
} from "@dust-tt/types";
import { Err, Ok, removeNulls } from "@dust-tt/types";
import assert from "assert";
import { parseExpression } from "cron-parser";
import _ from "lodash";
import type { Attributes, CreationAttributes, ModelStatic } from "sequelize";
import { Op } from "sequelize";
Expand All @@ -26,6 +27,7 @@ import { frontSequelize } from "@app/lib/resources/storage";
import type { ReadonlyAttributesType } from "@app/lib/resources/storage/types";
import { getResourceIdFromSId, makeSId } from "@app/lib/resources/string_ids";
import type { ResourceFindOptions } from "@app/lib/resources/types";
import logger from "@app/logger/logger";

// Attributes are marked as read-only to reflect the stateless nature of our Resource.
// This design will be moved up to BaseResource once we transition away from Sequelize.
Expand Down Expand Up @@ -300,12 +302,12 @@ export class TrackerConfigurationResource extends ResourceWithSpace<TrackerConfi

// Internal method for fetching trackers without any authorization checks.
// Not intended for use outside of the Tracker workflow.
// Fetches the active trackers that have generations to consume.
static async internalFetchAllActiveWithUnconsumedGenerations(): Promise<
TrackerIdWorkspaceId[]
> {
// Fetches the active trackers that need to be processed for notifications.
static async internalFetchTrackersToNotify(
currentSyncMs: number
): Promise<TrackerIdWorkspaceId[]> {
const trackers = await TrackerConfigurationResource.model.findAll({
attributes: ["id"],
attributes: ["id", "frequency", "lastNotifiedAt"],
where: {
status: "active",
},
Expand All @@ -314,19 +316,26 @@ export class TrackerConfigurationResource extends ResourceWithSpace<TrackerConfi
model: Workspace,
attributes: ["sId"],
},
{
model: TrackerGenerationModel,
as: "generations",
where: {
consumedAt: null,
},
},
],
});

const filteredTrackers = trackers.filter(
(tracker) => tracker.generations.length
);
const filteredTrackers = trackers.filter((tracker) => {
if (!tracker.frequency) {
return false;
}
try {
const interval = parseExpression(tracker.frequency, {
currentDate: tracker.lastNotifiedAt || tracker.createdAt,
});
return interval.next().toDate().getTime() <= currentSyncMs;
} catch (e) {
logger.error(
{ trackerId: tracker.id },
"[Tracker] Invalid cron expression."
);
return false;
}
});

return filteredTrackers.map((tracker) => ({
trackerId: tracker.id,
Expand Down
12 changes: 12 additions & 0 deletions front/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions front/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"blake3": "^2.1.7",
"class-variance-authority": "^0.7.0",
"cmdk": "^1.0.0",
"cron-parser": "^4.9.0",
"csv-parse": "^5.5.2",
"csv-stringify": "^6.4.5",
"date-fns": "^3.6.0",
Expand Down
8 changes: 2 additions & 6 deletions front/pages/api/w/[wId]/spaces/[spaceId]/trackers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ import type {
TrackerConfigurationType,
WithAPIErrorResponse,
} from "@dust-tt/types";
import {
FrequencyCodec,
ModelIdCodec,
ModelProviderIdCodec,
} from "@dust-tt/types";
import { ModelIdCodec, ModelProviderIdCodec } from "@dust-tt/types";
import { isLeft } from "fp-ts/lib/Either";
import * as t from "io-ts";
import * as reporter from "io-ts-reporters";
Expand Down Expand Up @@ -46,7 +42,7 @@ export const PostTrackersRequestBodySchema = t.type({
prompt: t.union([t.string, t.null]),
modelId: ModelIdCodec,
providerId: ModelProviderIdCodec,
frequency: FrequencyCodec,
frequency: t.string,
temperature: t.number,
recipients: t.array(t.string),
maintainedDataSources: TrackerDataSourcesConfigurationBodySchema,
Expand Down
10 changes: 6 additions & 4 deletions front/temporal/tracker/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,12 @@ async function getDataSourceDocument({
* They are the active trackers that have generations to consume.
* @returns TrackerIdWorkspaceId[]
*/
export const getTrackerIdsToNotifyActivity = async (): Promise<
TrackerIdWorkspaceId[]
> => {
return TrackerConfigurationResource.internalFetchAllActiveWithUnconsumedGenerations();
export const getTrackerIdsToNotifyActivity = async (
currentSyncMs: number
): Promise<TrackerIdWorkspaceId[]> => {
return TrackerConfigurationResource.internalFetchTrackersToNotify(
currentSyncMs
);
};

/**
Expand Down
2 changes: 1 addition & 1 deletion front/temporal/tracker/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export async function trackersNotificationsWorkflow() {

// If we got no signal then we're on the scheduled execution: we process all trackers.
if (uniqueTrackers.size === 0) {
const trackers = await getTrackerIdsToNotifyActivity();
const trackers = await getTrackerIdsToNotifyActivity(currentSyncMs);
trackers.forEach((tracker) => uniqueTrackers.add(tracker));
}

Expand Down
13 changes: 3 additions & 10 deletions types/src/front/tracker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { ModelId } from "../shared/model_id";
import { ioTsEnum } from "../shared/utils/iots_utils";
import { DataSourceViewSelectionConfigurations } from "./data_source_view";
import { ModelIdType, ModelProviderIdType } from "./lib/assistant";
import { SpaceType } from "./space";
Expand Down Expand Up @@ -50,16 +49,10 @@ export type TrackerConfigurationStateType = {
watchedDataSources: DataSourceViewSelectionConfigurations;
};

export const TRACKER_FREQUENCY_TYPES: TrackerFrequencyType[] = [
"daily",
"weekly",
"monthly",
export const TRACKER_FREQUENCIES = [
{ label: "Daily", value: "0 17 * * 1-5" },
{ label: "Weekly", value: "0 17 * * 5" },
];
export type TrackerFrequencyType = "daily" | "weekly" | "monthly";

export const FrequencyCodec = ioTsEnum<
(typeof TRACKER_FREQUENCY_TYPES)[number]
>(TRACKER_FREQUENCY_TYPES);

export type TrackerIdWorkspaceId = {
trackerId: number;
Expand Down

0 comments on commit 4b5eeb8

Please sign in to comment.