Skip to content

Commit

Permalink
refactor: rework how http source uses the transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
DavieReid committed Dec 6, 2023
1 parent 24c949a commit 4c5426a
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 224 deletions.
125 changes: 33 additions & 92 deletions packages/source-http/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,47 +142,6 @@ describe('GIVEN an HTTP Source ', () => {
});
});

describe('WHEN no transformResponseToPagesModulePath is undefined', () => {
const server = setupServer();
beforeAll(() => {
server.use(...successHandlers);
server.listen({ onUnhandledRequest: 'warn' });
});
afterAll(() => {
server.close();
});

it('should merge results from all endpoints into 1 array', done => {
const source$: Observable<Page[]> = Source.create(
{ ...options, transformResponseToPagesModulePath: undefined },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
expect(result.length).toEqual(3);
},
complete: () => done()
});
});

it('no transformation of responses is carried out', done => {
const source$: Observable<Page[]> = Source.create(
{ ...options, transformResponseToPagesModulePath: undefined },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
expect(result[0]).toEqual({ name: 'Alice' });
expect(result[1]).toEqual({ name: 'Bob' });
expect(result[2]).toEqual({ name: 'Eve' });
},
complete: () => done()
});
});
});

describe('WHEN noProxy option is used', () => {
const server = setupServer();
beforeAll(() => {
Expand Down Expand Up @@ -226,7 +185,10 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should merge results from all endpoints into 1 array', done => {
const source$: Observable<Page[]> = createHttpSource(options, { schedule });
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformer: toUpperCaseTransformer },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand All @@ -237,7 +199,10 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should transform the responses using the transform function', done => {
const source$: Observable<Page[]> = createHttpSource(options, { schedule });
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformer: toUpperCaseTransformer },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand Down Expand Up @@ -271,7 +236,10 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should merge results from **successful** endpoints into 1 array', done => {
const source$: Observable<Page[]> = createHttpSource(options, { schedule });
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformer: toUpperCaseTransformer },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand All @@ -283,63 +251,30 @@ describe('GIVEN the createHttpSource function ', () => {
});
});

describe('WHEN the transformer has a request config', () => {
describe('WHEN the transformer is passed params', () => {
const server = setupServer();
const mockTransformer = jest.fn();
beforeAll(() => {
server.use(...successHandlers);
server.listen({ onUnhandledRequest: 'warn' });
});
afterAll(() => {
server.close();
});
it('should merge results from **successful** endpoints into 1 array', done => {
const source$: Observable<Page[]> = createHttpSource(options, { schedule });

source$.pipe(take(1)).subscribe({
next: result => {
expect(result.length).toEqual(3);
expect(result[0]).toEqual('ALICE');
},
complete: () => done()
});
});
});

describe('WHEN no transformResponseToPagesModulePath is undefined', () => {
const server = setupServer();
beforeAll(() => {
server.use(...successHandlers);
server.listen({ onUnhandledRequest: 'warn' });
});
afterAll(() => {
server.close();
});

it('should merge results from all endpoints into 1 array', done => {
it('should pass transformer options to the transformer', done => {
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformResponseToPagesModulePath: undefined },
{ ...options, transformer: mockTransformer, transformerOptions: { option: 'an option' } },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
expect(result.length).toEqual(3);
},
complete: () => done()
});
});

it('no transformation of responses is carried out', done => {
const source$: Observable<Page[]> = createHttpSource(
{ ...options, transformResponseToPagesModulePath: undefined },
{ schedule }
);

source$.pipe(take(1)).subscribe({
next: result => {
expect(result[0]).toEqual({ name: 'Alice' });
expect(result[1]).toEqual({ name: 'Bob' });
expect(result[2]).toEqual({ name: 'Eve' });
expect(mockTransformer).toBeCalledTimes(3);
expect(mockTransformer.mock.calls[0][0]).toEqual({ name: 'Alice' });
expect(mockTransformer.mock.calls[0][1]).toEqual(options.prefixDir);
expect(mockTransformer.mock.calls[0][2]).toEqual(0);
expect(mockTransformer.mock.calls[0][3]).toEqual({ option: 'an option' });
},
complete: () => done()
});
Expand All @@ -361,9 +296,12 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should merge results from all endpoints into 1 array', done => {
const source$: Observable<Page[]> = createHttpSource(configuredRequestsOptions, {
schedule
});
const source$: Observable<Page[]> = createHttpSource(
{ ...configuredRequestsOptions, transformer: toUpperCaseTransformer },
{
schedule
}
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand All @@ -374,9 +312,12 @@ describe('GIVEN the createHttpSource function ', () => {
});

it('should transform the responses using the transform function', done => {
const source$: Observable<Page[]> = createHttpSource(configuredRequestsOptions, {
schedule
});
const source$: Observable<Page[]> = createHttpSource(
{ ...configuredRequestsOptions, transformer: toUpperCaseTransformer },
{
schedule
}
);

source$.pipe(take(1)).subscribe({
next: result => {
Expand Down
99 changes: 99 additions & 0 deletions packages/source-http/src/createHttpSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { forkJoin, timer } from 'rxjs';
import { switchMap, map } from 'rxjs/operators';
import { z } from 'zod';
import type { Page, SourceConfig } from '@jpmorganchase/mosaic-types';
import { fromHttpRequest, isErrorResponse } from '@jpmorganchase/mosaic-from-http-request';
import { sourceScheduleSchema, validateMosaicSchema } from '@jpmorganchase/mosaic-schemas';

import { createProxyAgent } from './proxyAgent.js';
import { ResponseTransformer } from './fromDynamicImport.js';

export { createProxyAgent };

export type HttpSourceResponseTransformerType<TResponse, TPage> = ResponseTransformer<
TResponse,
TPage
>;

export const httpSourceCreatorSchema = z.object({
schedule: sourceScheduleSchema.optional(),
endpoints: z.array(z.string().url()).optional().default([]),
prefixDir: z.string({ required_error: 'Please provide a prefixDir' }),
requestTimeout: z.number().optional().default(5000),
proxyEndpoint: z.string().url().optional(),
noProxy: z
.any()
.transform(val => new RegExp(val))
.optional(),
requestHeaders: z.object({}).passthrough().optional(),
transformerOptions: z.unknown().optional()
});

export interface CreateHttpSourceParams<TResponse, TPage>
extends z.input<typeof httpSourceCreatorSchema> {
configuredRequests?: Request[];
transformer: HttpSourceResponseTransformerType<TResponse, TPage>;
}

/**
* For use inside *other* sources.
* Allows a transformer function to be passed directly without the need for dynamic imports.
*
*/
export function createHttpSource<TResponse, TPage extends Page>(
{ configuredRequests, transformer, ...restOptions }: CreateHttpSourceParams<TResponse, TPage>,
{ schedule }: SourceConfig
) {
const {
endpoints,
prefixDir,
proxyEndpoint,
noProxy,
requestHeaders,
requestTimeout,
transformerOptions
} = validateMosaicSchema(httpSourceCreatorSchema, restOptions);

const delayMs = schedule.checkIntervalMins * 60000;
let requests = configuredRequests || [];

if (endpoints.length > 0) {
requests = endpoints.map(endpoint => {
let agent;
const headers = requestHeaders
? (requestHeaders as HeadersInit)
: {
'Content-Type': 'application/json'
};

if (!noProxy?.test(endpoint) && proxyEndpoint) {
agent = createProxyAgent(proxyEndpoint);
}

return new Request(new URL(endpoint), {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
agent,
headers,
timeout: requestTimeout
});
});
}

return timer(schedule.initialDelayMs, delayMs).pipe(
switchMap(() => {
const fetches = requests.map((request, index) =>
fromHttpRequest<TResponse>(request).pipe(
map(response => {
if (isErrorResponse<TResponse>(response)) {
return [];
}

return transformer(response, prefixDir, index, transformerOptions);
})
)
);
return forkJoin(fetches).pipe(map(result => result.flat()));
})
);
}
25 changes: 11 additions & 14 deletions packages/source-http/src/fromDynamicImport.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import { distinctUntilChanged, from, iif, of, switchMap } from 'rxjs';
import type { Page } from '@jpmorganchase/mosaic-types';
import { distinctUntilChanged, from, switchMap } from 'rxjs';

export type ResponseTransformer<TResponse, TOptions> = (
export type ResponseTransformer<TResponse, TPage> = (
response: TResponse,
prefixDir: string,
index: number,
options?: TOptions
) => Array<TResponse>;
...rest: any[]
) => Array<TPage>;

async function importTransformer<T, O>(
async function importTransformer<TResponse, TPage>(
modulePath: string
): Promise<{
transformer: ResponseTransformer<T, O>;
transformer: ResponseTransformer<TResponse, TPage>;
}> {
const { default: transformResponseToPages } = await import(modulePath);
if (!transformResponseToPages) {
Expand All @@ -20,12 +21,8 @@ async function importTransformer<T, O>(
return { transformer: transformResponseToPages };
}

export const fromDynamicImport = <TResponse = unknown, TOptions = unknown>(modulePath?: string) =>
iif(
() => modulePath === undefined,
of({ transformer: null }),
from(String(modulePath)).pipe(
distinctUntilChanged(),
switchMap(() => importTransformer<TResponse, TOptions>(String(modulePath)))
)
export const fromDynamicImport = <TResponse = unknown, TPage = Page>(modulePath: string) =>
from(modulePath).pipe(
distinctUntilChanged(),
switchMap(() => importTransformer<TResponse, TPage>(modulePath))
);
Loading

1 comment on commit 4c5426a

@vercel
Copy link

@vercel vercel bot commented on 4c5426a Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

mosaic – ./

mosaic-git-main-mosaic-dev-team.vercel.app
mosaic-mosaic-dev-team.vercel.app

Please sign in to comment.