Skip to content

Commit

Permalink
[connectors] Rework Zendesk full resync (#8899)
Browse files Browse the repository at this point in the history
* enh: move the launchFullSync logic to a function that can be triggered with forceResync = true (sync) or false (unpause, resume)

* 📝

* 📝

* feat: implement fromTs

* refactor: update stopZendeskWorkflows to take richer types in input

* fix: update the name of the ZendeskTimestampCursor model

* 📝

* 💄 improve the error message on fromTs without a cursor

* fix: delete the cursor when using SYNC without a fromTs

* fix: remove the forceResync when using fromTs
  • Loading branch information
aubin-tchoi authored Nov 26, 2024
1 parent 9f2b401 commit f1fdd03
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 43 deletions.
91 changes: 58 additions & 33 deletions connectors/src/connectors/zendesk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ import {
import { getZendeskSubdomainAndAccessToken } from "@connectors/connectors/zendesk/lib/zendesk_access_token";
import { fetchZendeskCurrentUser } from "@connectors/connectors/zendesk/lib/zendesk_api";
import {
launchZendeskFullSyncWorkflow,
launchZendeskGarbageCollectionWorkflow,
launchZendeskSyncWorkflow,
stopZendeskWorkflows,
} from "@connectors/connectors/zendesk/temporal/client";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { concurrentExecutor } from "@connectors/lib/async_utils";
import { ExternalOAuthTokenError } from "@connectors/lib/error";
import { ZendeskTimestampCursor } from "@connectors/lib/models/zendesk";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import {
Expand Down Expand Up @@ -158,6 +159,9 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
return new Ok(connector.id.toString());
}

/**
* Deletes the connector and all its related resources.
*/
async clean(): Promise<Result<undefined, Error>> {
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
Expand All @@ -176,10 +180,23 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
return result;
}

/**
* Stops all workflows related to the connector (sync and garbage collection).
*/
async stop(): Promise<Result<undefined, Error>> {
return stopZendeskWorkflows(this.connectorId);
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error(
`[Zendesk] Connector not found. ConnectorId: ${connectorId}`
);
}
return stopZendeskWorkflows(connector);
}

/**
* Launches an incremental workflow (sync workflow without signals) and the garbage collection workflow for the connector.
*/
async resume(): Promise<Result<undefined, Error>> {
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
Expand Down Expand Up @@ -216,46 +233,39 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {

/**
* Launches a full re-sync workflow for the connector,
* restarting workflows with the signals necessary to resync every resource selected by the user.
* It sends signals for all the brands and for all the categories whose Help Center is not selected as a whole.
* syncing every resource selected by the user with forceResync = true.
*/
async sync(): Promise<Result<string, Error>> {
async sync({
fromTs,
}: {
fromTs: number | null;
}): Promise<Result<string, Error>> {
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}

// syncing all the brands syncs tickets and whole help centers when selected
const brandIds = await ZendeskBrandResource.fetchAllBrandIds(connectorId);
const noReadHelpCenterBrandIds =
await ZendeskBrandResource.fetchHelpCenterReadForbiddenBrandIds(
connectorId
);
// syncing individual categories syncs for ones where the Help Center is not selected as a whole
const categoryIds = (
await concurrentExecutor(
noReadHelpCenterBrandIds,
async (brandId) => {
const categoryIds =
await ZendeskCategoryResource.fetchReadOnlyCategoryIdsByBrandId({
connectorId,
brandId,
});
return categoryIds.map((categoryId) => ({ categoryId, brandId }));
},
{ concurrency: 10 }
)
).flat();

const result = await launchZendeskSyncWorkflow(connector, {
brandIds,
categoryIds,
forceResync: true,
});
// launching an incremental workflow taking the diff starting from the given timestamp
if (fromTs) {
const cursors = await ZendeskTimestampCursor.findOne({
where: { connectorId },
});
if (!cursors) {
throw new Error(
"[Zendesk] Cannot use fromTs on a connector that has never completed an initial sync."
);
}
await cursors.update({ timestampCursor: new Date(fromTs) });
const result = await launchZendeskSyncWorkflow(connector);
return result.isErr() ? result : new Ok(connector.id.toString());
} else {
await ZendeskTimestampCursor.destroy({ where: { connectorId } });
}

return result.isErr() ? result : new Ok(connector.id.toString());
// launching a full sync workflow otherwise
return launchZendeskFullSyncWorkflow(connector, { forceResync: true });
}

async retrievePermissions({
Expand Down Expand Up @@ -285,6 +295,11 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
}
}

/**
* Updates the permissions stored in db,
* then launches a sync workflow with the signals
* corresponding to the resources that were modified to reflect the changes.
*/
async setPermissions({
permissions,
}: {
Expand Down Expand Up @@ -443,6 +458,9 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
return new Ok(undefined);
}

/**
* Retrieves a batch of content nodes given their internal IDs.
*/
async retrieveBatchContentNodes({
internalIds,
}: {
Expand Down Expand Up @@ -633,6 +651,9 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
throw new Error("Method not implemented.");
}

/**
* Marks the connector as paused in db and stops all workflows.
*/
async pause(): Promise<Result<undefined, Error>> {
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
Expand All @@ -644,13 +665,17 @@ export class ZendeskConnectorManager extends BaseConnectorManager<null> {
return this.stop();
}

/**
* Marks the connector as unpaused in db and restarts the workflows.
*/
async unpause(): Promise<Result<undefined, Error>> {
const { connectorId } = this;
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Zendesk] Connector not found.");
return new Err(new Error("Connector not found"));
}
// reset the cursor here to trigger a full resync
await connector.markAsUnpaused();
return this.resume();
}
Expand Down
60 changes: 50 additions & 10 deletions connectors/src/connectors/zendesk/temporal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ import {
zendeskGarbageCollectionWorkflow,
zendeskSyncWorkflow,
} from "@connectors/connectors/zendesk/temporal/workflows";
import { concurrentExecutor } from "@connectors/lib/async_utils";
import { getTemporalClient } from "@connectors/lib/temporal";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import type { ConnectorResource } from "@connectors/resources/connector_resource";
import {
ZendeskBrandResource,
ZendeskCategoryResource,
} from "@connectors/resources/zendesk_resources";

export function getZendeskSyncWorkflowId(connectorId: ModelId): string {
return `zendesk-sync-${connectorId}`;
Expand Down Expand Up @@ -106,18 +111,13 @@ export async function launchZendeskSyncWorkflow(
}

export async function stopZendeskWorkflows(
connectorId: ModelId
connector: ConnectorResource
): Promise<Result<undefined, Error>> {
const client = await getTemporalClient();
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error(
`[Zendesk] Connector not found. ConnectorId: ${connectorId}`
);
}

const workflowIds = [
getZendeskSyncWorkflowId(connectorId),
getZendeskGarbageCollectionWorkflowId(connectorId),
getZendeskSyncWorkflowId(connector.id),
getZendeskGarbageCollectionWorkflowId(connector.id),
];
for (const workflowId of workflowIds) {
try {
Expand All @@ -141,6 +141,46 @@ export async function stopZendeskWorkflows(
return new Ok(undefined);
}

/**
* Launches a Zendesk workflow that will sync everything that was selected by the user in the UI.
*
* It recreates the signals necessary to resync every resource selected by the user,
* which are all the brands and all the categories whose Help Center is not selected as a whole.
*/
export async function launchZendeskFullSyncWorkflow(
connector: ConnectorResource,
{ forceResync = false }: { forceResync?: boolean } = {}
): Promise<Result<string, Error>> {
const brandIds = await ZendeskBrandResource.fetchAllBrandIds(connector.id);
const noReadHelpCenterBrandIds =
await ZendeskBrandResource.fetchHelpCenterReadForbiddenBrandIds(
connector.id
);
// syncing individual categories syncs for ones where the Help Center is not selected as a whole
const categoryIds = (
await concurrentExecutor(
noReadHelpCenterBrandIds,
async (brandId) => {
const categoryIds =
await ZendeskCategoryResource.fetchReadOnlyCategoryIdsByBrandId({
connectorId: connector.id,
brandId,
});
return categoryIds.map((categoryId) => ({ categoryId, brandId }));
},
{ concurrency: 10 }
)
).flat();

const result = await launchZendeskSyncWorkflow(connector, {
brandIds,
categoryIds,
forceResync,
});

return result.isErr() ? result : new Ok(connector.id.toString());
}

export async function launchZendeskGarbageCollectionWorkflow(
connector: ConnectorResource
): Promise<Result<undefined, Error>> {
Expand Down

0 comments on commit f1fdd03

Please sign in to comment.