Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

$batch #675

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/sbvr-api/permissions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,7 @@ export const resolveApiKey = async (
tx?: Tx,
): Promise<PermissionReq['apiKey']> => {
const apiKey =
req.params[paramName] ?? req.body[paramName] ?? req.query[paramName];
req.params?.[paramName] ?? req.body?.[paramName] ?? req.query?.[paramName];
if (apiKey == null) {
return;
}
Expand Down
159 changes: 143 additions & 16 deletions src/sbvr-api/sbvr-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ import {
setExecutedMigrations,
} from '../migrator/utils';

const validBatchMethods = new Set(['PUT', 'POST', 'PATCH', 'DELETE', 'GET']);

const LF2AbstractSQLTranslator = LF2AbstractSQL.createTranslator(sbvrTypes);
const LF2AbstractSQLTranslatorVersion = `${LF2AbstractSQLVersion}+${sbvrTypesVersion}`;

Expand Down Expand Up @@ -140,6 +142,7 @@ export interface ApiKey extends Actor {
}

export interface Response {
id?: string;
statusCode: number;
headers?:
| {
Expand Down Expand Up @@ -1143,6 +1146,7 @@ const $getAffectedIds = async ({
const parsedRequest: uriParser.ParsedODataRequest &
Partial<Pick<uriParser.ODataRequest, 'engine' | 'translateVersions'>> =
await uriParser.parseOData({
id: request.batchRequestId,
method: request.method,
url: `/${request.vocabulary}${request.url}`,
});
Expand Down Expand Up @@ -1192,11 +1196,103 @@ export const getModel = (vocabulary: string) => {
return models[vocabulary];
};

const validateBatch = (req: Express.Request) => {
const { requests } = req.body as { requests: uriParser.UnparsedRequest[] };
if (!Array.isArray(requests)) {
throw new BadRequestError(
'Batch requests must include an array of requests in the body via the "requests" property',
);
}
if (req.headers != null && req.headers['content-type'] == null) {
throw new BadRequestError(
'Headers in a batch request must include a "content-type" header if they are provided',
);
}
if (
requests.find(
(request) =>
request.headers?.authorization != null ||
request.url?.includes('apikey='),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically params for authentication can be customized so this isn't perfect - I'm not sure if it's something we can account for but worth being aware of

) != null
) {
throw new BadRequestError(
'Authorization may only be passed to the main batch request',
);
}
const ids = new Set<string>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string should be inferred if a string[] is passed in?

Suggested change
const ids = new Set<string>(
const ids = new Set(

requests
.map((request) => request.id)
.filter((id) => typeof id === 'string') as string[],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.filter((id) => typeof id === 'string') as string[],
.filter((id): id is string => typeof id === 'string'),

);
if (ids.size !== requests.length) {
throw new BadRequestError(
'All requests in a batch request must have unique string ids',
);
}

for (const request of requests) {
if (
request.headers != null &&
request.headers['content-type'] == null &&
(req.headers == null || req.headers['content-type'] == null)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(req.headers == null || req.headers['content-type'] == null)
req.headers?.['content-type'] == null

) {
throw new BadRequestError(
'Requests of a batch request that have headers must include a "content-type" header',
);
}
if (request.method == null) {
throw new BadRequestError(
'Requests of a batch request must have a "method"',
);
}
const upperCaseMethod = request.method.toUpperCase();
if (!validBatchMethods.has(upperCaseMethod)) {
throw new BadRequestError(
`Requests of a batch request must have a method matching one of the following: ${Array.from(
validBatchMethods,
).join(', ')}`,
);
}
if (
request.body !== undefined &&
(upperCaseMethod === 'GET' || upperCaseMethod === 'DELETE')
) {
throw new BadRequestError(
'GET and DELETE requests of a batch request must not have a body',
);
}
}

const urls = new Set<string | undefined>(
requests.map((request) => request.url),
);
if (urls.has(undefined)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also check for null?

throw new BadRequestError('Requests of a batch request must have a "url"');
}
const containsBatch =
Array.from(urls).filter((url) => !!url?.includes('/$batch')).length > 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Array.from(urls).filter((url) => !!url?.includes('/$batch')).length > 0;
Array.from(urls).some((url) => !!url?.includes('/$batch'));

if (containsBatch) {
throw new BadRequestError('Batch requests cannot contain batch requests');
}
const urlModels = new Set(
Array.from(urls.values()).map((url: string) => url.split('/')[1]),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can limit the splitting to stop on the first match:

Suggested change
Array.from(urls.values()).map((url: string) => url.split('/')[1]),
Array.from(urls.values()).map((url: string) => url.split('/', 1)[1]),

);
if (urlModels.size > 1) {
throw new BadRequestError(
'Batch requests must consist of requests for only one model',
);
}
};

const runODataRequest = (req: Express.Request, vocabulary: string) => {
if (env.DEBUG) {
api[vocabulary].logger.log('Parsing', req.method, req.url);
}

if (req.url.startsWith(`/${vocabulary}/$batch`)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be good to store this check somewhere as I've seen equivalent .startsWith checks in a few places

validateBatch(req);
}

// Get the hooks for the current method/vocabulary as we know it,
// in order to run PREPARSE hooks, before parsing gets us more info
const { versions } = models[vocabulary];
Expand Down Expand Up @@ -1244,11 +1340,20 @@ const runODataRequest = (req: Express.Request, vocabulary: string) => {
await runHooks('PREPARSE', reqHooks, { req, tx: req.tx });
let requests: uriParser.UnparsedRequest[];
// Check if it is a single request or a batch
if (req.batch != null && req.batch.length > 0) {
requests = req.batch;
if (req.url.startsWith(`/${vocabulary}/$batch`)) {
await Promise.all(
req.body.requests.map(
async (request: HookReq) =>
await runHooks('PREPARSE', reqHooks, {
req: request,
tx: req.tx,
}),
),
);
requests = req.body.requests;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will execute all requests in one database transaction. Is this desired or should every request run in it's own database transaction?
The background is, if a database transaction fails the qhole transaction is rolled back, thus all batch requests will be rolled back.

@Page- What do you think, should every request in a batch run in its own db tx?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is part of the point/idea of implementing $batch @fisehara , to run all requests in a batch on one transaction. Part of the following roadmap item: https://roadmap.balena.io/posts/19/allow-changes-to-variables-to-be-batched-and-submitted-all-at-once

} else {
const { method, url, body } = req;
requests = [{ method, url, data: body }];
requests = [{ method, url, body }];
}

const prepareRequest = async (
Expand Down Expand Up @@ -1312,7 +1417,13 @@ const runODataRequest = (req: Express.Request, vocabulary: string) => {

// Parse the OData requests
const results = await mappingFn(requests, async (requestPart) => {
const parsedRequest = await uriParser.parseOData(requestPart);
const parsedRequest = await uriParser.parseOData(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the requests are independent, should they be executed in parallel on the DB or should pinejs limit the requests to be executed sequencially to do some kind of rate limiting?

Copy link
Member Author

@myarmolinsky myarmolinsky Sep 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requests should be executed sequentially. If a request along the way fails, the batch request should fail. However, there will be no rollback if the requests are not atomic

should pinejs limit the requests to be executed sequencially to do some kind of rate limiting?

I don't know enough about pine and the API to answer this well, your opinion based on what I stated above would be better

requestPart,
req.url.startsWith(`/${vocabulary}/$batch`) &&
!requestPart.url.includes(`/${vocabulary}/$batch`)
? req.headers
: undefined,
);

let request: uriParser.ODataRequest | uriParser.ODataRequest[];
if (Array.isArray(parsedRequest)) {
Expand Down Expand Up @@ -1392,7 +1503,10 @@ export const handleODataRequest: Express.Handler = async (req, res, next) => {

res.set('Cache-Control', 'no-cache');
// If we are dealing with a single request unpack the response and respond normally
if (req.batch == null || req.batch.length === 0) {
if (
!req.url.startsWith(`/${apiRoot}/$batch`) ||
req.body.requests?.length === 0
) {
let [response] = responses;
if (response instanceof HttpError) {
response = httpErrorToResponse(response);
Expand All @@ -1401,15 +1515,15 @@ export const handleODataRequest: Express.Handler = async (req, res, next) => {

// Otherwise its a multipart request and we reply with the appropriate multipart response
} else {
(res.status(200) as any).sendMulti(
responses.map((response) => {
res.status(200).json({
responses: responses.map((response) => {
if (response instanceof HttpError) {
myarmolinsky marked this conversation as resolved.
Show resolved Hide resolved
return httpErrorToResponse(response);
myarmolinsky marked this conversation as resolved.
Show resolved Hide resolved
} else {
return response;
}
}),
);
});
}
} catch (e: any) {
if (handleHttpErrors(req, res, e)) {
Expand All @@ -1436,7 +1550,7 @@ export const handleHttpErrors = (
for (const handleErrorFn of handleErrorFns) {
handleErrorFn(req, err);
}
const response = httpErrorToResponse(err);
const response = httpErrorToResponse(err, req);
handleResponse(res, response);
return true;
}
Expand All @@ -1455,10 +1569,12 @@ const handleResponse = (res: Express.Response, response: Response): void => {

const httpErrorToResponse = (
err: HttpError,
req?: Express.Request,
): RequiredField<Response, 'statusCode'> => {
const message = err.getResponseBody();
return {
statusCode: err.status,
body: err.getResponseBody(),
body: req != null && 'batch' in req ? { responses: [], message } : message,
headers: err.headers,
};
};
Expand Down Expand Up @@ -1572,7 +1688,8 @@ const runChangeSet =
throw new Error('No request id');
}
result.headers ??= {};
result.headers['content-id'] = request.id;
result.headers['content-id'] = request.batchRequestId;
result.id = request.batchRequestId;
changeSetResults.set(request.id, result);
};

Expand Down Expand Up @@ -1611,22 +1728,32 @@ const prepareResponse = async (
result: any,
tx: Db.Tx,
): Promise<Response> => {
let response: Response;
switch (request.method) {
case 'GET':
return await respondGet(req, request, result, tx);
response = await respondGet(req, request, result, tx);
break;
case 'POST':
return await respondPost(req, request, result, tx);
response = await respondPost(req, request, result, tx);
break;
case 'PUT':
case 'PATCH':
case 'MERGE':
return await respondPut(req, request, result, tx);
response = await respondPut(req, request, result, tx);
break;
case 'DELETE':
return await respondDelete(req, request, result, tx);
response = await respondDelete(req, request, result, tx);
break;
case 'OPTIONS':
return await respondOptions(req, request, result, tx);
response = await respondOptions(req, request, result, tx);
break;
default:
throw new MethodNotAllowedError();
}
if (request.batchRequestId != null) {
response['id'] = request.batchRequestId;
}
return response;
};

const checkReadOnlyRequests = (request: uriParser.ODataRequest) => {
Expand Down
20 changes: 15 additions & 5 deletions src/sbvr-api/uri-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,22 @@ import {
TranslationError,
} from './errors';
import * as sbvrUtils from './sbvr-utils';
import { IncomingHttpHeaders } from 'http';

export type OdataBinds = ODataBinds;

export interface UnparsedRequest {
id?: string;
myarmolinsky marked this conversation as resolved.
Show resolved Hide resolved
method: string;
url: string;
data?: any;
headers?: { [header: string]: string };
body?: any;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reasoning behind renaming data to body?
This is making it a breaking change, is this neccessary for implementing batch support?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following the spec, which specifies that there is a body but says nothing about data, yet we seem to use data in place of body, so it seems like we just named the property in a way that doesn't abide by the spec (I don't know what our reasoning for this was)

If we want to keep it is data just to avoid a breaking change, I don't know if that sounds like good enough reasoning to avoid following the spec which should presumably be helpful for future changes and understanding as well

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This predates the odata json $batch but now that it exists I think it makes sense to match that format for our internal data structure as it should simplify a whole bunch of things

headers?: IncomingHttpHeaders;
changeSet?: UnparsedRequest[];
_isChangeSet?: boolean;
}

export interface ParsedODataRequest {
headers?: IncomingHttpHeaders;
method: SupportedMethod;
url: string;
vocabulary: string;
Expand All @@ -48,6 +51,7 @@ export interface ParsedODataRequest {
odataBinds: OdataBinds;
custom: AnyObject;
id?: number | undefined;
batchRequestId?: string;
_defer?: boolean;
}
export interface ODataRequest extends ParsedODataRequest {
Expand Down Expand Up @@ -263,15 +267,19 @@ export const metadataEndpoints = ['$metadata', '$serviceroot'];

export async function parseOData(
b: UnparsedRequest & { _isChangeSet?: false },
headers?: IncomingHttpHeaders,
): Promise<ParsedODataRequest>;
export async function parseOData(
b: UnparsedRequest & { _isChangeSet: true },
headers?: IncomingHttpHeaders,
): Promise<ParsedODataRequest[]>;
export async function parseOData(
b: UnparsedRequest,
headers?: IncomingHttpHeaders,
): Promise<ParsedODataRequest | ParsedODataRequest[]>;
export async function parseOData(
b: UnparsedRequest,
batchHeaders?: IncomingHttpHeaders,
): Promise<ParsedODataRequest | ParsedODataRequest[]> {
try {
if (b._isChangeSet && b.changeSet != null) {
Expand All @@ -292,12 +300,14 @@ export async function parseOData(
const odata = memoizedParseOdata(url);

return {
batchRequestId: b.id,
headers: { ...batchHeaders, ...b.headers },
method: b.method as SupportedMethod,
url,
vocabulary: apiRoot,
resourceName: odata.tree.resource,
originalResourceName: odata.tree.resource,
values: b.data ?? {},
values: b.body ?? {},
odataQuery: odata.tree,
odataBinds: odata.binds,
custom: {},
Expand Down Expand Up @@ -362,7 +372,7 @@ const parseODataChangeset = (
originalResourceName: odata.tree.resource,
odataBinds: odata.binds,
odataQuery: odata.tree,
values: b.data ?? {},
values: b.body ?? {},
custom: {},
id: contentId,
_defer: defer,
Expand All @@ -379,7 +389,7 @@ const splitApiRoot = (url: string) => {
};

const mustExtractHeader = (
body: { headers?: { [header: string]: string } },
body: { headers?: IncomingHttpHeaders },
header: string,
) => {
const h: any = body.headers?.[header]?.[0];
Expand Down
Loading