Skip to content

Commit

Permalink
Throw error when duplicate processor is being registered + process pr…
Browse files Browse the repository at this point in the history
…ops deep equal check
  • Loading branch information
afshinm committed Dec 30, 2023
1 parent 451a912 commit c7d585c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
37 changes: 32 additions & 5 deletions src/pipeline/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
// -1 means all new processors should be processed
private lastProcessorIndexUpdated = -1;

constructor(steps?: PipelineProcessor<any, any>[]) {
constructor(steps?: PipelineProcessor<T, P>[]) {
super();

if (steps) {
Expand All @@ -70,29 +70,56 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* @param priority
*/
register(
processor: PipelineProcessor<any, any>,
processor: PipelineProcessor<T, P>,
priority: number = null,
): void {
if (!processor) return;
): PipelineProcessor<T, P> {
if (!processor) {
throw Error('Processor is not defined');
}

if (processor.type === null) {
throw Error('Processor type is not defined');
}

if (this.findProcessorIndexByID(processor.id) > -1) {
throw Error(`Processor ID ${processor.id} is already defined`);
}

// binding the propsUpdated callback to the Pipeline
processor.on('propsUpdated', this.processorPropsUpdated.bind(this));

this.addProcessorByPriority(processor, priority);
this.afterRegistered(processor);

return processor;
}

/**
* Tries to register a new processor
* @param processor
* @param priority
*/
tryRegister(
processor: PipelineProcessor<T, P>,
priority: number = null,
): PipelineProcessor<T, P> | undefined {
try {
return this.register(processor, priority);
} catch (_) {
// noop
}

return undefined;
}

/**
* Removes a processor from the list
*
* @param processor
*/
unregister(processor: PipelineProcessor<any, any>): void {
unregister(processor: PipelineProcessor<T, P>): void {
if (!processor) return;
if (this.findProcessorIndexByID(processor.id) === -1) return;

const subSteps = this._steps.get(processor.type);

Expand Down
15 changes: 12 additions & 3 deletions src/pipeline/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// e.g. Extractor = 0 will be processed before Transformer = 1
import { generateUUID, ID } from '../util/id';
import { EventEmitter } from '../util/eventEmitter';
import { deepEqual } from '../util/deepEqual';

export enum ProcessorType {
Initiator,
Expand Down Expand Up @@ -29,7 +30,7 @@ export abstract class PipelineProcessor<
P extends Partial<PipelineProcessorProps>,
> extends EventEmitter<PipelineProcessorEvents<T, P>> {
public readonly id: ID;
private readonly _props: P;
private _props: P;

abstract get type(): ProcessorType;
protected abstract _process(...args): T | Promise<T>;
Expand Down Expand Up @@ -62,8 +63,16 @@ export abstract class PipelineProcessor<
}

setProps(props: Partial<P>): this {
Object.assign(this._props, props);
this.emit('propsUpdated', this);
const updatedProps = {
...this._props,
...props,
};

if (!deepEqual(updatedProps, this._props)) {
this._props = updatedProps;
this.emit('propsUpdated', this);
}

return this;
}

Expand Down

0 comments on commit c7d585c

Please sign in to comment.