Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Add optional JOI schema for sequence (#101)
Browse files Browse the repository at this point in the history
* Cleanup: remove unused fields from recipe schema

* Add possibility to set a JOI schema for a sequence

This will validate the `attributes` field in the request body.

* Bump version
  • Loading branch information
MattiasOlla authored Jul 31, 2024
1 parent 6bb2ae8 commit a5d2853
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 16 deletions.
6 changes: 6 additions & 0 deletions lib/error-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ export async function errorHandler(err, req, res, next) {
return res.status(200).send({ type: "unrecoverable", message: err.extraMessage });
}

if (err.validation) {
logger.error(`Validation error: ${err.message}`);
await sendToDlx(req, `Validation error: ${err.message}`);
return res.status(200).send({ type: "validation_error", message: err.message });
}

logger.error(`Unexpected error ${err.message}: ${err.stack}`);

// If the request was not sent from cloud tasks, send it to the DLX so that it can be resent
Expand Down
9 changes: 1 addition & 8 deletions lib/recipe-repo.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,8 @@ const sequenceSchema = joi.object().keys({
.unique((a, b) => Object.keys(a)[0] === Object.keys(b)[0])
.required()
.items(joi.object().length(1)),
executionDelay: joi.alternatives().conditional("namespace", {
is: "sub-sequence",
then: joi
.number()
.min(0)
.max(60 * 60 * 1000),
}),
unrecoverable: joi.array().items(joi.object().length(1)),
useParentCorrelationId: joi.boolean().default(false),
schema: joi.object().schema().optional(),
});

const recipeSchema = joi
Expand Down
13 changes: 8 additions & 5 deletions lib/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { router as middlewareRouter } from "./middleware/index.js";
import { publishTask, publishTasksBulk } from "./publish-task.js";
import resend from "./resend.js";
import { appendData, buildNextKeyMapper, buildUrl, keyToUrl, sequenceIterator } from "./utils/sequences.js";
import validate from "./validator.js";

// Ugly hack to make lu-logger accept the same interface as pino.
// When all workers have been migrated, we'll rewrite them to use lu-logger
Expand All @@ -36,9 +37,9 @@ function buildCloudTaskTriggerRoutes(router, triggers) {
});
}

function buildCloudTaskSequenceRoutes(router, { namespace, name, sequence, unrecoverable }, nextKeyMapper) {
function buildCloudTaskSequenceRoutes(router, { namespace, name, sequence, unrecoverable, schema }, nextKeyMapper) {
for (const { key, func } of sequenceIterator(sequence)) {
router.post(buildUrl(namespace, name, key), messageHandler(func, nextKeyMapper(`${namespace}.${name}.${key}`)));
router.post(buildUrl(namespace, name, key), messageHandler(func, nextKeyMapper(`${namespace}.${name}.${key}`), schema));
}
// Allow to start a sequence/sub-sequence by posting to the sequence name
router.post(`/${namespace}/${name}`, startSequence(sequenceIterator(sequence).next().value));
Expand All @@ -62,8 +63,10 @@ function startSequence({ key: firstKeyInSequence, queue }) {
};
}

function messageHandler(func, { nextKey, queue } = {}) {
function messageHandler(func, { nextKey, queue } = {}, schema) {
return async (req, res) => {
const body = schema ? validate(req.body, schema) : req.body;

const {
key,
correlationId,
Expand All @@ -75,9 +78,9 @@ function messageHandler(func, { nextKey, queue } = {}) {
} = req.attributes;
const context = { ...buildContext(correlationId, key), logger }; // Overwrite the logger with the one from lu-logger;

const result = await func(req.body, context);
const result = await func(body, context);

const nextBody = appendData(req.body, result);
const nextBody = appendData(body, result);

if (result?.type === "trigger") {
const triggerResponse = await handleTriggerResult(result, nextBody, { nextKey, queue }, req.attributes);
Expand Down
19 changes: 19 additions & 0 deletions lib/validator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import Joi from "joi";

export default function validate(body, schema) {
const fullSchema = Joi.object({
type: Joi.string().required(),
id: Joi.string().required(),
attributes: schema,
data: Joi.array().optional(),
}).unknown(true);

const { error, value } = fullSchema.validate(body, { abortEarly: false });
if (error) {
const details = error.details.map((e) => e.message).join(", ");
const err = new Error(details);
err.validation = true;
throw err;
}
return value;
}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@bonniernews/b0rker",
"version": "10.2.1",
"version": "10.3.0",
"engines": {
"node": ">=18"
},
Expand Down
153 changes: 153 additions & 0 deletions test/feature/validation-feature.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import { fakeCloudTasks, fakePubSub } from "@bonniernews/lu-test";
import Joi from "joi";

import { route, start } from "../../index.js";

const schema = Joi.object({ foo: Joi.string().required() });

Feature("Sequence with validation", () => {
afterEachScenario(() => {
fakeCloudTasks.reset();
fakePubSub.reset();
});

Scenario("Validation succeeds", () => {
let broker;
Given("broker is initiated with a recipe with a schema", () => {
broker = start({
startServer: false,
recipes: [
{
namespace: "sequence",
name: "test-order",
sequence: [
route(".perform.step-1", () => {
return { type: "step-1", id: "step-1-was-here" };
}),
],
schema,
},
],
});
});

const triggerMessage = {
type: "test-order",
id: "some-order-id",
attributes: { foo: "bar" },
};

let response;
When("a trigger message is received", async () => {
response = await fakeCloudTasks.runSequence(broker, "/v2/sequence/test-order", triggerMessage);
});

Then("the status code should be 201 Created", () => {
response.firstResponse.statusCode.should.eql(201, response.text);
});

And("the sequence should be processed", () => {
response.messages
.map(({ url }) => url)
.should.eql([ "/v2/sequence/test-order/perform.step-1", "/v2/sequence/test-order/processed" ]);
});
});

Scenario("Order is missing type and id", () => {
let broker;
Given("broker is initiated with a recipe with a schema", () => {
broker = start({
startServer: false,
recipes: [
{
namespace: "sequence",
name: "test-order",
sequence: [
route(".perform.step-1", () => {
return { type: "step-1", id: "step-1-was-here" };
}),
],
schema,
},
],
});
});

And("we can publish pubsub messages", () => {
fakePubSub.enablePublish(broker);
});

const triggerMessage = { attributes: { foo: "bar" } };

let response;
When("a trigger message is received", async () => {
response = await fakeCloudTasks.runSequence(broker, "/v2/sequence/test-order", triggerMessage, { "correlation-id": "some-epic-id" });
});

Then("the status code should be 201 Created", () => {
response.firstResponse.statusCode.should.eql(201, response.text);
});

And("the message should have been sent to the DLX", () => {
fakePubSub.recordedMessages().length.should.eql(1);
fakePubSub.recordedMessages()[0].message.error.message.should.eql('Validation error: "type" is required, "id" is required');
});

And("the sequence should not be processed", () => {
response.messages
.map(({ url }) => url)
.should.eql([ "/v2/sequence/test-order/perform.step-1" ]);
});
});

Scenario("Attribute validation fails", () => {
let broker;
Given("broker is initiated with a recipe with a schema", () => {
broker = start({
startServer: false,
recipes: [
{
namespace: "sequence",
name: "test-order",
sequence: [
route(".perform.step-1", () => {
return { type: "step-1", id: "step-1-was-here" };
}),
],
schema,
},
],
});
});

And("we can publish pubsub messages", () => {
fakePubSub.enablePublish(broker);
});

const triggerMessage = {
type: "test-order",
id: "some-order-id",
attributes: { foo: 42, iShouldNotBeHere: "nope" },
};

let response;
When("a trigger message is received", async () => {
response = await fakeCloudTasks.runSequence(broker, "/v2/sequence/test-order", triggerMessage, { "correlation-id": "some-epic-id" });
});

Then("the status code should be 201 Created", () => {
response.firstResponse.statusCode.should.eql(201, response.text);
});

And("the message should have been sent to the DLX", () => {
fakePubSub.recordedMessages().length.should.eql(1);
fakePubSub.recordedMessages()[0].message.error.message.should.eql('Validation error: "attributes.foo" must be a string, "attributes.iShouldNotBeHere" is not allowed');
});

And("the sequence should not be processed", () => {
response.messages
.map(({ url }) => url)
.should.eql([ "/v2/sequence/test-order/perform.step-1" ]);
});
});
});

0 comments on commit a5d2853

Please sign in to comment.