From 6d9d10b26424e55713d30f239273d5743499142a Mon Sep 17 00:00:00 2001 From: Harold Hunt Date: Tue, 21 May 2024 20:48:24 -0400 Subject: [PATCH] Issue-423 - Fix failing tests --- lib/sitemap-index-stream.ts | 178 +++++++++++++++++++++++++----------- tests/sitemap-index.test.ts | 2 +- 2 files changed, 127 insertions(+), 53 deletions(-) diff --git a/lib/sitemap-index-stream.ts b/lib/sitemap-index-stream.ts index c75d966..7421bc7 100644 --- a/lib/sitemap-index-stream.ts +++ b/lib/sitemap-index-stream.ts @@ -1,8 +1,8 @@ +import { WriteStream } from 'fs'; import { Transform, TransformOptions, TransformCallback } from 'stream'; import { IndexItem, SitemapItemLoose, ErrorLevel } from './types'; import { SitemapStream, stylesheetInclude } from './sitemap-stream'; import { element, otag, ctag } from './sitemap-xml'; -import { WriteStream } from 'fs'; export enum IndexTagNames { sitemap = 'sitemap', @@ -36,18 +36,22 @@ export class SitemapIndexStream extends Transform { this.xslUrl = opts.xslUrl; } + private writeHeadOutput(): void { + this.hasHeadOutput = true; + let stylesheet = ''; + if (this.xslUrl) { + stylesheet = stylesheetInclude(this.xslUrl); + } + this.push(xmlDec + stylesheet + sitemapIndexTagStart); + } + _transform( item: IndexItem | string, encoding: string, callback: TransformCallback ): void { if (!this.hasHeadOutput) { - this.hasHeadOutput = true; - let stylesheet = ''; - if (this.xslUrl) { - stylesheet = stylesheetInclude(this.xslUrl); - } - this.push(xmlDec + stylesheet + sitemapIndexTagStart); + this.writeHeadOutput(); } this.push(otag(IndexTagNames.sitemap)); if (typeof item === 'string') { @@ -69,11 +73,29 @@ export class SitemapIndexStream extends Transform { } _flush(cb: TransformCallback): void { + if (!this.hasHeadOutput) { + this.writeHeadOutput(); + } + this.push(closetag); cb(); } } +/** + * Callback for SitemapIndexAndStream that creates a new sitemap stream for a given sitemap index. + * + * Called when a new sitemap file is needed. + * + * The write stream is the destination where the sitemap was piped. + * SitemapAndIndexStream will wait for the `finish` event on each sitemap's + * write stream before moving on to the next sitemap. This ensures that the + * contents of the write stream will be fully written before being used + * by any following operations (e.g. uploading, reading contents for unit tests). + * + * @param i - The index of the sitemap file + * @returns A tuple containing the index item to be written into the sitemap index, the sitemap stream, and the write stream for the sitemap pipe destination + */ type getSitemapStream = ( i: number ) => [IndexItem | string, SitemapStream, WriteStream]; @@ -84,70 +106,122 @@ export interface SitemapAndIndexStreamOptions limit?: number; getSitemapStream: getSitemapStream; } -// const defaultSIStreamOpts: SitemapAndIndexStreamOptions = {}; + export class SitemapAndIndexStream extends SitemapIndexStream { - private i: number; + private itemsWritten: number; private getSitemapStream: getSitemapStream; - private currentSitemap: SitemapStream; - private currentSitemapPipeline?: WriteStream; - private idxItem: IndexItem | string; + private currentSitemap?: SitemapStream; private limit: number; + private currentSitemapPipeline?: WriteStream; + constructor(opts: SitemapAndIndexStreamOptions) { opts.objectMode = true; super(opts); - this.i = 0; + this.itemsWritten = 0; this.getSitemapStream = opts.getSitemapStream; - [this.idxItem, this.currentSitemap, this.currentSitemapPipeline] = - this.getSitemapStream(0); - this.currentSitemap.on('error', (err) => this.emit('error', err)); this.limit = opts.limit ?? 45000; } - _writeSMI(item: SitemapItemLoose, callback: () => void): void { - this.i++; - if (!this.currentSitemap.write(item)) { - this.currentSitemap.once('drain', callback); - } else { - process.nextTick(callback); - } - } - _transform( item: SitemapItemLoose, encoding: string, callback: TransformCallback ): void { - if (this.i === 0) { - this._writeSMI(item, () => - super._transform(this.idxItem, encoding, callback) - ); - } else if (this.i % this.limit === 0) { - const onFinish = () => { - const [idxItem, currentSitemap, currentSitemapPipeline] = - this.getSitemapStream(this.i / this.limit); - currentSitemap.on('error', (err) => this.emit('error', err)); - this.currentSitemap = currentSitemap; - this.currentSitemapPipeline = currentSitemapPipeline; - // push to index stream - this._writeSMI(item, () => - // push to index stream - super._transform(idxItem, encoding, callback) - ); - }; - this.currentSitemapPipeline?.on('finish', onFinish); - this.currentSitemap.end( - !this.currentSitemapPipeline ? onFinish : undefined - ); + if (this.itemsWritten % this.limit === 0) { + if (this.currentSitemap) { + const onFinish = new Promise((resolve, reject) => { + this.currentSitemap?.on('finish', resolve); + this.currentSitemap?.on('error', reject); + this.currentSitemap?.end(); + }); + + const onPipelineFinish = this.currentSitemapPipeline + ? new Promise((resolve, reject) => { + this.currentSitemapPipeline?.on('finish', resolve); + this.currentSitemapPipeline?.on('error', reject); + }) + : Promise.resolve(); + + Promise.all([onFinish, onPipelineFinish]) + .then(() => { + this.createSitemap(encoding); + this.writeItem(item, callback); + }) + .catch(callback); + return; + } else { + this.createSitemap(encoding); + } + } + + this.writeItem(item, callback); + } + + private writeItem(item: SitemapItemLoose, callback: TransformCallback): void { + if (!this.currentSitemap) { + callback(new Error('No sitemap stream available')); + return; + } + + if (!this.currentSitemap.write(item)) { + this.currentSitemap.once('drain', callback); } else { - this._writeSMI(item, callback); + process.nextTick(callback); } + + // Increment the count of items written + this.itemsWritten++; } + /** + * Called when the stream is finished. + * If there is a current sitemap, we wait for it to finish before calling the callback. + * + * @param cb + */ _flush(cb: TransformCallback): void { - const onFinish = () => super._flush(cb); - this.currentSitemapPipeline?.on('finish', onFinish); - this.currentSitemap.end( - !this.currentSitemapPipeline ? onFinish : undefined - ); + const onFinish = new Promise((resolve, reject) => { + if (this.currentSitemap) { + this.currentSitemap.on('finish', resolve); + this.currentSitemap.on('error', reject); + this.currentSitemap.end(); + } else { + resolve(); + } + }); + + const onPipelineFinish = new Promise((resolve, reject) => { + if (this.currentSitemapPipeline) { + this.currentSitemapPipeline.on('finish', resolve); + this.currentSitemapPipeline.on('error', reject); + // The pipeline (pipe target) will get it's end() call + // from the sitemap stream ending. + } else { + resolve(); + } + }); + + Promise.all([onFinish, onPipelineFinish]) + .then(() => { + super._flush(cb); + }) + .catch((err) => { + cb(err); + }); + } + + private createSitemap(encoding: string): void { + const [idxItem, currentSitemap, currentSitemapPipeline] = + this.getSitemapStream(this.itemsWritten / this.limit); + currentSitemap.on('error', (err: any) => this.emit('error', err)); + this.currentSitemap = currentSitemap; + this.currentSitemapPipeline = currentSitemapPipeline; + super._transform(idxItem, encoding, () => { + // We are not too fussed about waiting for the index item to be written + // we we'll wait for the file to finish at the end + // and index file write volume tends to be small in comprarison to sitemap + // writes. + // noop + }); } } diff --git a/tests/sitemap-index.test.ts b/tests/sitemap-index.test.ts index 6718779..c4b78eb 100644 --- a/tests/sitemap-index.test.ts +++ b/tests/sitemap-index.test.ts @@ -506,7 +506,7 @@ describe('sitemapAndIndex', () => { function writeData( sms: SitemapStream | SitemapAndIndexStream, - data: any + data ): Promise { if (!sms.write(data)) { return new Promise((resolve) => {