Skip to content

Commit

Permalink
fix types
Browse files Browse the repository at this point in the history
  • Loading branch information
afshinm committed Dec 31, 2023
1 parent 1609c8c commit e6ab514
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/pipeline/extractor/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface StorageExtractorProps extends PipelineProcessorProps {
}

class StorageExtractor extends PipelineProcessor<
Promise<StorageResponse>,
StorageResponse,
StorageExtractorProps
> {
get type(): ProcessorType {
Expand Down
46 changes: 25 additions & 21 deletions src/pipeline/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { ID } from '../util/id';
import log from '../util/log';
import { EventEmitter } from '../util/eventEmitter';

interface PipelineEvents<T> {
interface PipelineEvents<R> {
/**
* Generic updated event. Triggers the callback function when the pipeline
* is updated, including when a new processor is registered, a processor's props
* get updated, etc.
*/
updated: (processor: PipelineProcessor<any, any>) => void;
updated: <T, P>(processor: PipelineProcessor<T, P>) => void;
/**
* Triggers the callback function when a new
* processor is registered successfully
Expand All @@ -27,27 +27,29 @@ interface PipelineEvents<T> {
* afterProcess will not be called if there is an
* error in the pipeline (i.e a step throw an Error)
*/
afterProcess: (prev: T) => void;
afterProcess: (prev: R) => void;
/**
* Triggers the callback function when the pipeline
* fails to process all steps or at least one step
* throws an Error
*/
error: (prev: T) => void;
error: <T>(prev: T) => void;
}

class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
class Pipeline<R> extends EventEmitter<PipelineEvents<R>> {
// available steps for this pipeline
private readonly _steps: Map<ProcessorType, PipelineProcessor<T, P>[]> =
new Map<ProcessorType, PipelineProcessor<T, P>[]>();
private readonly _steps: Map<
ProcessorType,
PipelineProcessor<unknown, unknown>[]
> = new Map<ProcessorType, PipelineProcessor<unknown, unknown>[]>();
// used to cache the results of processors using their id field
private cache: Map<string, any> = new Map<string, any>();
private cache: Map<string, unknown> = new Map<string, unknown>();
// keeps the index of the last updated processor in the registered
// processors list and will be used to invalidate the cache
// -1 means all new processors should be processed
private lastProcessorIndexUpdated = -1;

constructor(steps?: PipelineProcessor<T, P>[]) {
constructor(steps?: PipelineProcessor<unknown, unknown>[]) {
super();

if (steps) {
Expand All @@ -59,7 +61,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* Clears the `cache` array
*/
clearCache(): void {
this.cache = new Map<string, any>();
this.cache = new Map<string, object>();
this.lastProcessorIndexUpdated = -1;
}

Expand All @@ -69,7 +71,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* @param processor
* @param priority
*/
register(
register<T, P>(
processor: PipelineProcessor<T, P>,
priority: number = null,
): PipelineProcessor<T, P> {
Expand Down Expand Up @@ -99,7 +101,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* @param processor
* @param priority
*/
tryRegister(
tryRegister<T, P>(
processor: PipelineProcessor<T, P>,
priority: number = null,
): PipelineProcessor<T, P> | undefined {
Expand All @@ -117,7 +119,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
*
* @param processor
*/
unregister(processor: PipelineProcessor<T, P>): void {
unregister<T, P>(processor: PipelineProcessor<T, P>): void {
if (!processor) return;
if (this.findProcessorIndexByID(processor.id) === -1) return;

Expand All @@ -138,7 +140,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* @param processor
* @param priority
*/
private addProcessorByPriority(
private addProcessorByPriority<T, P>(
processor: PipelineProcessor<T, P>,
priority: number,
): void {
Expand Down Expand Up @@ -169,8 +171,8 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
/**
* Flattens the _steps Map and returns a list of steps with their correct priorities
*/
get steps(): PipelineProcessor<T, P>[] {
let steps: PipelineProcessor<T, P>[] = [];
get steps(): PipelineProcessor<unknown, unknown>[] {
let steps: PipelineProcessor<unknown, unknown>[] = [];

for (const type of this.getSortedProcessorTypes()) {
const subSteps = this._steps.get(type);
Expand All @@ -190,7 +192,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
*
* @param type
*/
getStepsByType(type: ProcessorType): PipelineProcessor<T, P>[] {
getStepsByType(type: ProcessorType): PipelineProcessor<unknown, unknown>[] {
return this.steps.filter((process) => process.type === type);
}

Expand All @@ -209,7 +211,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
*
* @param data
*/
async process(data?: T): Promise<T> {
async process(data?: R): Promise<R> {
const lastProcessorIndexUpdated = this.lastProcessorIndexUpdated;
const steps = this.steps;

Expand All @@ -224,11 +226,11 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
// updated processor was before "processor".
// This is to ensure that we always have correct and up to date
// data from processors and also to skip them when necessary
prev = await processor.process(prev);
prev = (await processor.process(prev)) as R;
this.cache.set(processor.id, prev);
} else {
// cached results already exist
prev = this.cache.get(processor.id);
prev = this.cache.get(processor.id) as R;
}
}
} catch (e) {
Expand Down Expand Up @@ -263,7 +265,9 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* This is used to invalid or skip a processor in
* the process() method
*/
private setLastProcessorIndex(processor: PipelineProcessor<T, P>): void {
private setLastProcessorIndex<T, P>(
processor: PipelineProcessor<T, P>,
): void {
const processorIndex = this.findProcessorIndexByID(processor.id);

if (this.lastProcessorIndexUpdated > processorIndex) {
Expand Down
8 changes: 5 additions & 3 deletions src/pipeline/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ export enum ProcessorType {
Limit,
}

interface PipelineProcessorEvents<T, P> {
propsUpdated: (processor: PipelineProcessor<T, P>) => void;
interface PipelineProcessorEvents {
propsUpdated: <T, P>(
processor: PipelineProcessor<T, P>,
) => void;
beforeProcess: (...args) => void;
afterProcess: (...args) => void;
}
Expand All @@ -28,7 +30,7 @@ export interface PipelineProcessorProps {}
export abstract class PipelineProcessor<
T,
P extends Partial<PipelineProcessorProps>,
> extends EventEmitter<PipelineProcessorEvents<T, P>> {
> extends EventEmitter<PipelineProcessorEvents> {
public readonly id: ID;
private _props: P;

Expand Down
6 changes: 3 additions & 3 deletions src/view/container.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import log from '../util/log';
import { useEffect } from 'preact/hooks';
import * as actions from './actions';
import { useStore } from '../hooks/useStore';
import useSelector from '../../src/hooks/useSelector';
import { useConfig } from '../../src/hooks/useConfig';
import { throttle } from 'src/util/throttle';
import useSelector from '../hooks/useSelector';
import { useConfig } from '../hooks/useConfig';
import { throttle } from '../util/throttle';

export function Container() {
const config = useConfig();
Expand Down
7 changes: 5 additions & 2 deletions src/view/plugin/pagination.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ export function Pagination() {
url: server.url,
body: server.body,
});

config.pipeline.register(processor.current);

Check warning on line 50 in src/view/plugin/pagination.tsx

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
} else {
processor.current = new PaginationLimit({
limit: limit,
page: currentPage,
});

config.pipeline.register(processor.current);
}

Check warning on line 58 in src/view/plugin/pagination.tsx

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch

if (processor.current instanceof ServerPaginationLimit) {
Expand All @@ -65,7 +69,6 @@ export function Pagination() {
}

config.pipeline.on('updated', onUpdate);
config.pipeline.register(processor.current);

// we need to make sure that the state is set
// to the default props when an error happens
Expand All @@ -75,7 +78,7 @@ export function Pagination() {
});

return () => {
config.pipeline.unregister(processor.current);
config.pipeline.unregister<object, object>(processor.current);
config.pipeline.off('updated', onUpdate);
};
}, []);
Expand Down
6 changes: 4 additions & 2 deletions src/view/plugin/search/search.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ export function Search() {
}, [props]);

useEffect(() => {
config.pipeline.register(processor);
if (processor) {
config.pipeline.register<object, object>(processor);
}

return () => config.pipeline.unregister(processor);
return () => config.pipeline.unregister<object, object>(processor);
}, [config, processor]);

const debouncedOnInput = useCallback(
Expand Down

0 comments on commit e6ab514

Please sign in to comment.