From 16120c413437eea84ebf8b45ab0c00ef9d35541f Mon Sep 17 00:00:00 2001 From: Aubin <60398825+aubin-tchoi@users.noreply.github.com> Date: Fri, 15 Nov 2024 15:46:48 +0100 Subject: [PATCH] [connectors] Implement incremental sync for Zendesk tickets (#8617) * feat: add a function that fetches the recently updated tickets * feat: add activities to update the recently updated tickets * feat: add an incremental sync workflow * perf: use the `show_many` endpoint instead of fetching all the users when updating the tickets * fix: prevent extra syncs of help centers/tickets when the whole brand is synced * fix: fix date conversions * fix: fix the structure of the response of the incremental endpoint * feat: implement ticket scrub * docs: add descriptions for a few functions * feat: add a log to the ticket deletion * fix: fix the use of the `show_many` endpoint * refactor: remove leading underscores in function names * refactor: move node-zendesk types to @connectors/@types (consistent with talisman) * refactor: remove an unnecessary import * refactor: inline getZendeskConnectorOrRaise and getZendeskCategoryOrRaise * refactor: inline syncBrandWithPermissions and fix incorrect permissions for brand * refactor: move sync_(article|ticket) from temporal to lib * fix: use the lastSyncSuccessfulTime as the start time instead of now - 5 min * fix: add a column lastSuccessfulSyncStartTs to the ZendeskConfiguration and use it * lint * fix: only sync solved tickets * feat: add a table to store workspace information * fix: update the type of ticket subjects and category descriptions from string to text * feat: add the migration file * feat: add ZendeskWorkspace to db.ts * feat: add the migration file * fix naming * fix: add the connectorId when creating a WorkspaceResource * refactor: renaming the table zendesk_workspaces into zendesk_timestamp_cursors, removing the Resource * fix migration file * fix: rename zendesk_workspaces * fix: add create the cursors upon saving * fix: recast cursor as date (received as a string) * fix: delete the cursor when deleting the connector * fix: fix how we handle StartTimeTooRecent errors --- connectors/migrations/db/migration_34.sql | 14 ++ .../node-zendesk.d.ts} | 7 +- connectors/src/admin/db.ts | 2 + .../zendesk/lib/brand_permissions.ts | 45 ++++- .../zendesk/lib/help_center_permissions.ts | 41 ++-- .../zendesk/{temporal => lib}/sync_article.ts | 4 +- .../zendesk/{temporal => lib}/sync_ticket.ts | 38 +++- .../zendesk/lib/ticket_permissions.ts | 43 +++- .../src/connectors/zendesk/lib/utils.ts | 104 ---------- .../src/connectors/zendesk/lib/zendesk_api.ts | 43 +++- .../connectors/zendesk/temporal/activities.ts | 183 ++++++++++++++++-- .../connectors/zendesk/temporal/workflows.ts | 65 +++++-- connectors/src/lib/models/zendesk.ts | 52 ++++- connectors/src/resources/connector/zendesk.ts | 5 + connectors/src/resources/zendesk_resources.ts | 22 +++ 15 files changed, 501 insertions(+), 167 deletions(-) create mode 100644 connectors/migrations/db/migration_34.sql rename connectors/src/{connectors/zendesk/lib/node-zendesk-types.d.ts => @types/node-zendesk.d.ts} (97%) rename connectors/src/connectors/zendesk/{temporal => lib}/sync_article.ts (98%) rename connectors/src/connectors/zendesk/{temporal => lib}/sync_ticket.ts (87%) delete mode 100644 connectors/src/connectors/zendesk/lib/utils.ts diff --git a/connectors/migrations/db/migration_34.sql b/connectors/migrations/db/migration_34.sql new file mode 100644 index 000000000000..2c8289e1da43 --- /dev/null +++ b/connectors/migrations/db/migration_34.sql @@ -0,0 +1,14 @@ +-- Migration created on Nov 14, 2024 +CREATE TABLE IF NOT EXISTS "zendesk_timestamp_cursors" +( + "id" SERIAL, + "createdAt" TIMESTAMP WITH TIME ZONE NOT NULL, + "updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL, + "timestampCursor" TIMESTAMP WITH TIME ZONE DEFAULT NULL, + "connectorId" INTEGER NOT NULL REFERENCES "connectors" ("id") ON DELETE RESTRICT ON UPDATE CASCADE, + PRIMARY KEY ("id") +); +CREATE UNIQUE INDEX "zendesk_timestamp_cursors_connector_id" ON "zendesk_timestamp_cursors" ("connectorId"); + +ALTER TABLE "public"."zendesk_categories" ALTER COLUMN "description" TYPE TEXT; +ALTER TABLE "public"."zendesk_tickets" ALTER COLUMN "subject" TYPE TEXT; diff --git a/connectors/src/connectors/zendesk/lib/node-zendesk-types.d.ts b/connectors/src/@types/node-zendesk.d.ts similarity index 97% rename from connectors/src/connectors/zendesk/lib/node-zendesk-types.d.ts rename to connectors/src/@types/node-zendesk.d.ts index b18bdf7f14a6..c030b65f4e9b 100644 --- a/connectors/src/connectors/zendesk/lib/node-zendesk-types.d.ts +++ b/connectors/src/@types/node-zendesk.d.ts @@ -1,5 +1,3 @@ -import "node-zendesk"; - import type { ZendeskClientOptions } from "node-zendesk"; interface ZendeskFetchedBrand { @@ -118,7 +116,7 @@ interface ZendeskFetchedTicket { score: string; }; sharing_agreement_ids: number[]; - status: "new" | "open" | "pending" | "hold" | "solved" | "closed"; + status: "new" | "open" | "pending" | "hold" | "solved" | "closed" | "deleted"; subject: string; submitter_id: number; tags: string[]; @@ -239,6 +237,9 @@ declare module "node-zendesk" { show: ( userId: number ) => Promise<{ response: Response; result: ZendeskFetchedUser }>; + showMany: ( + userIds: number[] + ) => Promise<{ response: Response; result: ZendeskFetchedUser[] }>; }; } diff --git a/connectors/src/admin/db.ts b/connectors/src/admin/db.ts index 27617feda37e..79cc3bd4cfe9 100644 --- a/connectors/src/admin/db.ts +++ b/connectors/src/admin/db.ts @@ -67,6 +67,7 @@ import { ZendeskCategory, ZendeskConfiguration, ZendeskTicket, + ZendeskTimestampCursors, } from "@connectors/lib/models/zendesk"; import logger from "@connectors/logger/logger"; import { sequelizeConnection } from "@connectors/resources/storage"; @@ -120,6 +121,7 @@ async function main(): Promise { await RemoteDatabaseModel.sync({ alter: true }); await RemoteSchemaModel.sync({ alter: true }); await RemoteTableModel.sync({ alter: true }); + await ZendeskTimestampCursors.sync({ alter: true }); await ZendeskConfiguration.sync({ alter: true }); await ZendeskBrand.sync({ alter: true }); await ZendeskCategory.sync({ alter: true }); diff --git a/connectors/src/connectors/zendesk/lib/brand_permissions.ts b/connectors/src/connectors/zendesk/lib/brand_permissions.ts index bb806674487b..85fb64f2aee3 100644 --- a/connectors/src/connectors/zendesk/lib/brand_permissions.ts +++ b/connectors/src/connectors/zendesk/lib/brand_permissions.ts @@ -2,7 +2,8 @@ import type { ModelId } from "@dust-tt/types"; import { allowSyncZendeskHelpCenter } from "@connectors/connectors/zendesk/lib/help_center_permissions"; import { allowSyncZendeskTickets } from "@connectors/connectors/zendesk/lib/ticket_permissions"; -import { syncBrandWithPermissions } from "@connectors/connectors/zendesk/lib/utils"; +import { getZendeskSubdomainAndAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; +import { createZendeskClient } from "@connectors/connectors/zendesk/lib/zendesk_api"; import logger from "@connectors/logger/logger"; import { ZendeskBrandResource } from "@connectors/resources/zendesk_resources"; @@ -18,17 +19,43 @@ export async function allowSyncZendeskBrand({ connectionId: string; brandId: number; }): Promise { - const syncSuccess = await syncBrandWithPermissions({ + const brand = await ZendeskBrandResource.fetchByBrandId({ connectorId, - connectionId, brandId, - permissions: { - ticketsPermission: "none", - helpCenterPermission: "read", - }, }); - if (!syncSuccess) { - return false; // stopping early if the brand sync failed + + if (brand) { + await brand.grantTicketsPermissions(); + await brand.grantHelpCenterPermissions(); + } else { + // fetching the brand from Zendesk + const zendeskApiClient = createZendeskClient( + await getZendeskSubdomainAndAccessToken(connectionId) + ); + const { + result: { brand: fetchedBrand }, + } = await zendeskApiClient.brand.show(brandId); + + if (!fetchedBrand) { + logger.error( + { connectorId, brandId }, + "[Zendesk] Brand could not be fetched." + ); + return false; + } + + await ZendeskBrandResource.makeNew({ + blob: { + subdomain: fetchedBrand.subdomain, + connectorId: connectorId, + brandId: fetchedBrand.id, + name: fetchedBrand.name || "Brand", + ticketsPermission: "read", + helpCenterPermission: "read", + hasHelpCenter: fetchedBrand.has_help_center, + url: fetchedBrand.url, + }, + }); } await allowSyncZendeskHelpCenter({ diff --git a/connectors/src/connectors/zendesk/lib/help_center_permissions.ts b/connectors/src/connectors/zendesk/lib/help_center_permissions.ts index b074399153cf..dd80c6a87be8 100644 --- a/connectors/src/connectors/zendesk/lib/help_center_permissions.ts +++ b/connectors/src/connectors/zendesk/lib/help_center_permissions.ts @@ -1,6 +1,5 @@ import type { ModelId } from "@dust-tt/types"; -import { syncBrandWithPermissions } from "@connectors/connectors/zendesk/lib/utils"; import { getZendeskSubdomainAndAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; import { changeZendeskClientSubdomain, @@ -30,19 +29,39 @@ export async function allowSyncZendeskHelpCenter({ const zendeskApiClient = createZendeskClient( await getZendeskSubdomainAndAccessToken(connectionId) ); - - const syncSuccess = await syncBrandWithPermissions({ - zendeskApiClient, - connectionId, + const brand = await ZendeskBrandResource.fetchByBrandId({ connectorId, brandId, - permissions: { - ticketsPermission: "none", - helpCenterPermission: "read", - }, }); - if (!syncSuccess) { - return false; // stopping early if the brand sync failed + + if (brand) { + await brand.grantHelpCenterPermissions(); + } else { + // fetching the brand from Zendesk + const { + result: { brand: fetchedBrand }, + } = await zendeskApiClient.brand.show(brandId); + + if (!fetchedBrand) { + logger.error( + { connectorId, brandId }, + "[Zendesk] Brand could not be fetched." + ); + return false; + } + + await ZendeskBrandResource.makeNew({ + blob: { + subdomain: fetchedBrand.subdomain, + connectorId: connectorId, + brandId: fetchedBrand.id, + name: fetchedBrand.name || "Brand", + ticketsPermission: "none", + helpCenterPermission: "read", + hasHelpCenter: fetchedBrand.has_help_center, + url: fetchedBrand.url, + }, + }); } // updating permissions for all the children categories diff --git a/connectors/src/connectors/zendesk/temporal/sync_article.ts b/connectors/src/connectors/zendesk/lib/sync_article.ts similarity index 98% rename from connectors/src/connectors/zendesk/temporal/sync_article.ts rename to connectors/src/connectors/zendesk/lib/sync_article.ts index f70a72c80960..0774f6be3e14 100644 --- a/connectors/src/connectors/zendesk/temporal/sync_article.ts +++ b/connectors/src/connectors/zendesk/lib/sync_article.ts @@ -1,12 +1,12 @@ import type { ModelId } from "@dust-tt/types"; import TurndownService from "turndown"; -import { getArticleInternalId } from "@connectors/connectors/zendesk/lib/id_conversions"; import type { ZendeskFetchedArticle, ZendeskFetchedSection, ZendeskFetchedUser, -} from "@connectors/connectors/zendesk/lib/node-zendesk-types"; +} from "@connectors/@types/node-zendesk"; +import { getArticleInternalId } from "@connectors/connectors/zendesk/lib/id_conversions"; import { renderDocumentTitleAndContent, renderMarkdownSection, diff --git a/connectors/src/connectors/zendesk/temporal/sync_ticket.ts b/connectors/src/connectors/zendesk/lib/sync_ticket.ts similarity index 87% rename from connectors/src/connectors/zendesk/temporal/sync_ticket.ts rename to connectors/src/connectors/zendesk/lib/sync_ticket.ts index a2e1904f8459..3fc289880b44 100644 --- a/connectors/src/connectors/zendesk/temporal/sync_ticket.ts +++ b/connectors/src/connectors/zendesk/lib/sync_ticket.ts @@ -1,13 +1,14 @@ import type { ModelId } from "@dust-tt/types"; import TurndownService from "turndown"; -import { getTicketInternalId } from "@connectors/connectors/zendesk/lib/id_conversions"; import type { ZendeskFetchedTicket, ZendeskFetchedTicketComment, ZendeskFetchedUser, -} from "@connectors/connectors/zendesk/lib/node-zendesk-types"; +} from "@connectors/@types/node-zendesk"; +import { getTicketInternalId } from "@connectors/connectors/zendesk/lib/id_conversions"; import { + deleteFromDataSource, renderDocumentTitleAndContent, renderMarkdownSection, upsertToDatasource, @@ -18,6 +19,39 @@ import type { DataSourceConfig } from "@connectors/types/data_source_config"; const turndownService = new TurndownService(); +/** + * Deletes a ticket from the db and the data sources. + */ +export async function deleteTicket( + connectorId: ModelId, + ticket: ZendeskFetchedTicket, + dataSourceConfig: DataSourceConfig, + loggerArgs: Record +): Promise { + logger.info( + { + ...loggerArgs, + connectorId, + ticketId: ticket.id, + subject: ticket.subject, + }, + "[Zendesk] Deleting ticket." + ); + await Promise.all([ + ZendeskTicketResource.deleteByTicketId({ + connectorId, + ticketId: ticket.id, + }), + deleteFromDataSource( + dataSourceConfig, + getTicketInternalId(connectorId, ticket.id) + ), + ]); +} + +/** + * Syncs a ticket in the db and upserts it to the data sources. + */ export async function syncTicket({ connectorId, ticket, diff --git a/connectors/src/connectors/zendesk/lib/ticket_permissions.ts b/connectors/src/connectors/zendesk/lib/ticket_permissions.ts index a136020617c6..77bb53735b02 100644 --- a/connectors/src/connectors/zendesk/lib/ticket_permissions.ts +++ b/connectors/src/connectors/zendesk/lib/ticket_permissions.ts @@ -1,6 +1,7 @@ import type { ModelId } from "@dust-tt/types"; -import { syncBrandWithPermissions } from "@connectors/connectors/zendesk/lib/utils"; +import { getZendeskSubdomainAndAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; +import { createZendeskClient } from "@connectors/connectors/zendesk/lib/zendesk_api"; import logger from "@connectors/logger/logger"; import { ZendeskBrandResource, @@ -19,15 +20,43 @@ export async function allowSyncZendeskTickets({ connectionId: string; brandId: number; }): Promise { - return syncBrandWithPermissions({ + const brand = await ZendeskBrandResource.fetchByBrandId({ connectorId, - connectionId, brandId, - permissions: { - ticketsPermission: "read", - helpCenterPermission: "none", - }, }); + + if (brand) { + await brand.grantTicketsPermissions(); + return true; + } + // fetching the brand from Zendesk + const zendeskApiClient = createZendeskClient( + await getZendeskSubdomainAndAccessToken(connectionId) + ); + const { + result: { brand: fetchedBrand }, + } = await zendeskApiClient.brand.show(brandId); + + if (fetchedBrand) { + await ZendeskBrandResource.makeNew({ + blob: { + subdomain: fetchedBrand.subdomain, + connectorId: connectorId, + brandId: fetchedBrand.id, + name: fetchedBrand.name || "Brand", + ticketsPermission: "read", + helpCenterPermission: "none", + hasHelpCenter: fetchedBrand.has_help_center, + url: fetchedBrand.url, + }, + }); + return true; + } + logger.error( + { connectorId, brandId }, + "[Zendesk] Brand could not be fetched." + ); + return false; } /** diff --git a/connectors/src/connectors/zendesk/lib/utils.ts b/connectors/src/connectors/zendesk/lib/utils.ts deleted file mode 100644 index 8fe96b7dc37d..000000000000 --- a/connectors/src/connectors/zendesk/lib/utils.ts +++ /dev/null @@ -1,104 +0,0 @@ -import type { ModelId } from "@dust-tt/types"; -import type { Client } from "node-zendesk"; - -import { getZendeskSubdomainAndAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; -import { createZendeskClient } from "@connectors/connectors/zendesk/lib/zendesk_api"; -import logger from "@connectors/logger/logger"; -import { ConnectorResource } from "@connectors/resources/connector_resource"; -import { - ZendeskBrandResource, - ZendeskCategoryResource, -} from "@connectors/resources/zendesk_resources"; - -export async function _getZendeskConnectorOrRaise(connectorId: ModelId) { - const connector = await ConnectorResource.fetchById(connectorId); - if (!connector) { - throw new Error("[Zendesk] Connector not found."); - } - return connector; -} - -export async function _getZendeskCategoryOrRaise({ - connectorId, - categoryId, -}: { - connectorId: ModelId; - categoryId: number; -}) { - const category = await ZendeskCategoryResource.fetchByCategoryId({ - connectorId, - categoryId, - }); - if (!category) { - throw new Error( - `[Zendesk] Category not found, connectorId: ${connectorId}, categoryId: ${categoryId}` - ); - } - return category; -} - -/** - * Syncs the permissions of a brand, fetching it and pushing it if not found in the db. - * Only works to grant permissions, to revoke permissions you would always have it in db and thus directly update. - * @returns True if the fetch succeeded, false otherwise. - */ -export async function syncBrandWithPermissions({ - zendeskApiClient = null, - connectorId, - connectionId, - brandId, - permissions, -}: { - zendeskApiClient?: Client | null; - connectorId: ModelId; - connectionId: string; - brandId: number; - permissions: { - ticketsPermission: "read" | "none"; - helpCenterPermission: "read" | "none"; - }; -}): Promise { - const brand = await ZendeskBrandResource.fetchByBrandId({ - connectorId, - brandId, - }); - - if (brand) { - if (permissions.helpCenterPermission === "read") { - await brand.grantHelpCenterPermissions(); - } - if (permissions.ticketsPermission === "read") { - await brand.grantTicketsPermissions(); - } - - return true; - } - - zendeskApiClient ||= createZendeskClient( - await getZendeskSubdomainAndAccessToken(connectionId) - ); - - const { - result: { brand: fetchedBrand }, - } = await zendeskApiClient.brand.show(brandId); - if (fetchedBrand) { - await ZendeskBrandResource.makeNew({ - blob: { - subdomain: fetchedBrand.subdomain, - connectorId: connectorId, - brandId: fetchedBrand.id, - name: fetchedBrand.name || "Brand", - ...permissions, - hasHelpCenter: fetchedBrand.has_help_center, - url: fetchedBrand.url, - }, - }); - return true; - } else { - logger.error( - { connectorId, brandId }, - "[Zendesk] Brand could not be fetched." - ); - return false; - } -} diff --git a/connectors/src/connectors/zendesk/lib/zendesk_api.ts b/connectors/src/connectors/zendesk/lib/zendesk_api.ts index 80f3db357db4..ac164346a8d9 100644 --- a/connectors/src/connectors/zendesk/lib/zendesk_api.ts +++ b/connectors/src/connectors/zendesk/lib/zendesk_api.ts @@ -7,7 +7,7 @@ import { createClient } from "node-zendesk"; import type { ZendeskFetchedArticle, ZendeskFetchedTicket, -} from "@connectors/connectors/zendesk/lib/node-zendesk-types"; +} from "@connectors/@types/node-zendesk"; import { ExternalOAuthTokenError } from "@connectors/lib/error"; import logger from "@connectors/logger/logger"; import { ZendeskBrandResource } from "@connectors/resources/zendesk_resources"; @@ -184,6 +184,47 @@ export async function fetchZendeskArticlesInCategory({ ); } +/** + * Fetches a batch of the recently updated tickets from the Zendesk API using the incremental API endpoint. + */ +export async function fetchRecentlyUpdatedTickets({ + subdomain, + accessToken, + startTime = null, + cursor = null, +}: // pass either a cursor or a start time, but not both +| { + subdomain: string; + accessToken: string; + startTime: number | null; + cursor?: never; + } + | { + subdomain: string; + accessToken: string; + startTime?: never; + cursor: string | null; + }): Promise<{ + tickets: ZendeskFetchedTicket[]; + end_of_stream: boolean; + after_cursor: string; +}> { + const response = await fetchFromZendeskWithRetries({ + url: + `https://${subdomain}.zendesk.com/api/v2/incremental/tickets/cursor.json` + + (cursor ? `?cursor=${cursor}` : "") + + (startTime ? `?start_time=${startTime}` : ""), + accessToken, + }); + return ( + response || { + tickets: [], + end_of_stream: false, + after_cursor: "", + } + ); +} + export async function fetchSolvedZendeskTicketsInBrand({ brandSubdomain, accessToken, diff --git a/connectors/src/connectors/zendesk/temporal/activities.ts b/connectors/src/connectors/zendesk/temporal/activities.ts index 4457fb899177..484ffe9ef773 100644 --- a/connectors/src/connectors/zendesk/temporal/activities.ts +++ b/connectors/src/connectors/zendesk/temporal/activities.ts @@ -4,24 +4,26 @@ import { getArticleInternalId, getTicketInternalId, } from "@connectors/connectors/zendesk/lib/id_conversions"; +import { syncArticle } from "@connectors/connectors/zendesk/lib/sync_article"; import { - _getZendeskCategoryOrRaise, - _getZendeskConnectorOrRaise, -} from "@connectors/connectors/zendesk/lib/utils"; + deleteTicket, + syncTicket, +} from "@connectors/connectors/zendesk/lib/sync_ticket"; import { getZendeskSubdomainAndAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token"; import { changeZendeskClientSubdomain, createZendeskClient, + fetchRecentlyUpdatedTickets, fetchSolvedZendeskTicketsInBrand, fetchZendeskArticlesInCategory, } from "@connectors/connectors/zendesk/lib/zendesk_api"; -import { syncArticle } from "@connectors/connectors/zendesk/temporal/sync_article"; -import { syncTicket } from "@connectors/connectors/zendesk/temporal/sync_ticket"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import { concurrentExecutor } from "@connectors/lib/async_utils"; import { deleteFromDataSource } from "@connectors/lib/data_sources"; +import { ZendeskTimestampCursors } from "@connectors/lib/models/zendesk"; import { syncStarted, syncSucceeded } from "@connectors/lib/sync_status"; import logger from "@connectors/logger/logger"; +import { ConnectorResource } from "@connectors/resources/connector_resource"; import { ZendeskArticleResource, ZendeskBrandResource, @@ -40,7 +42,10 @@ export async function saveZendeskConnectorStartSync({ }: { connectorId: ModelId; }) { - const connector = await _getZendeskConnectorOrRaise(connectorId); + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("[Zendesk] Connector not found."); + } const res = await syncStarted(connector.id); if (res.isErr()) { throw res.error; @@ -52,10 +57,29 @@ export async function saveZendeskConnectorStartSync({ */ export async function saveZendeskConnectorSuccessSync({ connectorId, + currentSyncDateMs, }: { connectorId: ModelId; + currentSyncDateMs: number; }) { - const connector = await _getZendeskConnectorOrRaise(connectorId); + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("[Zendesk] Connector not found."); + } + const cursors = await ZendeskTimestampCursors.findOne({ + where: { connectorId }, + }); + if (!cursors) { + // can be missing if the first sync was not within an incremental workflow + await ZendeskTimestampCursors.create({ + connectorId, + timestampCursor: new Date(currentSyncDateMs), // setting this as the start date of the sync (last successful sync) + }); + } else { + await cursors.update({ + timestampCursor: new Date(currentSyncDateMs), // setting this as the start date of the sync (last successful sync) + }); + } const res = await syncSucceeded(connector.id); if (res.isErr()) { throw res.error; @@ -81,7 +105,10 @@ export async function syncZendeskBrandActivity({ brandId: number; currentSyncDateMs: number; }): Promise<{ helpCenterAllowed: boolean; ticketsAllowed: boolean }> { - const connector = await _getZendeskConnectorOrRaise(connectorId); + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("[Zendesk] Connector not found."); + } const dataSourceConfig = dataSourceConfigFromConnector(connector); const brandInDb = await ZendeskBrandResource.fetchByBrandId({ @@ -205,6 +232,39 @@ export async function getAllZendeskBrandsIdsActivity({ return ZendeskBrandResource.fetchAllBrandIds({ connectorId }); } +/** + * Retrieves the timestamp cursor, which is the start date of the last successful sync. + */ +export async function getZendeskTimestampCursorActivity( + connectorId: ModelId +): Promise { + let cursors = await ZendeskTimestampCursors.findOne({ + where: { connectorId }, + }); + if (!cursors) { + cursors = await ZendeskTimestampCursors.create({ + connectorId, + timestampCursor: null, // start date of the last successful sync, null for now since we do not know it will succeed + }); + } + // we get a StartTimeTooRecent error before 1 minute + const minAgo = Date.now() - 60 * 1000; // 1 minute ago + return cursors.timestampCursor + ? new Date(Math.min(cursors.timestampCursor.getTime(), minAgo)) + : new Date(minAgo); +} + +/** + * Retrieves the IDs of every brand stored in db that has read permissions on their Tickets. + */ +export async function getZendeskTicketsAllowedBrandIdsActivity( + connectorId: ModelId +): Promise { + return ZendeskBrandResource.fetchTicketsAllowedBrandIds({ + connectorId, + }); +} + /** * Retrieves the categories for a given Brand. */ @@ -215,7 +275,10 @@ export async function getZendeskCategoriesActivity({ connectorId: ModelId; brandId: number; }): Promise { - const connector = await _getZendeskConnectorOrRaise(connectorId); + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("[Zendesk] Connector not found."); + } const client = createZendeskClient( await getZendeskSubdomainAndAccessToken(connector.connectionId) ); @@ -246,12 +309,20 @@ export async function syncZendeskCategoryActivity({ brandId: number; currentSyncDateMs: number; }): Promise { - const connector = await _getZendeskConnectorOrRaise(connectorId); + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("[Zendesk] Connector not found."); + } const dataSourceConfig = dataSourceConfigFromConnector(connector); - const categoryInDb = await _getZendeskCategoryOrRaise({ + const categoryInDb = await ZendeskCategoryResource.fetchByCategoryId({ connectorId, categoryId, }); + if (!categoryInDb) { + throw new Error( + `[Zendesk] Category not found, connectorId: ${connectorId}, categoryId: ${categoryId}` + ); + } // if all rights were revoked, we delete the category data. if (categoryInDb.permission === "none") { @@ -302,7 +373,10 @@ export async function syncZendeskArticleBatchActivity({ forceResync: boolean; cursor: string | null; }): Promise<{ hasMore: boolean; afterCursor: string | null }> { - const connector = await _getZendeskConnectorOrRaise(connectorId); + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("[Zendesk] Connector not found."); + } const dataSourceConfig = dataSourceConfigFromConnector(connector); const loggerArgs = { workspaceId: dataSourceConfig.workspaceId, @@ -310,10 +384,15 @@ export async function syncZendeskArticleBatchActivity({ provider: "zendesk", dataSourceId: dataSourceConfig.dataSourceId, }; - const category = await _getZendeskCategoryOrRaise({ + const category = await ZendeskCategoryResource.fetchByCategoryId({ connectorId, categoryId, }); + if (!category) { + throw new Error( + `[Zendesk] Category not found, connectorId: ${connectorId}, categoryId: ${categoryId}` + ); + } const { accessToken, subdomain } = await getZendeskSubdomainAndAccessToken( connector.connectionId @@ -374,7 +453,10 @@ export async function syncZendeskTicketBatchActivity({ forceResync: boolean; cursor: string | null; }): Promise<{ hasMore: boolean; afterCursor: string }> { - const connector = await _getZendeskConnectorOrRaise(connectorId); + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("[Zendesk] Connector not found."); + } const dataSourceConfig = dataSourceConfigFromConnector(connector); const loggerArgs = { workspaceId: dataSourceConfig.workspaceId, @@ -440,6 +522,79 @@ export async function syncZendeskTicketBatchActivity({ }; } +/** + * This activity is responsible for syncing the next batch of recently updated articles to process. + * It is based on the incremental endpoint, which returns a diff. + */ +export async function syncZendeskTicketUpdateBatchActivity({ + connectorId, + brandId, + startTime, + currentSyncDateMs, + cursor, +}: { + connectorId: ModelId; + brandId: number; + startTime: number; + currentSyncDateMs: number; + cursor: string | null; +}): Promise<{ hasMore: boolean; afterCursor: string | null }> { + const connector = await ConnectorResource.fetchById(connectorId); + if (!connector) { + throw new Error("[Zendesk] Connector not found."); + } + const dataSourceConfig = dataSourceConfigFromConnector(connector); + const loggerArgs = { + workspaceId: dataSourceConfig.workspaceId, + connectorId, + provider: "zendesk", + dataSourceId: dataSourceConfig.dataSourceId, + }; + + const { accessToken, subdomain } = await getZendeskSubdomainAndAccessToken( + connector.connectionId + ); + const zendeskApiClient = createZendeskClient({ accessToken, subdomain }); + const brandSubdomain = await changeZendeskClientSubdomain(zendeskApiClient, { + connectorId, + brandId, + }); + + const { tickets, after_cursor, end_of_stream } = + await fetchRecentlyUpdatedTickets({ + subdomain: brandSubdomain, + accessToken, + ...(cursor ? { cursor } : { startTime }), + }); + + await concurrentExecutor( + tickets, + async (ticket) => { + if (ticket.status === "deleted") { + return deleteTicket(connectorId, ticket, dataSourceConfig, loggerArgs); + } else if (ticket.status === "solved") { + const comments = await zendeskApiClient.tickets.getComments(ticket.id); + const { result: users } = await zendeskApiClient.users.showMany( + comments.map((c) => c.author_id) + ); + return syncTicket({ + connectorId, + ticket, + brandId, + users, + comments, + dataSourceConfig, + currentSyncDateMs, + loggerArgs, + forceResync: false, + }); + } + }, + { concurrency: 10 } + ); + return { hasMore: !end_of_stream, afterCursor: after_cursor }; +} + /** * Deletes all the data stored in the db and in the data source relative to a brand (category, articles and tickets). */ diff --git a/connectors/src/connectors/zendesk/temporal/workflows.ts b/connectors/src/connectors/zendesk/temporal/workflows.ts index e9dfffe8ae56..28a028bf6ee6 100644 --- a/connectors/src/connectors/zendesk/temporal/workflows.ts +++ b/connectors/src/connectors/zendesk/temporal/workflows.ts @@ -20,6 +20,7 @@ const { syncZendeskCategoryActivity, syncZendeskArticleBatchActivity, syncZendeskTicketBatchActivity, + syncZendeskTicketUpdateBatchActivity, } = proxyActivities({ startToCloseTimeout: "5 minutes", }); @@ -29,7 +30,8 @@ const { checkZendeskTicketsPermissionsActivity, saveZendeskConnectorStartSync, saveZendeskConnectorSuccessSync, - getAllZendeskBrandsIdsActivity, + getZendeskTicketsAllowedBrandIdsActivity, + getZendeskTimestampCursorActivity, } = proxyActivities({ startToCloseTimeout: "1 minute", }); @@ -96,6 +98,14 @@ export async function zendeskSyncWorkflow({ } ); + const { + workflowId, + searchAttributes: parentSearchAttributes, + memo, + } = workflowInfo(); + + const currentSyncDateMs = new Date().getTime(); + // If we got no signal, then we're on the scheduled execution if ( brandIds.size === 0 && @@ -103,18 +113,14 @@ export async function zendeskSyncWorkflow({ brandTicketsIds.size === 0 && categoryIds.size === 0 ) { - const allBrandIds = await getAllZendeskBrandsIdsActivity({ connectorId }); - allBrandIds.forEach((brandId) => brandIds.add(brandId)); + await executeChild(zendeskIncrementalSyncWorkflow, { + workflowId: `${workflowId}-incremental`, + searchAttributes: parentSearchAttributes, + args: [{ connectorId, currentSyncDateMs }], + memo, + }); } - const { - workflowId, - searchAttributes: parentSearchAttributes, - memo, - } = workflowInfo(); - - const currentSyncDateMs = new Date().getTime(); - // Async operations allow Temporal's event loop to process signals. // If a signal arrives during an async operation, it will update the set before the next iteration. while (brandIds.size > 0) { @@ -133,6 +139,8 @@ export async function zendeskSyncWorkflow({ memo, }); brandIds.delete(brandId); + brandHelpCenterIds.delete(brandId); + brandTicketsIds.delete(brandId); } } while (brandHelpCenterIds.size > 0) { @@ -197,7 +205,40 @@ export async function zendeskSyncWorkflow({ // run cleanup here if needed - await saveZendeskConnectorSuccessSync({ connectorId }); + await saveZendeskConnectorSuccessSync({ connectorId, currentSyncDateMs }); +} + +/** + * Syncs the tickets updated since the last scheduled execution. + */ +export async function zendeskIncrementalSyncWorkflow({ + connectorId, + currentSyncDateMs, +}: { + connectorId: ModelId; + currentSyncDateMs: number; +}) { + const [cursor, brandIds] = await Promise.all([ + getZendeskTimestampCursorActivity(connectorId), + getZendeskTicketsAllowedBrandIdsActivity(connectorId), + ]); + + const startTimeMs = cursor + ? new Date(cursor).getTime() // recasting the date since error may occur during Temporal's serialization + : currentSyncDateMs - 1000 * 60 * 5; // 5 min ago, previous scheduled execution + const startTime = Math.floor(startTimeMs / 1000); + + for (const brandId of brandIds) { + await runZendeskActivityWithPagination((cursor) => + syncZendeskTicketUpdateBatchActivity({ + connectorId, + startTime, + brandId, + currentSyncDateMs, + cursor, + }) + ); + } } /** diff --git a/connectors/src/lib/models/zendesk.ts b/connectors/src/lib/models/zendesk.ts index dfe3b9eba6b3..ab0cc781af5e 100644 --- a/connectors/src/lib/models/zendesk.ts +++ b/connectors/src/lib/models/zendesk.ts @@ -9,6 +9,54 @@ import { DataTypes, Model } from "sequelize"; import { sequelizeConnection } from "@connectors/resources/storage"; import { ConnectorModel } from "@connectors/resources/storage/models/connector_model"; +export class ZendeskTimestampCursors extends Model< + InferAttributes, + InferCreationAttributes +> { + declare id: CreationOptional; + declare createdAt: CreationOptional; + declare updatedAt: CreationOptional; + + declare timestampCursor: Date | null; // start date of the last successful sync, null if never successfully synced + + declare connectorId: ForeignKey; +} + +ZendeskTimestampCursors.init( + { + id: { + type: DataTypes.INTEGER, + autoIncrement: true, + primaryKey: true, + }, + createdAt: { + type: DataTypes.DATE, + allowNull: false, + defaultValue: DataTypes.NOW, + }, + updatedAt: { + type: DataTypes.DATE, + allowNull: false, + defaultValue: DataTypes.NOW, + }, + timestampCursor: { + type: DataTypes.DATE, + allowNull: true, + defaultValue: null, + }, + }, + { + sequelize: sequelizeConnection, + modelName: "zendesk_timestamp_cursors", + indexes: [{ fields: ["connectorId"], unique: true }], + } +); +ConnectorModel.hasMany(ZendeskTimestampCursors, { + foreignKey: { allowNull: false }, + onDelete: "RESTRICT", +}); +ZendeskTimestampCursors.belongsTo(ConnectorModel); + export class ZendeskConfiguration extends Model< InferAttributes, InferCreationAttributes @@ -204,7 +252,7 @@ ZendeskCategory.init( allowNull: false, }, description: { - type: DataTypes.STRING, + type: DataTypes.TEXT, allowNull: true, }, url: { @@ -371,7 +419,7 @@ ZendeskTicket.init( allowNull: false, }, subject: { - type: DataTypes.STRING, + type: DataTypes.TEXT, allowNull: false, }, ticketId: { diff --git a/connectors/src/resources/connector/zendesk.ts b/connectors/src/resources/connector/zendesk.ts index 483762d12e82..1b1d6c71d4f6 100644 --- a/connectors/src/resources/connector/zendesk.ts +++ b/connectors/src/resources/connector/zendesk.ts @@ -2,6 +2,7 @@ import type { ModelId } from "@dust-tt/types"; import type { Transaction } from "sequelize"; import type { ZendeskConfiguration } from "@connectors/lib/models/zendesk"; +import { ZendeskTimestampCursors } from "@connectors/lib/models/zendesk"; import type { ConnectorProviderConfigurationType, ConnectorProviderModelResourceMapping, @@ -44,6 +45,10 @@ export class ZendeskConnectorStrategy transaction ); await ZendeskBrandResource.deleteByConnectorId(connector.id, transaction); + await ZendeskTimestampCursors.destroy({ + where: { connectorId: connector.id }, + transaction, + }); await ZendeskConfigurationResource.deleteByConnectorId( connector.id, transaction diff --git a/connectors/src/resources/zendesk_resources.ts b/connectors/src/resources/zendesk_resources.ts index 075051639a93..1b2379823e74 100644 --- a/connectors/src/resources/zendesk_resources.ts +++ b/connectors/src/resources/zendesk_resources.ts @@ -263,6 +263,18 @@ export class ZendeskBrandResource extends BaseResource { return brands.map((brand) => new this(this.model, brand.get())); } + static async fetchTicketsAllowedBrandIds({ + connectorId, + }: { + connectorId: number; + }): Promise { + const brands = await ZendeskBrand.findAll({ + where: { connectorId, ticketsPermission: "read" }, + attributes: ["brandId"], + }); + return brands.map((brand) => brand.get().brandId); + } + static async deleteByConnectorId( connectorId: number, transaction: Transaction @@ -650,6 +662,16 @@ export class ZendeskTicketResource extends BaseResource { return tickets.map((ticket) => new this(this.model, ticket.get())); } + static async deleteByTicketId({ + connectorId, + ticketId, + }: { + connectorId: number; + ticketId: number; + }): Promise { + await ZendeskTicket.destroy({ where: { connectorId, ticketId } }); + } + static async deleteByBrandId({ connectorId, brandId,