diff --git a/src/pipeline/extractor/storage.ts b/src/pipeline/extractor/storage.ts index 7efbad1c..4c5c529a 100644 --- a/src/pipeline/extractor/storage.ts +++ b/src/pipeline/extractor/storage.ts @@ -10,7 +10,7 @@ interface StorageExtractorProps extends PipelineProcessorProps { } class StorageExtractor extends PipelineProcessor< - Promise, + StorageResponse, StorageExtractorProps > { get type(): ProcessorType { diff --git a/src/pipeline/pipeline.ts b/src/pipeline/pipeline.ts index 5bc87ad5..050d2d4e 100644 --- a/src/pipeline/pipeline.ts +++ b/src/pipeline/pipeline.ts @@ -3,13 +3,13 @@ import { ID } from '../util/id'; import log from '../util/log'; import { EventEmitter } from '../util/eventEmitter'; -interface PipelineEvents { +interface PipelineEvents { /** * Generic updated event. Triggers the callback function when the pipeline * is updated, including when a new processor is registered, a processor's props * get updated, etc. */ - updated: (processor: PipelineProcessor) => void; + updated: (processor: PipelineProcessor) => void; /** * Triggers the callback function when a new * processor is registered successfully @@ -27,27 +27,29 @@ interface PipelineEvents { * afterProcess will not be called if there is an * error in the pipeline (i.e a step throw an Error) */ - afterProcess: (prev: T) => void; + afterProcess: (prev: R) => void; /** * Triggers the callback function when the pipeline * fails to process all steps or at least one step * throws an Error */ - error: (prev: T) => void; + error: (prev: T) => void; } -class Pipeline extends EventEmitter> { +class Pipeline extends EventEmitter> { // available steps for this pipeline - private readonly _steps: Map[]> = - new Map[]>(); + private readonly _steps: Map< + ProcessorType, + PipelineProcessor[] + > = new Map[]>(); // used to cache the results of processors using their id field - private cache: Map = new Map(); + private cache: Map = new Map(); // keeps the index of the last updated processor in the registered // processors list and will be used to invalidate the cache // -1 means all new processors should be processed private lastProcessorIndexUpdated = -1; - constructor(steps?: PipelineProcessor[]) { + constructor(steps?: PipelineProcessor[]) { super(); if (steps) { @@ -59,7 +61,7 @@ class Pipeline extends EventEmitter> { * Clears the `cache` array */ clearCache(): void { - this.cache = new Map(); + this.cache = new Map(); this.lastProcessorIndexUpdated = -1; } @@ -69,7 +71,7 @@ class Pipeline extends EventEmitter> { * @param processor * @param priority */ - register( + register( processor: PipelineProcessor, priority: number = null, ): PipelineProcessor { @@ -99,7 +101,7 @@ class Pipeline extends EventEmitter> { * @param processor * @param priority */ - tryRegister( + tryRegister( processor: PipelineProcessor, priority: number = null, ): PipelineProcessor | undefined { @@ -117,7 +119,7 @@ class Pipeline extends EventEmitter> { * * @param processor */ - unregister(processor: PipelineProcessor): void { + unregister(processor: PipelineProcessor): void { if (!processor) return; if (this.findProcessorIndexByID(processor.id) === -1) return; @@ -138,7 +140,7 @@ class Pipeline extends EventEmitter> { * @param processor * @param priority */ - private addProcessorByPriority( + private addProcessorByPriority( processor: PipelineProcessor, priority: number, ): void { @@ -169,8 +171,8 @@ class Pipeline extends EventEmitter> { /** * Flattens the _steps Map and returns a list of steps with their correct priorities */ - get steps(): PipelineProcessor[] { - let steps: PipelineProcessor[] = []; + get steps(): PipelineProcessor[] { + let steps: PipelineProcessor[] = []; for (const type of this.getSortedProcessorTypes()) { const subSteps = this._steps.get(type); @@ -190,7 +192,7 @@ class Pipeline extends EventEmitter> { * * @param type */ - getStepsByType(type: ProcessorType): PipelineProcessor[] { + getStepsByType(type: ProcessorType): PipelineProcessor[] { return this.steps.filter((process) => process.type === type); } @@ -209,7 +211,7 @@ class Pipeline extends EventEmitter> { * * @param data */ - async process(data?: T): Promise { + async process(data?: R): Promise { const lastProcessorIndexUpdated = this.lastProcessorIndexUpdated; const steps = this.steps; @@ -224,11 +226,11 @@ class Pipeline extends EventEmitter> { // updated processor was before "processor". // This is to ensure that we always have correct and up to date // data from processors and also to skip them when necessary - prev = await processor.process(prev); + prev = (await processor.process(prev)) as R; this.cache.set(processor.id, prev); } else { // cached results already exist - prev = this.cache.get(processor.id); + prev = this.cache.get(processor.id) as R; } } } catch (e) { @@ -263,7 +265,9 @@ class Pipeline extends EventEmitter> { * This is used to invalid or skip a processor in * the process() method */ - private setLastProcessorIndex(processor: PipelineProcessor): void { + private setLastProcessorIndex( + processor: PipelineProcessor, + ): void { const processorIndex = this.findProcessorIndexByID(processor.id); if (this.lastProcessorIndexUpdated > processorIndex) { diff --git a/src/pipeline/processor.ts b/src/pipeline/processor.ts index 01835cef..47882915 100644 --- a/src/pipeline/processor.ts +++ b/src/pipeline/processor.ts @@ -16,8 +16,10 @@ export enum ProcessorType { Limit, } -interface PipelineProcessorEvents { - propsUpdated: (processor: PipelineProcessor) => void; +interface PipelineProcessorEvents { + propsUpdated: ( + processor: PipelineProcessor, + ) => void; beforeProcess: (...args) => void; afterProcess: (...args) => void; } @@ -28,7 +30,7 @@ export interface PipelineProcessorProps {} export abstract class PipelineProcessor< T, P extends Partial, -> extends EventEmitter> { +> extends EventEmitter { public readonly id: ID; private _props: P; diff --git a/src/view/container.tsx b/src/view/container.tsx index 35c8bde2..041908a6 100644 --- a/src/view/container.tsx +++ b/src/view/container.tsx @@ -8,9 +8,9 @@ import log from '../util/log'; import { useEffect } from 'preact/hooks'; import * as actions from './actions'; import { useStore } from '../hooks/useStore'; -import useSelector from '../../src/hooks/useSelector'; -import { useConfig } from '../../src/hooks/useConfig'; -import { throttle } from 'src/util/throttle'; +import useSelector from '../hooks/useSelector'; +import { useConfig } from '../hooks/useConfig'; +import { throttle } from '../util/throttle'; export function Container() { const config = useConfig(); diff --git a/src/view/plugin/pagination.tsx b/src/view/plugin/pagination.tsx index d8feb1b5..3f5e9abb 100644 --- a/src/view/plugin/pagination.tsx +++ b/src/view/plugin/pagination.tsx @@ -46,11 +46,15 @@ export function Pagination() { url: server.url, body: server.body, }); + + config.pipeline.register(processor.current); } else { processor.current = new PaginationLimit({ limit: limit, page: currentPage, }); + + config.pipeline.register(processor.current); } if (processor.current instanceof ServerPaginationLimit) { @@ -65,7 +69,6 @@ export function Pagination() { } config.pipeline.on('updated', onUpdate); - config.pipeline.register(processor.current); // we need to make sure that the state is set // to the default props when an error happens @@ -75,7 +78,7 @@ export function Pagination() { }); return () => { - config.pipeline.unregister(processor.current); + config.pipeline.unregister(processor.current); config.pipeline.off('updated', onUpdate); }; }, []); diff --git a/src/view/plugin/search/search.tsx b/src/view/plugin/search/search.tsx index 89593af2..c449e68d 100644 --- a/src/view/plugin/search/search.tsx +++ b/src/view/plugin/search/search.tsx @@ -67,9 +67,11 @@ export function Search() { }, [props]); useEffect(() => { - config.pipeline.register(processor); + if (processor) { + config.pipeline.register(processor); + } - return () => config.pipeline.unregister(processor); + return () => config.pipeline.unregister(processor); }, [config, processor]); const debouncedOnInput = useCallback(