diff --git a/src/subscriber.ts b/src/subscriber.ts index 731c018f5..fd3dc30a4 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -993,11 +993,16 @@ export class Subscriber extends EventEmitter { const {maxStreams = defaultOptions.subscription.maxStreams} = options.streamingOptions; + options.streamingOptions.maxStreams = Math.min( maxStreams, this.maxMessages ); } + + if (this._inventory) { + this._inventory.setOptions(this._options.flowControl!); + } } /** diff --git a/test/subscriber.ts b/test/subscriber.ts index f6bc4d2e2..7ab85ab67 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -95,6 +95,8 @@ class FakeLeaseManager extends EventEmitter { } // eslint-disable-next-line @typescript-eslint/no-unused-vars add(message: s.Message): void {} + // eslint-disable-next-line @typescript-eslint/no-unused-vars + setOptions(options: FlowControlOptions): void {} clear(): s.Message[] { return []; } @@ -115,6 +117,8 @@ class FakeQueue { async add(message: s.Message, deadline?: number): Promise { return s.AckResponses.Success; } + // eslint-disable-next-line @typescript-eslint/no-unused-vars + setOptions(options: BatchOptions) {} async flush(): Promise {} async onFlush(): Promise {} async onDrain(): Promise {}