Skip to content

Commit

Permalink
[connectors] Implement incremental sync for the Zendesk articles (#8688)
Browse files Browse the repository at this point in the history
* feat: add a method that fetches the recently updated articles

* feat: add activities to sync the recently updated articles

* feat: add the article diff sync to the incremental sync workflow

* refactor: rename syncZendeskRecentlyUpdatedArticleBatchActivity into a shorter
word

* fix: pass the startTime in the incremental endpoint in seconds instead of ms

* fix: implement time-based pagination (cursor-based pagination is not supported for incremental articles)

* refactor: simplify the naming

* refactor: prevent StartTimeTooRecent errors

* docs: add small note

* bump the workflow version

* feat: add functions to delete articles

* refactor: remove unused activity

* fix: fix how we handle StartTimeTooRecent errors

* fix: delete articles/tickets from data sources before deleting in pg
  • Loading branch information
aubin-tchoi authored Nov 18, 2024
1 parent 387f236 commit fe5e87c
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 24 deletions.
29 changes: 29 additions & 0 deletions connectors/src/connectors/zendesk/lib/sync_article.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
} from "@connectors/@types/node-zendesk";
import { getArticleInternalId } from "@connectors/connectors/zendesk/lib/id_conversions";
import {
deleteFromDataSource,
renderDocumentTitleAndContent,
renderMarkdownSection,
upsertToDatasource,
Expand All @@ -19,6 +20,34 @@ import type { DataSourceConfig } from "@connectors/types/data_source_config";

const turndownService = new TurndownService();

/**
 * Removes a Zendesk article from the data sources, then from the connectors' db.
 *
 * The data source deletion is awaited before the db row is destroyed
 * (NOTE(review): presumably so that a failed data source deletion leaves the db entry
 * in place for a later retry — confirm).
 *
 * @param connectorId ID of the connector that owns the article.
 * @param article The article as fetched from Zendesk; only its `id` and `name` are read.
 * @param dataSourceConfig Configuration of the data source to delete the document from.
 * @param loggerArgs Extra fields merged into the log entry.
 */
export async function deleteArticle(
  connectorId: ModelId,
  article: ZendeskFetchedArticle,
  dataSourceConfig: DataSourceConfig,
  loggerArgs: Record<string, string | number | null>
): Promise<void> {
  const { id: articleId, name } = article;

  const logPayload = { ...loggerArgs, connectorId, articleId, name };
  logger.info(logPayload, "[Zendesk] Deleting article.");

  const documentId = getArticleInternalId(connectorId, articleId);
  await deleteFromDataSource(dataSourceConfig, documentId);

  await ZendeskArticleResource.deleteByArticleId({ connectorId, articleId });
}

/**
* Syncs an article from Zendesk to the postgres db and to the data sources.
*/
Expand Down
18 changes: 8 additions & 10 deletions connectors/src/connectors/zendesk/lib/sync_ticket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@ export async function deleteTicket(
},
"[Zendesk] Deleting ticket."
);
await Promise.all([
ZendeskTicketResource.deleteByTicketId({
connectorId,
ticketId: ticket.id,
}),
deleteFromDataSource(
dataSourceConfig,
getTicketInternalId(connectorId, ticket.id)
),
]);
await deleteFromDataSource(
dataSourceConfig,
getTicketInternalId(connectorId, ticket.id)
);
await ZendeskTicketResource.deleteByTicketId({
connectorId,
ticketId: ticket.id,
});
}

/**
Expand Down
30 changes: 30 additions & 0 deletions connectors/src/connectors/zendesk/lib/zendesk_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,36 @@ async function fetchFromZendeskWithRetries({
return response;
}

/**
 * Fetches one batch of recently updated articles through Zendesk's incremental
 * Help Center endpoint.
 *
 * @param subdomain Zendesk subdomain to query.
 * @param accessToken Token forwarded to the request helper.
 * @param startTime Lower bound of the export, in Unix epoch time (seconds).
 * @returns The response of the endpoint; when the request helper yields a falsy response,
 * an empty batch whose `end_time` is `startTime` and whose `next_page` is null.
 */
export async function fetchRecentlyUpdatedArticles({
  subdomain,
  accessToken,
  startTime, // start time in Unix epoch time, in seconds
}: {
  subdomain: string;
  accessToken: string;
  startTime: number;
}): Promise<{
  articles: ZendeskFetchedArticle[];
  next_page: string | null;
  end_time: number;
}> {
  const url = `https://${subdomain}.zendesk.com/api/v2/help_center/incremental/articles.json?start_time=${startTime}`;

  // this endpoint retrieves changes in content despite what is mentioned in the documentation.
  const response = await fetchFromZendeskWithRetries({ url, accessToken });
  if (response) {
    return response;
  }

  // No usable response: report an empty batch that does not move the cursor.
  return { articles: [], next_page: null, end_time: startTime };
}

/**
* Fetches a batch of articles in a category from the Zendesk API.
*/
Expand Down
100 changes: 89 additions & 11 deletions connectors/src/connectors/zendesk/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { getZendeskSubdomainAndAccessToken } from "@connectors/connectors/zendes
import {
changeZendeskClientSubdomain,
createZendeskClient,
fetchRecentlyUpdatedArticles,
fetchRecentlyUpdatedTickets,
fetchSolvedZendeskTicketsInBrand,
fetchZendeskArticlesInCategory,
Expand Down Expand Up @@ -221,17 +222,6 @@ export async function checkZendeskTicketsPermissionsActivity({
return brandInDb.ticketsPermission === "read";
}

/**
 * Activity returning the ID of every brand stored in db for the given connector.
 * Delegates to `ZendeskBrandResource.fetchAllBrandIds`.
 *
 * @param connectorId ID of the connector whose brands are listed.
 * @returns The brand IDs.
 */
export async function getAllZendeskBrandsIdsActivity({
  connectorId,
}: {
  connectorId: ModelId;
}): Promise<number[]> {
  const brandIds = await ZendeskBrandResource.fetchAllBrandIds({ connectorId });
  return brandIds;
}

/**
* Retrieves the timestamp cursor, which is the start date of the last successful sync.
*/
Expand All @@ -254,6 +244,17 @@ export async function getZendeskTimestampCursorActivity(
: new Date(minAgo);
}

/**
 * Activity returning the IDs of the brands stored in db whose Help Center has the
 * read permission. Delegates to `ZendeskBrandResource.fetchHelpCenterReadAllowedBrandIds`.
 *
 * @param connectorId ID of the connector whose brands are listed.
 * @returns The IDs of the matching brands.
 */
export async function getZendeskHelpCenterReadAllowedBrandIdsActivity(
  connectorId: ModelId
): Promise<number[]> {
  const brandIds =
    await ZendeskBrandResource.fetchHelpCenterReadAllowedBrandIds({
      connectorId,
    });
  return brandIds;
}

/**
* Retrieves the IDs of every brand stored in db that has read permissions on their Tickets.
*/
Expand Down Expand Up @@ -525,6 +526,83 @@ export async function syncZendeskTicketBatchActivity({
/**
 * This activity is responsible for syncing the next batch of recently updated articles to process.
 * It is based on the incremental endpoint, which returns a diff.
 *
 * An article is synced only if its section has a category that is stored in db with the
 * "read" permission; every other article of the batch is skipped.
 *
 * @param connectorId ID of the connector being synced.
 * @param brandId ID of the brand whose Help Center is queried.
 * @param currentSyncDateMs Timestamp of the current sync, forwarded to `syncArticle`.
 * @param startTime Start of the batch to fetch, in Unix epoch time (seconds).
 * @returns The next start time (in seconds) if there is any more data to fetch, null otherwise.
 * @throws If the connector cannot be found.
 */
export async function syncZendeskArticleUpdateBatchActivity({
  connectorId,
  brandId,
  currentSyncDateMs,
  startTime,
}: {
  connectorId: ModelId;
  brandId: number;
  currentSyncDateMs: number;
  startTime: number;
}): Promise<number | null> {
  const connector = await ConnectorResource.fetchById(connectorId);
  if (!connector) {
    throw new Error("[Zendesk] Connector not found.");
  }
  const dataSourceConfig = dataSourceConfigFromConnector(connector);
  const loggerArgs = {
    workspaceId: dataSourceConfig.workspaceId,
    connectorId,
    provider: "zendesk",
    dataSourceId: dataSourceConfig.dataSourceId,
  };

  const { accessToken, subdomain } = await getZendeskSubdomainAndAccessToken(
    connector.connectionId
  );
  const zendeskApiClient = createZendeskClient({ accessToken, subdomain });
  const brandSubdomain = await changeZendeskClientSubdomain(zendeskApiClient, {
    connectorId,
    brandId,
  });

  const { articles, end_time, next_page } = await fetchRecentlyUpdatedArticles({
    subdomain: brandSubdomain,
    accessToken,
    startTime,
  });

  await concurrentExecutor(
    articles,
    async (article) => {
      const { result: section } =
        await zendeskApiClient.helpcenter.sections.show(article.section_id);
      if (!section.category_id) {
        return;
      }

      const category = await ZendeskCategoryResource.fetchByCategoryId({
        connectorId,
        categoryId: section.category_id,
      });
      if (!category || category.permission !== "read") {
        return;
      }

      // The author is only needed for articles that are actually synced, so it is fetched
      // after the permission check to avoid a useless API call for skipped articles.
      const { result: user } = await zendeskApiClient.users.show(
        article.author_id
      );

      return syncArticle({
        connectorId,
        category,
        article,
        section,
        user,
        dataSourceConfig,
        currentSyncDateMs,
        loggerArgs,
        forceResync: false,
      });
    },
    { concurrency: 10 }
  );

  // Pagination is time-based: the next batch starts at `end_time`.
  // If the cursor did not move forward, requesting it again would return the same batch,
  // so we stop here instead of letting the caller loop forever.
  const hasMore = next_page !== null && end_time > startTime;
  return hasMore ? end_time : null;
}

/**
* This activity is responsible for syncing the next batch of recently updated tickets to process.
* It is based on the incremental endpoint, which returns a diff.
*/
export async function syncZendeskTicketUpdateBatchActivity({
connectorId,
Expand Down
2 changes: 1 addition & 1 deletion connectors/src/connectors/zendesk/temporal/config.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export const WORKFLOW_VERSION = 3;
export const WORKFLOW_VERSION = 4;
export const QUEUE_NAME = `zendesk-queue-v${WORKFLOW_VERSION}`;
19 changes: 17 additions & 2 deletions connectors/src/connectors/zendesk/temporal/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ const {
syncZendeskArticleBatchActivity,
syncZendeskTicketBatchActivity,
syncZendeskTicketUpdateBatchActivity,
syncZendeskArticleUpdateBatchActivity,
} = proxyActivities<typeof activities>({
startToCloseTimeout: "5 minutes",
});

const {
checkZendeskHelpCenterPermissionsActivity,
checkZendeskTicketsPermissionsActivity,
getZendeskHelpCenterReadAllowedBrandIdsActivity,
saveZendeskConnectorStartSync,
saveZendeskConnectorSuccessSync,
getZendeskTicketsAllowedBrandIdsActivity,
Expand Down Expand Up @@ -218,17 +220,30 @@ export async function zendeskIncrementalSyncWorkflow({
connectorId: ModelId;
currentSyncDateMs: number;
}) {
const [cursor, brandIds] = await Promise.all([
const [cursor, ticketBrandIds, helpCenterBrandIds] = await Promise.all([
getZendeskTimestampCursorActivity(connectorId),
getZendeskTicketsAllowedBrandIdsActivity(connectorId),
getZendeskHelpCenterReadAllowedBrandIdsActivity(connectorId),
]);

const startTimeMs = cursor
? new Date(cursor).getTime() // recasting the date since error may occur during Temporal's serialization
: currentSyncDateMs - 1000 * 60 * 5; // 5 min ago, previous scheduled execution
const startTime = Math.floor(startTimeMs / 1000);

for (const brandId of brandIds) {
for (const brandId of helpCenterBrandIds) {
let articleSyncStartTime: number | null = startTime;
while (articleSyncStartTime !== null) {
articleSyncStartTime = await syncZendeskArticleUpdateBatchActivity({
connectorId,
brandId,
currentSyncDateMs,
startTime: articleSyncStartTime,
});
}
}

for (const brandId of ticketBrandIds) {
await runZendeskActivityWithPagination((cursor) =>
syncZendeskTicketUpdateBatchActivity({
connectorId,
Expand Down
21 changes: 21 additions & 0 deletions connectors/src/resources/zendesk_resources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,18 @@ export class ZendeskBrandResource extends BaseResource<ZendeskBrand> {
return brands.map((brand) => brand.get().brandId);
}

/**
 * Lists the IDs of the brands of a connector whose `helpCenterPermission` is "read".
 * Only the `brandId` attribute is selected.
 */
static async fetchHelpCenterReadAllowedBrandIds({
  connectorId,
}: {
  connectorId: number;
}): Promise<number[]> {
  const readAllowedBrands = await ZendeskBrand.findAll({
    where: { connectorId, helpCenterPermission: "read" },
    attributes: ["brandId"],
  });

  const brandIds: number[] = [];
  for (const brand of readAllowedBrands) {
    brandIds.push(brand.get().brandId);
  }
  return brandIds;
}

static async fetchAllWithHelpCenter({
connectorId,
}: {
Expand Down Expand Up @@ -856,6 +868,15 @@ export class ZendeskArticleResource extends BaseResource<ZendeskArticle> {
return articles.map((article) => new this(this.model, article.get()));
}

/**
 * Destroys the rows matching the given connector and article IDs.
 */
static async deleteByArticleId({
  connectorId,
  articleId,
}: {
  connectorId: number;
  articleId: number;
}) {
  const where = { connectorId, articleId };
  await ZendeskArticle.destroy({ where });
}
static async deleteByCategoryId({
connectorId,
categoryId,
Expand Down

0 comments on commit fe5e87c

Please sign in to comment.