This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

RIP pubsub (almost) (#91)
* Add a retry "decorator" for retrying on errors with an exponential backoff

Don't retry during feature tests

* Remove pubsub specific files

* Move cloud tasks functions out of folder

* Remove unnecessary recipe repo logic

* Remove pubsub tests

* Rename cloud tasks tests folder

* Bring back pubsub logic to dlx

* Move cloud tasks utils to utils folder

* Remove old logger

* Remove unused packages

* Bump version

* Format
MattiasOlla authored Jul 3, 2024
1 parent 7c9f6f7 commit 44cedc7
Showing 51 changed files with 426 additions and 4,253 deletions.
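
The retry "decorator" from the first bullet of the commit message is not among the diffs reproduced below (only a handful of the 51 changed files appear here). As a rough illustration of the described behaviour, a wrapper like the following could retry a failing async function with exponential backoff and skip retries during feature tests; the function name, the config.disableRetries flag, and the delay values are assumptions, not code from this commit.

import config from "exp-config";
import { logger } from "lu-logger";

// Hypothetical sketch: retry an async function on error with exponential backoff.
// config.disableRetries is an assumed flag for turning retries off in feature tests.
export function withRetries(fn, { maxAttempts = 5, baseDelayMs = 200 } = {}) {
  return async function retrying(...args) {
    for (let attempt = 1; ; attempt++) {
      try {
        return await fn(...args);
      } catch (err) {
        if (config.disableRetries || attempt >= maxAttempts) throw err;
        const delay = baseDelayMs * 2 ** (attempt - 1); // 200, 400, 800, ... ms
        logger.warning(`Attempt ${attempt} failed (${err.message}), retrying in ${delay} ms`);
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  };
}
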
20 changes: 9 additions & 11 deletions index.js
@@ -2,13 +2,11 @@ import express from "express";
import expressPromiseRouter from "express-promise-router";
import config from "exp-config";
import assert from "assert";

import "express-async-errors";
import { init } from "./lib/recipe-repo.js";
import buildLogger from "./lib/logger.js";
import messageHandler from "./lib/message-handler.js";
import { trigger } from "./lib/trigger-handler.js";
import cloudTasksRouter from "./lib/cloud-tasks/router.js";
import { logger } from "lu-logger";

import { validate } from "./lib/recipe-repo.js";
import cloudTasksRouter from "./lib/router.js";

export { default as buildContext } from "./lib/context.js";

@@ -26,7 +24,7 @@ export function start({ recipes, triggers, startServer = true }) {
// use PubSubs message size limit
app.use(express.json({ limit: "32mb" }));

const recipeMap = init(recipes, triggers);
validate(recipes, triggers);

router.use((req, _, next) => {
// middleware to handle requests via a proxy
@@ -40,9 +38,9 @@ export function start({ recipes, triggers, startServer = true }) {
res.send("Im alive - som fan!");
});

router.post("/message", messageHandler.bind(messageHandler, recipeMap));
router.post("/trigger/:namespace/:sequence", trigger.bind(trigger, recipeMap));
router.post("/trigger/:name", trigger.bind(trigger, recipeMap));
router.get("/_status", (req, res) => {
res.send({ status: "ok" });
});

app.use(router);
app.use("/v2", cloudTasksRouter(recipes, triggers));
@@ -51,7 +49,7 @@ export function start({ recipes, triggers, startServer = true }) {
if (startServer) {
const port = process.env.PORT || 8080;
app.listen(port, () => {
buildLogger().info(`${config.appName}: listening on port ${port}, env ${config.envName}`);
logger.info(`${config.appName}: listening on port ${port}, env ${config.envName}`);
});
}
/* c8 ignore stop */
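
For orientation, the exported start function above validates the recipes, mounts the legacy routes plus the new /v2 cloud-tasks router, and starts listening unless startServer is false. A minimal, hedged usage sketch follows; the empty recipes and triggers are placeholders, and their real shape is defined elsewhere in the repository.

import { start } from "./index.js"; // consumers would import the published package instead

const recipes = []; // sequence/recipe definitions go here
const triggers = {}; // trigger handlers go here

// Starts the Express app on process.env.PORT or 8080.
start({ recipes, triggers, startServer: true });
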
18 changes: 0 additions & 18 deletions lib/cloud-tasks/dlx.js

This file was deleted.

3 changes: 1 addition & 2 deletions lib/context.js
@@ -1,12 +1,11 @@
import assert from "assert";
import { logger } from "lu-logger";

import httpClient from "./http-client.js";
import buildLogger from "./logger.js";
import { findAttribute, findOrReject } from "./find-attributes.js";

export default function buildContext(correlationId, key) {
const http = httpClient(correlationId, key);
const logger = buildLogger(correlationId, key);
return {
correlationId,
retryIf,
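
The same logger migration runs through the whole commit: the per-request buildLogger(correlationId, key) is dropped in favour of the shared logger from lu-logger, with the correlation id passed as call metadata where it is still needed (as the job-storage diffs below show). Schematically, as a sketch rather than code from the commit:

import { logger } from "lu-logger";

// Before (removed): const logger = buildLogger(correlationId, key); logger.info("stored parent");
// After: the shared instance, with correlation data supplied per call.
function logStoredParent(correlationId) {
  logger.info("stored parent", { correlationId });
}
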
27 changes: 27 additions & 0 deletions lib/dlx.js
@@ -0,0 +1,27 @@
import config from "exp-config";
import { PubSub } from "@google-cloud/pubsub";
import { logger } from "lu-logger";

import { filterUndefinedNullValues } from "./utils/sequences.js";

const pubSubClient = new PubSub();

export async function sendToDlx(req, errorMessage) {
const message = errorMessage || req.body?.error?.message;
const attributes = Object.fromEntries(
Object.entries(filterUndefinedNullValues(req.attributes)).map(([ key, value ]) => [ key, value.toString() ])
);

const pubsubClient = pubSubClient.topic(config.deadLetterTopic);

try {
await pubsubClient.publishMessage({
json: { ...req.body, error: { ...req.body?.error, message } },
attributes: { origin: "cloudTasks", appName: config.appName, ...attributes },
});
logger.warning("Sent message to DLX");
} catch (err) {
logger.error(`Error publishing PubSub message: "${err}". Full message: ${JSON.stringify(message)}`);
throw err;
}
}
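
The new lib/dlx.js keeps the only remaining Pub/Sub dependency: messages that cannot be processed are published to the configured dead-letter topic. The call site is not part of the hunks shown here; the following is a hedged sketch of how a cloud-tasks error path might use it, where the function name and response handling are assumptions.

import { sendToDlx } from "./dlx.js";

// Hypothetical error path: give up on a message and park it on the dead-letter
// topic, then respond 200 so Cloud Tasks stops retrying the task.
export async function giveUpAndPark(req, res) {
  await sendToDlx(req, "Max retries reached");
  res.status(200).send();
}
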
File renamed without changes.
5 changes: 1 addition & 4 deletions lib/http.js
@@ -3,14 +3,11 @@ import util from "util";
import config from "exp-config";
import axios from "axios";
import assert from "assert";

import buildLogger from "./logger.js";
import { logger } from "lu-logger";

const auth = new GoogleAuth.GoogleAuth();

async function performRequest(method, params) {
const logger = buildLogger(params.correlationId, params.key);

const { url, audience } = baseOpts(params);
logger.info(`HTTP ${method}, ${url}, params: ${logFriendlyParams(params)}`);

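
performRequest continues below the shown hunk, and its body is not part of this diff. Judging from the GoogleAuth instance and the audience taken from baseOpts, the request is presumably authenticated with a Google ID token, roughly along these lines; this is an assumption about the unshown code, not a copy of it.

import GoogleAuth from "google-auth-library";

const auth = new GoogleAuth.GoogleAuth();

// Fetch a client that attaches an ID token for the target audience, then
// perform the HTTP call with it.
async function idTokenRequest(method, url, audience, data) {
  const client = await auth.getIdTokenClient(audience);
  const response = await client.request({ url, method, data });
  return response.data;
}
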
21 changes: 9 additions & 12 deletions lib/job-storage/firestore-job-storage.js
@@ -1,14 +1,13 @@
import { Firestore } from "@google-cloud/firestore";
import { logger } from "lu-logger";

import { bucketHash, parentPayload, scanForInvalidKeys } from "./utils/job-storage-helper.js";
import buildLogger from "../logger.js";

const db = new Firestore();
db.settings({ ignoreUndefinedProperties: true });

// We store a parent and all child jobs to be started.
async function storeParent(parentCorrelationId, children, message, nextKey) {
const logger = buildLogger(parentCorrelationId, "storeParent");
logger.info(`Storing parent ${parentCorrelationId} with ${children?.length} children`);
scanForInvalidKeys(message);
await db
@@ -19,8 +18,7 @@ async function storeParent(parentCorrelationId, children, message, nextKey) {

// When a child is completed we add the correlation id to the child
// we use a bunch of hash buckets to avoid contention on the parent document
async function completedChild({ correlationId, parentCorrelationId, key }) {
const logger = buildLogger(correlationId, key);
async function completedChild({ correlationId, parentCorrelationId }) {
const bucket = bucketHash(correlationId);
const bucketRef = await db.collection(parentCorrelationId).doc(bucket);

@@ -50,12 +48,12 @@ async function completedChild({ correlationId, parentCorrelationId, key }) {

async function parentIsComplete({ parentCorrelationId, key, siblingCount }) {
const originalCorrelationId = parentCorrelationId.split(":").slice(1).join(":");
const logger = buildLogger(originalCorrelationId, key);
try {
// first we'll check if we're done
const completedJobs = await completedCheck(parentCorrelationId);
logger.info(
`Have currently completed ${completedJobs?.length} of ${siblingCount} jobs for parent ${parentCorrelationId} on ${key}`
`Have currently completed ${completedJobs?.length} of ${siblingCount} jobs for parent ${parentCorrelationId} on ${key}`,
{ correlationId: originalCorrelationId }
);
const allChildrenComplete = completedJobs?.length === Number(siblingCount);

@@ -68,7 +66,7 @@ }
}
return { isLast: allChildrenComplete, parentData, completedJobCount: completedJobs?.length };
} catch (e) {
logger.error(`Parent is complete failed ${e}`);
logger.error(`Parent is complete failed ${e}`, { correlationId: originalCorrelationId });
throw e;
}
}
@@ -87,12 +85,11 @@ async function completedCheck(parentCorrelationId) {

async function removeParent(parentCorrelationId) {
const originalCorrelationId = parentCorrelationId.split(":").slice(1).join(":");
const logger = buildLogger(originalCorrelationId, "removeParent");
try {
// remove the parent document from the processed collection
const parentDoc = await db.collection("processed").doc(parentCorrelationId).get();
if (!parentDoc.exists) {
logger.warn(`Parent ${parentCorrelationId} not found, it has probably been deleted already. Exiting.`);
logger.warn(`Parent ${parentCorrelationId} not found, it has probably been deleted already. Exiting.`, { correlationId: originalCorrelationId });
return false;
}
await parentDoc.ref.delete();
@@ -103,17 +100,17 @@ async function removeParent(parentCorrelationId) {

let snapshot;
while ((snapshot = await query.get()).size > 0) {
logger.info(`Removing ${snapshot.size} children for parent ${parentCorrelationId}`);
logger.info(`Removing ${snapshot.size} children for parent ${parentCorrelationId}`, { correlationId: originalCorrelationId });
const batch = db.batch();
snapshot.docs.forEach((doc) => {
batch.delete(doc.ref);
});
await batch.commit();
}
logger.info(`Removed all children for parent ${parentCorrelationId}`);
logger.info(`Removed all children for parent ${parentCorrelationId}`, { correlationId: originalCorrelationId });
return true;
} catch (e) {
logger.error(`Remove parent failed ${e}`);
logger.error(`Remove parent failed ${e}`, { correlationId: originalCorrelationId });
throw e;
}
}
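
The "hash buckets" comment in completedChild refers to spreading child-completion writes over several documents so that concurrent children do not all contend on a single parent document. bucketHash itself lives in lib/job-storage/utils/job-storage-helper.js and is not shown in this commit; the following is a minimal sketch of the idea, with an assumed bucket count.

// Illustrative only: map a correlation id onto one of a fixed number of bucket
// document ids. The real helper may hash and size its buckets differently.
const NUM_BUCKETS = 10; // assumed

export function bucketHash(correlationId) {
  let hash = 0;
  for (const char of correlationId) {
    hash = (hash * 31 + char.charCodeAt(0)) >>> 0; // simple rolling hash
  }
  return `bucket-${hash % NUM_BUCKETS}`;
}
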
20 changes: 10 additions & 10 deletions lib/job-storage/memory-job-storage.js
@@ -1,23 +1,24 @@
// we try and copy the functionality of firestore here as best we can
import buildLogger from "../logger.js";
import { logger } from "lu-logger";

import { bucketHash, parentPayload, scanForInvalidKeys } from "./utils/job-storage-helper.js";

let db = { processed: {}, idempotencyLocks: {} };

function storeParent(parentCorrelationId, children, message, nextKey) {
const logger = buildLogger(parentCorrelationId, "storeParent");
logger.info(`Storing parent ${parentCorrelationId} with ${children?.length} children`);
scanForInvalidKeys(message);
if (db.processed[parentCorrelationId]) {
const error = new Error(`6 ALREADY_EXISTS: Document already exists: memory/databases/(default)/documents/processed/${parentCorrelationId}`);
const error = new Error(
`6 ALREADY_EXISTS: Document already exists: memory/databases/(default)/documents/processed/${parentCorrelationId}`
);
error.code = "already-exists";
throw error;
}
db.processed[parentCorrelationId] = parentPayload(message, nextKey, children, "memory");
}

function completedChild({ correlationId, parentCorrelationId, key }) {
const logger = buildLogger(correlationId, key);
function completedChild({ correlationId, parentCorrelationId }) {
try {
if (!db[parentCorrelationId]) db[parentCorrelationId] = {};

@@ -39,11 +40,11 @@ function completedChild({ correlationId, parentCorrelationId, key }) {

function parentIsComplete({ parentCorrelationId, key, siblingCount }) {
const originalCorrelationId = parentCorrelationId.split(":").slice(1).join(":");
const logger = buildLogger(originalCorrelationId, key);
try {
const completedJobs = completedCheck(parentCorrelationId);
logger.info(
`Have currently completed ${completedJobs?.length} of ${siblingCount} jobs for parent ${parentCorrelationId} on ${key}`
`Have currently completed ${completedJobs?.length} of ${siblingCount} jobs for parent ${parentCorrelationId} on ${key}`,
{ correlationId: originalCorrelationId }
);
const allChildrenComplete = completedJobs?.length === Number(siblingCount);

@@ -57,7 +58,7 @@ function parentIsComplete({ parentCorrelationId, key, siblingCount }) {
return { isLast: allChildrenComplete, parentData, completedJobCount: completedJobs?.length };
} catch (e) {
/* c8 ignore next 3 */
logger.error(`Parent is complete failed ${e}`);
logger.error(`Parent is complete failed ${e}`, { correlationId: originalCorrelationId });
throw e;
}
}
@@ -77,8 +78,7 @@ function completedCheck(parentCorrelationId) {

function removeParent(parentCorrelationId) {
const originalCorrelationId = parentCorrelationId.split(":").slice(1).join(":");
const logger = buildLogger(originalCorrelationId, "removeParent");
logger.info(`Removing parent ${parentCorrelationId}`);
logger.info(`Removing parent ${parentCorrelationId}`, { correlationId: originalCorrelationId });
// we don't delete anything in the memory store, since we want the data to be available for testing
return true;
}
45 changes: 0 additions & 45 deletions lib/logger.js

This file was deleted.
