Skip to content

Commit

Permalink
More robust connector update (#8489)
Browse files Browse the repository at this point in the history
* Gracefully handle GitHub repo not found

* Throw in `concurrentExecutor`

* 📖

* Add logs on Temporal workflows termination

* Unpause connector on connection update.

* Unpause connectors
  • Loading branch information
flvndvd authored Nov 6, 2024
1 parent 38c1c53 commit 04955b6
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 1 deletion.
5 changes: 5 additions & 0 deletions connectors/src/connectors/confluence/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ export class ConfluenceConnectorManager extends BaseConnectorManager<null> {
newConfluenceCloudInformation.id === currentCloudInformation.cloudId
) {
await connector.update({ connectionId });

// If connector was previously paused, unpause it.
if (connector.isPaused()) {
await this.unpause();
}
} else {
logger.info(
{
Expand Down
5 changes: 5 additions & 0 deletions connectors/src/connectors/github/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ export class GithubConnectorManager extends BaseConnectorManager<null> {

await c.update({ connectionId });

// If connector was previously paused, unpause it.
if (c.isPaused()) {
await this.unpause();
}

await launchGithubFullSyncWorkflow({
connectorId: this.connectorId,
syncCodeOnly: false,
Expand Down
5 changes: 5 additions & 0 deletions connectors/src/connectors/google_drive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ export class GoogleDriveConnectorManager extends BaseConnectorManager<null> {
}

await connector.update({ connectionId });

// If connector was previously paused, unpause it.
if (connector.isPaused()) {
await this.unpause();
}
}

return new Ok(connector.id.toString());
Expand Down
5 changes: 5 additions & 0 deletions connectors/src/connectors/intercom/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ export class IntercomConnectorManager extends BaseConnectorManager<null> {

await connector.update({ connectionId: newConnectionId });

// If connector was previously paused, unpause it.
if (connector.isPaused()) {
await this.unpause();
}

await IntercomWorkspace.update(
{
intercomWorkspaceId: newIntercomWorkspace.id,
Expand Down
8 changes: 7 additions & 1 deletion connectors/src/connectors/microsoft/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,16 @@ export class MicrosoftConnectorManager extends BaseConnectorManager<null> {
{
error: e,
},
`Error checking Microsoft organization - lets update the connector regardless`
"Error checking Microsoft organization - lets update the connector regardless"
);
}

await connector.update({ connectionId });

// If connector was previously paused, unpause it.
if (connector.isPaused()) {
await this.unpause();
}
}

return new Ok(connector.id.toString());
Expand Down
5 changes: 5 additions & 0 deletions connectors/src/connectors/notion/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ export class NotionConnectorManager extends BaseConnectorManager<null> {

await c.update({ connectionId });

// If connector was previously paused, unpause it.
if (c.isPaused()) {
await this.unpause();
}

const dataSourceConfig = dataSourceConfigFromConnector(c);
try {
await launchNotionSyncWorkflow(c.id);
Expand Down
5 changes: 5 additions & 0 deletions connectors/src/connectors/slack/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ export class SlackConnectorManager extends BaseConnectorManager<SlackConfigurati

await c.update(updateParams);

// If connector was previously paused, unpause it.
if (c.isPaused()) {
await this.unpause();
}

return new Ok(c.id.toString());
}

Expand Down
14 changes: 14 additions & 0 deletions connectors/src/lib/temporal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { Client, Connection, WorkflowNotFoundError } from "@temporalio/client";
import { NativeConnection } from "@temporalio/worker";
import fs from "fs-extra";

import logger from "@connectors/logger/logger";

// Assuming one cached workflows takes 2MB on average,
// we can cache 292 workflows in 4096MB, which is the max heap size
// we give to our temporal workers.
Expand Down Expand Up @@ -135,7 +137,19 @@ export async function terminateAllWorkflowsForConnectorId(
query: `ExecutionStatus = 'Running' AND connectorId = ${connectorId}`,
});

logger.info(
{
connectorId,
},
"About to terminate all workflows for connectorId"
);

for await (const handle of workflowInfos) {
logger.info(
{ connectorId, workflowId: handle.workflowId },
"Terminating Temporal workflow"
);

const workflowHandle = client.workflow.getHandle(handle.workflowId);
try {
await workflowHandle.terminate();
Expand Down

0 comments on commit 04955b6

Please sign in to comment.