Skip to content

Commit

Permalink
Flav/start notion connector worker on its own (#3933)
Browse files Browse the repository at this point in the history
* Starts Notion worker on its own

* Lower to 2 replicas

* 👕

* ✨

* Address comments from review
  • Loading branch information
flvndvd authored Feb 26, 2024
1 parent 96346cf commit 35df7ca
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 130 deletions.
179 changes: 71 additions & 108 deletions connectors/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion connectors/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
"tar": "^6.2.0",
"tsconfig-paths-webpack-plugin": "^4.1.0",
"turndown": "^7.1.2",
"uuid": "^9.0.0"
"uuid": "^9.0.0",
"yargs": "^17.7.2"
},
"devDependencies": {
"@types/eslint": "^8.21.3",
Expand All @@ -66,6 +67,7 @@
"@types/node": "^18.15.5",
"@types/tar": "^6.1.10",
"@types/turndown": "^5.0.4",
"@types/yargs": "^17.0.32",
"@typescript-eslint/eslint-plugin": "^5.56.0",
"@typescript-eslint/parser": "^5.56.0",
"eslint": "^8.36.0",
Expand Down
61 changes: 40 additions & 21 deletions connectors/src/start_worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import type { ConnectorProvider } from "@dust-tt/types";
import { setupGlobalErrorHandler } from "@dust-tt/types";
import yargs from "yargs";
import { hideBin } from "yargs/helpers";

import { runConfluenceWorker } from "@connectors/connectors/confluence/temporal/worker";
import { runWebCrawlerWorker } from "@connectors/connectors/webcrawler/temporal/worker";
Expand All @@ -13,24 +16,40 @@ import logger from "./logger/logger";

setupGlobalErrorHandler(logger);

runConfluenceWorker().catch((err) =>
logger.error(errorFromAny(err), "Error running confluence worker")
);
runSlackWorker().catch((err) =>
logger.error(errorFromAny(err), "Error running slack worker")
);
runNotionWorker().catch((err) =>
logger.error(errorFromAny(err), "Error running notion worker")
);
runGithubWorker().catch((err) =>
logger.error(errorFromAny(err), "Error running github worker")
);
runGoogleWorker().catch((err) =>
logger.error(errorFromAny(err), "Error running google worker")
);
runIntercomWorker().catch((err) =>
logger.error(errorFromAny(err), "Error running intercom worker")
);
runWebCrawlerWorker().catch((err) =>
logger.error(errorFromAny(err), "Error running webcrawler worker")
);
const workerFunctions: Record<ConnectorProvider, () => Promise<void>> = {
confluence: runConfluenceWorker,
github: runGithubWorker,
google_drive: runGoogleWorker,
intercom: runIntercomWorker,
notion: runNotionWorker,
slack: runSlackWorker,
webcrawler: runWebCrawlerWorker,
};

const ALL_WORKERS = Object.keys(workerFunctions) as ConnectorProvider[];

async function runWorkers(workers: ConnectorProvider[]) {
for (const worker of workers) {
workerFunctions[worker]().catch((err) =>
logger.error(errorFromAny(err), `Error running ${worker} worker.`)
);
}
}

yargs(hideBin(process.argv))
.option("workers", {
alias: "w",
type: "array",
choices: ALL_WORKERS,
default: ALL_WORKERS,
demandOption: true,
description: "Choose one or multiple workers to run.",
})
.help()
.alias("help", "h")
.parseAsync()
.then(async (args) => runWorkers(args.workers as ConnectorProvider[]))
.catch((err) => {
logger.error(errorFromAny(err), "Error running workers");
process.exit(1);
});
2 changes: 2 additions & 0 deletions k8s/apply_infra.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ kubectl apply -f "$(dirname "$0")/configmaps/front-worker-configmap.yaml"
kubectl apply -f "$(dirname "$0")/configmaps/front-edge-configmap.yaml"
kubectl apply -f "$(dirname "$0")/configmaps/connectors-configmap.yaml"
kubectl apply -f "$(dirname "$0")/configmaps/connectors-worker-configmap.yaml"
kubectl apply -f "$(dirname "$0")/configmaps/connectors-worker-specific-configmap.yaml"
kubectl apply -f "$(dirname "$0")/configmaps/connectors-edge-configmap.yaml"
kubectl apply -f "$(dirname "$0")/configmaps/docs-configmap.yaml"
kubectl apply -f "$(dirname "$0")/configmaps/alerting-temporal-configmap.yaml"
Expand Down Expand Up @@ -94,6 +95,7 @@ apply_deployment front-worker-deployment
apply_deployment front-edge-deployment
apply_deployment connectors-deployment
apply_deployment connectors-worker-deployment
apply_deployment connectors-worker-notion-deployment
apply_deployment connectors-edge-deployment
apply_deployment docs-deployment
apply_deployment metabase-deployment
Expand Down
11 changes: 11 additions & 0 deletions k8s/configmaps/connectors-worker-specific-configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: connectors-worker-specific-config
data:
DD_ENV: "prod"
DD_SERVICE: "connectors-worker"
NODE_OPTIONS: "-r dd-trace/init --max-old-space-size=3276"
DD_LOGS_INJECTION: "true"
DD_RUNTIME_METRICS_ENABLED: "true"
NODE_ENV: "production"
Loading

0 comments on commit 35df7ca

Please sign in to comment.