Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom auth in failure function #28

Merged
merged 25 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
484cefc
feat: add context.cancel
CahidArda Oct 23, 2024
00288e2
feat: add lazy fetch and rm context.rawInitialPayload
CahidArda Oct 24, 2024
9b94610
fix: context.cancel
CahidArda Oct 24, 2024
0ef8478
feat: rename errors
CahidArda Oct 24, 2024
30e5315
fix: add lazyFetch for initial payload and context.call
CahidArda Oct 31, 2024
14dce13
fix: add lazyFetch for parallel
CahidArda Oct 31, 2024
6db31ce
fix: docstings
CahidArda Nov 1, 2024
2630533
Merge branch 'main' into DX-1279-context-cancel
CahidArda Nov 1, 2024
87b6eb8
fix: tests
CahidArda Nov 1, 2024
2857350
Merge branch 'main' into DX-1279-context-cancel
CahidArda Nov 18, 2024
d78b08c
feat: add initialBody flag and large-payload integration tests
CahidArda Nov 18, 2024
c9c344b
fix: tests
CahidArda Nov 18, 2024
6a6c038
fix: increase test durations
CahidArda Nov 18, 2024
65015c6
fix: track counter in steps
CahidArda Nov 18, 2024
3e4bc70
ci: increase test durations
CahidArda Nov 18, 2024
b0197fb
fix: rename parsePayload as processRawSteps and change params
CahidArda Nov 19, 2024
ab7f459
ci: disable large-payload tests
CahidArda Nov 19, 2024
c91cf2b
Merge branch 'main' into DX-1279-context-cancel
CahidArda Nov 19, 2024
3b7e1a2
feat: run auth in failureFunction
CahidArda Nov 19, 2024
0955d53
fix: handle 0 retries and empty body in context.call
CahidArda Nov 19, 2024
2e20d5a
fix: increase test timeouts
CahidArda Nov 19, 2024
6875612
ci: add redis.fail and route for testing auth check in handleFailure
CahidArda Nov 19, 2024
ee69a65
Merge branch 'main' into DX-1393-auth-in-failure-function
CahidArda Nov 20, 2024
2467f53
fix: review
CahidArda Nov 21, 2024
16b47d3
ci: fix import
CahidArda Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/ci/app/ci/ci.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe("workflow integration tests", () => {
await initiateTest(testConfig.route, testConfig.waitForSeconds)
},
{
timeout: (testConfig.waitForSeconds + 10) * 1000
timeout: (testConfig.waitForSeconds + 15) * 1000
}
)
});
Expand Down
9 changes: 7 additions & 2 deletions examples/ci/app/ci/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ export const TEST_ROUTES: Pick<TestConfig, "route" | "waitForSeconds">[] = [
},
{
// checks auth
route: "auth",
route: "auth/success",
waitForSeconds: 1
},
{
// checks auth failing
route: "auth-fail",
route: "auth/fail",
waitForSeconds: 0
},
{
// checks custom auth
route: "auth/custom/workflow",
waitForSeconds: 5
},
{
// checks context.call (sucess and fail case)
route: "call/workflow",
Expand Down
17 changes: 17 additions & 0 deletions examples/ci/app/ci/upstash/redis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,21 @@ describe("redis", () => {
).not.toThrow()
})
})

test("should fail if marked as failed", async () => {

const route = "fail-route"
const randomId = `random-id-${nanoid()}`
const result = `random-result-${nanoid()}`

// increment, save and check
await redis.increment(route, randomId)
await redis.saveResultsWithoutContext(route, randomId, result)
await redis.checkRedisForResults(route, randomId, 1, result)
Comment on lines +91 to +93
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we check this? We already know it'd work

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a checkpoint to make sure that the state is set in redis, and only difference is the failWithoutContext method


// mark as failed and check
await redis.failWithoutContext(route, randomId)
expect(redis.checkRedisForResults(route, randomId, 1, result)).rejects.toThrow(redis.FAILED_TEXT)

})
})
46 changes: 40 additions & 6 deletions examples/ci/app/ci/upstash/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const redis = Redis.fromEnv();
const EXPIRE_IN_SECS = 60

const getRedisKey = (
kind: "increment" | "result",
kind: "increment" | "result" | "fail",
route: string,
randomTestId: string
): string => {
Expand Down Expand Up @@ -46,11 +46,7 @@ export const saveResultsWithoutContext = async (

// save result
const key = getRedisKey("result", route, randomTestId)

const pipe = redis.pipeline()
pipe.set<RedisResult>(key, { callCount, result, randomTestId })
pipe.expire(key, EXPIRE_IN_SECS)
await pipe.exec()
await redis.set<RedisResult>(key, { callCount, result, randomTestId }, { ex: EXPIRE_IN_SECS })
}

/**
Expand Down Expand Up @@ -80,6 +76,38 @@ export const saveResult = async (
)
}

export const failWithoutContext = async (
route: string,
randomTestId: string
) => {
const key = getRedisKey("fail", route, randomTestId)
await redis.set<boolean>(key, true, { ex: EXPIRE_IN_SECS })
}

/**
* marks the workflow as failed
*
* @param context
* @returns
*/
export const fail = async (
context: WorkflowContext<unknown>,
) => {
const randomTestId = context.headers.get(CI_RANDOM_ID_HEADER)
const route = context.headers.get(CI_ROUTE_HEADER)

if (randomTestId === null) {
throw new Error("randomTestId can't be null.")
}
if (route === null) {
throw new Error("route can't be null.")
}

await failWithoutContext(route, randomTestId)
}

export const FAILED_TEXT = "Test has failed because it was marked as failed with `fail` method."

export const checkRedisForResults = async (
route: string,
randomTestId: string,
Expand All @@ -101,6 +129,12 @@ export const checkRedisForResults = async (
throw new Error(`result not found for route ${route} with randomTestId ${randomTestId}`)
}

const failKey = getRedisKey("fail", route, randomTestId)
const failed = await redis.get<boolean>(failKey)
if (failed) {
throw new Error(FAILED_TEXT)
}

const { callCount, randomTestId: resultRandomTestId, result } = testResult

expect(resultRandomTestId, randomTestId)
Expand Down
4 changes: 4 additions & 0 deletions examples/ci/app/test-routes/auth/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
this directory has three tests
- success: checking auth correctly
- fail: auth failing
- custom: define an workflow endpoint secured with custom auth (instead of receiver) and try to call it as if failure callback
16 changes: 16 additions & 0 deletions examples/ci/app/test-routes/auth/custom/target/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { WorkflowContext } from "@upstash/workflow";
import { serve } from "@upstash/workflow/nextjs";
import { fail } from "app/ci/upstash/redis";
import { nanoid } from "app/ci/utils";


export const { POST } = serve(async (context) => {
if (context.headers.get("authorization") !== nanoid()) {
return;
};
}, {
receiver: undefined,
async failureFunction({ context }) {
await fail(context as WorkflowContext)
},
})
95 changes: 95 additions & 0 deletions examples/ci/app/test-routes/auth/custom/workflow/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { serve } from "@upstash/workflow/nextjs";
import { BASE_URL, CI_RANDOM_ID_HEADER, CI_ROUTE_HEADER, TEST_ROUTE_PREFIX } from "app/ci/constants";
import { testServe, expect } from "app/ci/utils";
import { FailureFunctionPayload, WorkflowContext } from "@upstash/workflow";
import { saveResult } from "app/ci/upstash/redis";

const header = `test-header-foo`
const headerValue = `header-bar`
const authentication = `Bearer test-auth-super-secret`
const payload = "my-payload"

const thirdPartyEndpoint = `${TEST_ROUTE_PREFIX}/auth/custom/target`

const makeCall = async (
context: WorkflowContext,
stepName: string,
method: "GET" | "POST",
expectedStatus: number,
expectedBody: unknown
) => {
const randomId = context.headers.get(CI_RANDOM_ID_HEADER)
const route = context.headers.get(CI_ROUTE_HEADER)

if (!randomId || !route) {
throw new Error("randomId or route not found")
}

const { status, body } = await context.call<FailureFunctionPayload>(stepName, {
url: thirdPartyEndpoint,
body:
{
status: 200,
header: "",
body: "",
url: "",
sourceHeader: {
[CI_ROUTE_HEADER]: [route],
[CI_RANDOM_ID_HEADER]: [randomId]
},
sourceBody: "",
workflowRunId: "",
sourceMessageId: "",
},
method,
headers: {
[ CI_RANDOM_ID_HEADER ]: randomId,
[ CI_ROUTE_HEADER ]: route,
"Upstash-Workflow-Is-Failure": "true"
}
})

expect(status, expectedStatus)

expect(typeof body, typeof expectedBody)
expect(JSON.stringify(body), JSON.stringify(expectedBody))
}

export const { POST, GET } = testServe(
serve<string>(
async (context) => {

expect(context.headers.get(header)!, headerValue)

await makeCall(
context,
"regular call should fail",
"POST",
500,
{
error: "WorkflowError",
message: "Not authorized to run the failure function."
}
)

const input = context.requestPayload;
expect(input, payload);

await saveResult(
context,
"not authorized for failure"
)
}, {
baseUrl: BASE_URL,
retries: 0,
}
), {
expectedCallCount: 4,
expectedResult: "not authorized for failure",
payload,
headers: {
[ header ]: headerValue,
"authentication": authentication
}
}
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { serve } from "@upstash/workflow/nextjs";
import { BASE_URL } from "app/ci/constants";
import { testServe, expect } from "app/ci/utils";
import { saveResult } from "app/ci/upstash/redis"
import { fail, saveResult } from "app/ci/upstash/redis"

const header = `test-header-foo`
const headerValue = `header-bar`
Expand All @@ -28,10 +28,10 @@ export const { POST, GET } = testServe(
return;
}

throw new Error("shouldn't come here.")
await fail(context)
}, {
baseUrl: BASE_URL,
retries: 0
retries: 1 // check with retries 1 to see if endpoint will retry
}
), {
expectedCallCount: 1,
Expand Down
14 changes: 12 additions & 2 deletions examples/ci/app/test-routes/call/third-party/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,15 @@ export const PATCH = async () => {
headers: {
[ FAILING_HEADER ]: FAILING_HEADER_VALUE
}
})
}
}
)
}

export const PUT = async () => {
return new Response(
undefined,
{
status: 300,
}
)
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { serve } from "@upstash/workflow/nextjs";
import { BASE_URL, TEST_ROUTE_PREFIX } from "app/ci/constants";
import { testServe, expect } from "app/ci/utils";
import { saveResult } from "app/ci/upstash/redis"
import { fail, saveResult } from "app/ci/upstash/redis"
import { FAILING_HEADER, FAILING_HEADER_VALUE } from "../constants";
import { WorkflowContext } from "@upstash/workflow";

const testHeader = `test-header-foo`
const headerValue = `header-foo`
Expand Down Expand Up @@ -31,8 +32,8 @@ export const { POST, GET } = testServe(
}, {
baseUrl: BASE_URL,
retries: 0,
failureFunction() {
console.log("SHOULDNT RUN");
async failureFunction({ context }) {
await fail(context as WorkflowContext)
},
}
), {
Expand Down
20 changes: 15 additions & 5 deletions examples/ci/app/test-routes/call/workflow/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ export const { POST, GET } = testServe(

await context.sleep("sleep 1", 2);

const { body: getResult, header: getHeaders, status: getStatus } = await context.call("get call", {
const { body: getResult, header: getHeaders, status: getStatus } = await context.call<string>("get call", {
url: thirdPartyEndpoint,
headers: getHeader,
});

expect(getStatus, 200)
expect(getHeaders[GET_HEADER][0], GET_HEADER_VALUE)
expect(getResult as string, "called GET 'third-party-result' 'get-header-value-x'");
expect(getResult, "called GET 'third-party-result' 'get-header-value-x'");

const { body: patchResult, status, header } = await context.call("get call", {
const { body: patchResult, status, header } = await context.call("patch call", {
url: thirdPartyEndpoint,
headers: getHeader,
method: "PATCH",
Expand All @@ -65,16 +65,26 @@ export const { POST, GET } = testServe(
expect(patchResult as string, "failing request");
expect(header[FAILING_HEADER][0], FAILING_HEADER_VALUE)

// put will return with an empty body. should return "" as body in that case.
const { body: putBody, status: putStatus } = await context.call<string>("put call", {
url: thirdPartyEndpoint,
method: "PUT",
retries: 0
})

expect(putStatus, 300)
expect(putBody, "");

await saveResult(
context,
getResult as string
getResult
)
}, {
baseUrl: BASE_URL,
retries: 0
}
), {
expectedCallCount: 10,
expectedCallCount: 12,
expectedResult: "called GET 'third-party-result' 'get-header-value-x'",
payload,
headers: {
Expand Down
2 changes: 1 addition & 1 deletion examples/cloudflare-workers-hono/ci.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ const testEndpoint = ({
expect(result?.secret).toBe(secret)
expect(result?.result).toBe(expectedResult)
}, {
timeout: 8000
timeout: 15000
})
}

Expand Down
2 changes: 1 addition & 1 deletion examples/nextjs-pages/ci.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ const testEndpoint = ({
expect(result?.secret).toBe(secret)
expect(result?.result).toBe(expectedResult)
}, {
timeout: 9000
timeout: 15000
})
}

Expand Down
1 change: 1 addition & 0 deletions src/serve/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export const serve = <
requestPayload,
qstashClient,
initialPayloadParser,
routeFunction,
failureFunction
);
if (failureCheck.isErr()) {
Expand Down
Loading
Loading