diff --git a/lib/sitemap-index-stream.ts b/lib/sitemap-index-stream.ts index 16160e9..af9c823 100644 --- a/lib/sitemap-index-stream.ts +++ b/lib/sitemap-index-stream.ts @@ -1,8 +1,8 @@ +import { WriteStream } from 'fs'; import { Transform, TransformOptions, TransformCallback } from 'stream'; import { IndexItem, SitemapItemLoose, ErrorLevel } from './types'; import { SitemapStream, stylesheetInclude } from './sitemap-stream'; import { element, otag, ctag } from './sitemap-xml'; -import { WriteStream } from 'fs'; export enum IndexTagNames { sitemap = 'sitemap', @@ -16,17 +16,57 @@ const sitemapIndexTagStart = ''; const closetag = ''; +/** + * Options for the SitemapIndexStream + */ export interface SitemapIndexStreamOptions extends TransformOptions { + /** + * Whether to output the lastmod date only (no time) + * + * @default false + */ lastmodDateOnly?: boolean; + + /** + * How to handle errors in passed in urls + * + * @default ErrorLevel.WARN + */ level?: ErrorLevel; + + /** + * URL to an XSL stylesheet to include in the XML + */ xslUrl?: string; } const defaultStreamOpts: SitemapIndexStreamOptions = {}; + +/** + * `SitemapIndexStream` is a Transform stream that takes `IndexItem`s or sitemap URL strings and outputs a stream of sitemap index XML. + * + * It automatically handles the XML declaration and the opening and closing tags for the sitemap index. + * + * ⚠️ CAUTION: This object is `readable` and must be read (e.g. piped to a file or to /dev/null) + * before `finish` will be emitted. Failure to read the stream will result in hangs. + * + * @extends {Transform} + */ export class SitemapIndexStream extends Transform { lastmodDateOnly: boolean; level: ErrorLevel; xslUrl?: string; private hasHeadOutput: boolean; + + /** + * `SitemapIndexStream` is a Transform stream that takes `IndexItem`s or sitemap URL strings and outputs a stream of sitemap index XML. + * + * It automatically handles the XML declaration and the opening and closing tags for the sitemap index. + * + * ⚠️ CAUTION: This object is `readable` and must be read (e.g. piped to a file or to /dev/null) + * before `finish` will be emitted. Failure to read the stream will result in hangs. + * + * @param {SitemapIndexStreamOptions} [opts=defaultStreamOpts] - Stream options. + */ constructor(opts = defaultStreamOpts) { opts.objectMode = true; super(opts); @@ -36,18 +76,22 @@ export class SitemapIndexStream extends Transform { this.xslUrl = opts.xslUrl; } + private writeHeadOutput(): void { + this.hasHeadOutput = true; + let stylesheet = ''; + if (this.xslUrl) { + stylesheet = stylesheetInclude(this.xslUrl); + } + this.push(xmlDec + stylesheet + sitemapIndexTagStart); + } + _transform( item: IndexItem | string, encoding: string, callback: TransformCallback ): void { if (!this.hasHeadOutput) { - this.hasHeadOutput = true; - let stylesheet = ''; - if (this.xslUrl) { - stylesheet = stylesheetInclude(this.xslUrl); - } - this.push(xmlDec + stylesheet + sitemapIndexTagStart); + this.writeHeadOutput(); } this.push(otag(IndexTagNames.sitemap)); if (typeof item === 'string') { @@ -69,83 +113,199 @@ export class SitemapIndexStream extends Transform { } _flush(cb: TransformCallback): void { + if (!this.hasHeadOutput) { + this.writeHeadOutput(); + } + this.push(closetag); cb(); } } -type getSitemapStream = ( +type getSitemapStreamFunc = ( i: number ) => [IndexItem | string, SitemapStream, WriteStream]; +/** + * Options for the SitemapAndIndexStream + * + * @extends {SitemapIndexStreamOptions} + */ export interface SitemapAndIndexStreamOptions extends SitemapIndexStreamOptions { - level?: ErrorLevel; + /** + * Max number of items in each sitemap XML file. + * + * When the limit is reached the current sitemap file will be closed, + * a wait for `finish` on the target write stream will happen, + * and a new sitemap file will be created. + * + * Range: 1 - 50,000 + * + * @default 45000 + */ limit?: number; - getSitemapStream: getSitemapStream; + + /** + * Callback for SitemapIndexAndStream that creates a new sitemap stream for a given sitemap index. + * + * Called when a new sitemap file is needed. + * + * The write stream is the destination where the sitemap was piped. + * SitemapAndIndexStream will wait for the `finish` event on each sitemap's + * write stream before moving on to the next sitemap. This ensures that the + * contents of the write stream will be fully written before being used + * by any following operations (e.g. uploading, reading contents for unit tests). + * + * @param i - The index of the sitemap file + * @returns A tuple containing the index item to be written into the sitemap index, the sitemap stream, and the write stream for the sitemap pipe destination + */ + getSitemapStream: getSitemapStreamFunc; } -// const defaultSIStreamOpts: SitemapAndIndexStreamOptions = {}; + +/** + * `SitemapAndIndexStream` is a Transform stream that takes in sitemap items, + * writes them to sitemap files, adds the sitemap files to a sitemap index, + * and creates new sitemap files when the count limit is reached. + * + * It waits for the target stream of the current sitemap file to finish before + * moving on to the next if the target stream is returned by the `getSitemapStream` + * callback in the 3rd position of the tuple. + * + * ⚠️ CAUTION: This object is `readable` and must be read (e.g. piped to a file or to /dev/null) + * before `finish` will be emitted. Failure to read the stream will result in hangs. + * + * @extends {SitemapIndexStream} + */ export class SitemapAndIndexStream extends SitemapIndexStream { - private i: number; - private getSitemapStream: getSitemapStream; - private currentSitemap: SitemapStream; - private currentSitemapPipeline?: WriteStream; - private idxItem: IndexItem | string; + private itemsWritten: number; + private getSitemapStream: getSitemapStreamFunc; + private currentSitemap?: SitemapStream; private limit: number; + private currentSitemapPipeline?: WriteStream; + + /** + * `SitemapAndIndexStream` is a Transform stream that takes in sitemap items, + * writes them to sitemap files, adds the sitemap files to a sitemap index, + * and creates new sitemap files when the count limit is reached. + * + * It waits for the target stream of the current sitemap file to finish before + * moving on to the next if the target stream is returned by the `getSitemapStream` + * callback in the 3rd position of the tuple. + * + * ⚠️ CAUTION: This object is `readable` and must be read (e.g. piped to a file or to /dev/null) + * before `finish` will be emitted. Failure to read the stream will result in hangs. + * + * @param {SitemapAndIndexStreamOptions} opts - Stream options. + */ constructor(opts: SitemapAndIndexStreamOptions) { opts.objectMode = true; super(opts); - this.i = 0; + this.itemsWritten = 0; this.getSitemapStream = opts.getSitemapStream; - [this.idxItem, this.currentSitemap, this.currentSitemapPipeline] = - this.getSitemapStream(0); this.limit = opts.limit ?? 45000; } - _writeSMI(item: SitemapItemLoose, callback: () => void): void { - this.i++; - if (!this.currentSitemap.write(item)) { - this.currentSitemap.once('drain', callback); - } else { - process.nextTick(callback); - } - } - _transform( item: SitemapItemLoose, encoding: string, callback: TransformCallback ): void { - if (this.i === 0) { - this._writeSMI(item, () => - super._transform(this.idxItem, encoding, callback) - ); - } else if (this.i % this.limit === 0) { - const onFinish = () => { - const [idxItem, currentSitemap, currentSitemapPipeline] = - this.getSitemapStream(this.i / this.limit); - this.currentSitemap = currentSitemap; - this.currentSitemapPipeline = currentSitemapPipeline; - // push to index stream - this._writeSMI(item, () => - // push to index stream - super._transform(idxItem, encoding, callback) - ); - }; - this.currentSitemapPipeline?.on('finish', onFinish); - this.currentSitemap.end( - !this.currentSitemapPipeline ? onFinish : undefined - ); + if (this.itemsWritten % this.limit === 0) { + if (this.currentSitemap) { + const onFinish = new Promise((resolve, reject) => { + this.currentSitemap?.on('finish', resolve); + this.currentSitemap?.on('error', reject); + this.currentSitemap?.end(); + }); + + const onPipelineFinish = this.currentSitemapPipeline + ? new Promise((resolve, reject) => { + this.currentSitemapPipeline?.on('finish', resolve); + this.currentSitemapPipeline?.on('error', reject); + }) + : Promise.resolve(); + + Promise.all([onFinish, onPipelineFinish]) + .then(() => { + this.createSitemap(encoding); + this.writeItem(item, callback); + }) + .catch(callback); + return; + } else { + this.createSitemap(encoding); + } + } + + this.writeItem(item, callback); + } + + private writeItem(item: SitemapItemLoose, callback: TransformCallback): void { + if (!this.currentSitemap) { + callback(new Error('No sitemap stream available')); + return; + } + + if (!this.currentSitemap.write(item)) { + this.currentSitemap.once('drain', callback); } else { - this._writeSMI(item, callback); + process.nextTick(callback); } + + // Increment the count of items written + this.itemsWritten++; } + /** + * Called when the stream is finished. + * If there is a current sitemap, we wait for it to finish before calling the callback. + * + * @param cb + */ _flush(cb: TransformCallback): void { - const onFinish = () => super._flush(cb); - this.currentSitemapPipeline?.on('finish', onFinish); - this.currentSitemap.end( - !this.currentSitemapPipeline ? onFinish : undefined - ); + const onFinish = new Promise((resolve, reject) => { + if (this.currentSitemap) { + this.currentSitemap.on('finish', resolve); + this.currentSitemap.on('error', reject); + this.currentSitemap.end(); + } else { + resolve(); + } + }); + + const onPipelineFinish = new Promise((resolve, reject) => { + if (this.currentSitemapPipeline) { + this.currentSitemapPipeline.on('finish', resolve); + this.currentSitemapPipeline.on('error', reject); + // The pipeline (pipe target) will get it's end() call + // from the sitemap stream ending. + } else { + resolve(); + } + }); + + Promise.all([onFinish, onPipelineFinish]) + .then(() => { + super._flush(cb); + }) + .catch((err) => { + cb(err); + }); + } + + private createSitemap(encoding: string): void { + const [idxItem, currentSitemap, currentSitemapPipeline] = + this.getSitemapStream(this.itemsWritten / this.limit); + currentSitemap.on('error', (err) => this.emit('error', err)); + this.currentSitemap = currentSitemap; + this.currentSitemapPipeline = currentSitemapPipeline; + super._transform(idxItem, encoding, () => { + // We are not too fussed about waiting for the index item to be written + // we we'll wait for the file to finish at the end + // and index file write volume tends to be small in comprarison to sitemap + // writes. + // noop + }); } } diff --git a/tests/sitemap-index.test.ts b/tests/sitemap-index.test.ts index 64937b7..3dc79e8 100644 --- a/tests/sitemap-index.test.ts +++ b/tests/sitemap-index.test.ts @@ -1,6 +1,6 @@ import { SitemapStream } from '../index'; import { tmpdir } from 'os'; -import { resolve } from 'path'; +import { join, resolve } from 'path'; import { existsSync, unlinkSync, @@ -11,8 +11,13 @@ import { SitemapIndexStream, SitemapAndIndexStream, } from '../lib/sitemap-index-stream'; -import { streamToPromise } from '../dist'; -import { WriteStream } from 'node:fs'; +import { streamToPromise } from '../lib/sitemap-stream'; +import { finished as finishedCallback } from 'stream'; +import { readFileSync, WriteStream } from 'fs'; +import { promisify } from 'util'; + +const finished = promisify(finishedCallback); + /* eslint-env jest, jasmine */ function removeFilesArray(files): void { if (files && files.length) { @@ -129,6 +134,8 @@ describe('sitemapAndIndex', () => { resolve(targetFolder, `./sitemap-1.xml`), resolve(targetFolder, `./sitemap-2.xml`), resolve(targetFolder, `./sitemap-3.xml`), + resolve(targetFolder, `./sitemap-4.xml`), + resolve(targetFolder, `./sitemap-index.xml`), ]); }); @@ -138,6 +145,8 @@ describe('sitemapAndIndex', () => { resolve(targetFolder, `./sitemap-1.xml`), resolve(targetFolder, `./sitemap-2.xml`), resolve(targetFolder, `./sitemap-3.xml`), + resolve(targetFolder, `./sitemap-4.xml`), + resolve(targetFolder, `./sitemap-index.xml`), ]); }); @@ -150,7 +159,15 @@ describe('sitemapAndIndex', () => { const sm = new SitemapStream(); const path = `./sitemap-${i}.xml`; - const ws = sm.pipe(createWriteStream(resolve(targetFolder, path))); + const outputStream = createWriteStream(resolve(targetFolder, path)); + + // Streams do not automatically propagate errors + // We must propagate this up to the SitemapStream + outputStream.on('error', (err) => { + sm.emit('error', err); + }); + + const ws = sm.pipe(outputStream); return [new URL(path, baseURL).toString(), sm, ws]; }, }); @@ -175,4 +192,326 @@ describe('sitemapAndIndex', () => { ); expect(xml.toString()).toContain('https://1.example.com/a'); }); + + it('propagates error from sitemap stream that cannot be written', async () => { + const baseURL = 'https://example.com/sub/'; + + const sms = new SitemapAndIndexStream({ + limit: 1, + getSitemapStream: (i: number): [string, SitemapStream, WriteStream] => { + const sm = new SitemapStream(); + const path = `./sitemap-${i}.xml`; + + // This will not throw even though it will fail + // `outputStream.writable === true` + // `outputStream.closed === false` + const outputStream = createWriteStream( + resolve(join(targetFolder, 'does', 'not', 'exist'), path) + ); + + // Streams do not automatically propagate errors + // We must propagate this up to the SitemapStream + outputStream.on('error', (err) => { + sm.emit('error', err); + }); + + const ws = sm.pipe(outputStream); + return [new URL(path, baseURL).toString(), sm, ws]; + }, + }); + sms.write('https://1.example.com/a'); + sms.write('https://2.example.com/a'); + sms.write('https://3.example.com/a'); + sms.write('https://4.example.com/a'); + sms.end(); + await expect(finished(sms)).rejects.toThrow( + 'ENOENT: no such file or directory, open' + ); + + expect( + existsSync( + resolve(join(targetFolder, 'does', 'not', 'exist'), `./sitemap-0.xml`) + ) + ).toBe(false); + }); + + it('writes to index file', async () => { + const baseURL = 'https://example.com/sub/'; + + const sms = new SitemapAndIndexStream({ + limit: 2, + getSitemapStream: (i: number): [string, SitemapStream, WriteStream] => { + const sm = new SitemapStream(); + const path = `./sitemap-${i}.xml`; + + // This will not throw even though it will fail + // `outputStream.writable === true` + // `outputStream.closed === false` + const outputStream = createWriteStream(resolve(targetFolder, path)); + + // Streams do not automatically propagate errors + // We must propagate this up to the SitemapStream + outputStream.on('error', (err) => { + sm.emit('error', err); + }); + + const ws = sm.pipe(outputStream); + return [new URL(path, baseURL).toString(), sm, ws]; + }, + }); + + // Pipe the index stream to a file + const indexStream = createWriteStream( + resolve(targetFolder, `./sitemap-index.xml`) + ); + sms.pipe(indexStream); + await writeData(sms, 'https://1.example.com/a'); + await writeData(sms, 'https://2.example.com/a'); + await writeData(sms, 'https://3.example.com/a'); + sms.end(); + await expect(finished(sms)).resolves.toBeUndefined(); + + await finished(indexStream); + + expect(existsSync(resolve(targetFolder, `./sitemap-index.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-0.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-1.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-2.xml`))).toBe(false); + + // Read the first sitemap to make sure it was written + const sitemap0 = await streamToPromise( + createReadStream(resolve(targetFolder, `./sitemap-0.xml`)) + ); + expect(sitemap0.toString()).toContain('https://1.example.com/a'); + + // Read the last sitemap to make sure it was written + const sitemap1 = await streamToPromise( + createReadStream(resolve(targetFolder, `./sitemap-1.xml`)) + ); + expect(sitemap1.toString()).toContain('https://3.example.com/a'); + + // Read the index to make sure it was written + const indexText = readFileSync( + resolve(targetFolder, `./sitemap-index.xml`), + 'utf-8' + ); + expect(indexText).toContain(`${baseURL}sitemap-0`); + expect(indexText).toContain(`${baseURL}sitemap-1`); + expect(indexText).not.toContain(`${baseURL}sitemap-2`); + }); + + it('does not hang if last sitemap is filled', async () => { + const baseURL = 'https://example.com/sub/'; + + const sms = new SitemapAndIndexStream({ + limit: 2, + getSitemapStream: (i: number): [string, SitemapStream, WriteStream] => { + const sm = new SitemapStream(); + const path = `./sitemap-${i}.xml`; + + // This will not throw even though it will fail + // `outputStream.writable === true` + // `outputStream.closed === false` + const outputStream = createWriteStream(resolve(targetFolder, path)); + + // Streams do not automatically propagate errors + // We must propagate this up to the SitemapStream + outputStream.on('error', (err) => { + sm.emit('error', err); + }); + + const ws = sm.pipe(outputStream); + return [new URL(path, baseURL).toString(), sm, ws]; + }, + }); + + // Pipe the index stream to a file + const indexStream = createWriteStream( + resolve(targetFolder, `./sitemap-index.xml`) + ); + sms.pipe(indexStream); + await writeData(sms, 'https://1.example.com/a'); + await writeData(sms, 'https://2.example.com/a'); + sms.end(); + await expect(finished(sms)).resolves.toBeUndefined(); + + await finished(indexStream); + + expect(existsSync(resolve(targetFolder, `./sitemap-index.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-0.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-1.xml`))).toBe(false); + + const sitemap0Raw = readFileSync( + resolve(targetFolder, `./sitemap-0.xml`), + 'utf-8' + ); + expect(sitemap0Raw).toContain('https://1.example.com/a'); + expect(sitemap0Raw).toContain('https://2.example.com/a'); + expect(sitemap0Raw).not.toContain('https://3.example.com/a'); + + // Read the first sitemap to make sure it was written + const sitemap0 = await streamToPromise( + createReadStream(resolve(targetFolder, `./sitemap-0.xml`)) + ); + expect(sitemap0.toString()).toContain('https://1.example.com/a'); + + // Read the index to make sure it was written + const indexText = readFileSync( + resolve(targetFolder, `./sitemap-index.xml`), + 'utf-8' + ); + expect(indexText).toContain(`${baseURL}sitemap-0`); + expect(indexText).not.toContain(`${baseURL}sitemap-1`); + }); + + it('deterministically finishes writing each sitemap file before creating a new one', async () => { + const baseURL = 'https://example.com/sub/'; + + const sms = new SitemapAndIndexStream({ + limit: 5000, + getSitemapStream: (i: number): [string, SitemapStream, WriteStream] => { + const sm = new SitemapStream(); + const path = `./sitemap-${i}.xml`; + + const outputStream = createWriteStream(resolve(targetFolder, path)); + + // Streams do not automatically propagate errors + // We must propagate this up to the SitemapStream + outputStream.on('error', (err) => { + sm.emit('error', err); + }); + + const ws = sm.pipe(outputStream); + return [new URL(path, baseURL).toString(), sm, ws]; + }, + }); + + // Pipe the index stream to a file + const indexStream = createWriteStream( + resolve(targetFolder, `./sitemap-index.xml`) + ); + sms.pipe(indexStream); + for (let i = 0; i < 5000; i++) { + // Intentionally write while ignoring back pressure to stress test + // the rolling to new files + sms.write(`https://1.example.com/a${i}`); + } + for (let i = 0; i < 5000; i++) { + sms.write(`https://2.example.com/a${i}`); + } + for (let i = 0; i < 1; i++) { + sms.write(`https://3.example.com/a${i}`); + } + sms.end(); + await expect(finished(sms)).resolves.toBeUndefined(); + + await finished(indexStream); + + expect(existsSync(resolve(targetFolder, `./sitemap-index.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-0.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-1.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-2.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-3.xml`))).toBe(false); + + // Make sure the very first file is completed + const sitemap0Raw = readFileSync( + resolve(targetFolder, `./sitemap-0.xml`), + 'utf-8' + ); + expect(sitemap0Raw).toContain(''); + expect(sitemap0Raw).toContain('https://1.example.com/a0'); + expect(sitemap0Raw).toContain('https://1.example.com/a4999'); + expect(sitemap0Raw).toContain(''); + + // Make sure the first rolled file is completed + const sitemap1Raw = readFileSync( + resolve(targetFolder, `./sitemap-1.xml`), + 'utf-8' + ); + expect(sitemap1Raw).toContain(''); + expect(sitemap1Raw).toContain('https://2.example.com/a0'); + expect(sitemap1Raw).toContain('https://2.example.com/a4999'); + expect(sitemap1Raw).toContain(''); + + // Make sure the last file is completed + const sitemap2Raw = readFileSync( + resolve(targetFolder, `./sitemap-2.xml`), + 'utf-8' + ); + expect(sitemap2Raw).toContain(''); + expect(sitemap2Raw).toContain('https://3.example.com/a0'); + expect(sitemap2Raw).toContain(''); + expect(sitemap2Raw).not.toContain('https://3.example.com/a1'); + + // Read the index to make sure it was written + const indexText = readFileSync( + resolve(targetFolder, `./sitemap-index.xml`), + 'utf-8' + ); + expect(indexText).toContain(''); + expect(indexText).not.toContain(`${baseURL}sitemap-3`); + }); + + it('writes index if no items written at all', async () => { + const baseURL = 'https://example.com/sub/'; + + const sms = new SitemapAndIndexStream({ + limit: 2, + getSitemapStream: (i: number): [string, SitemapStream, WriteStream] => { + const sm = new SitemapStream(); + const path = `./sitemap-${i}.xml`; + + const outputStream = createWriteStream(resolve(targetFolder, path)); + + // Streams do not automatically propagate errors + // We must propagate this up to the SitemapStream + outputStream.on('error', (err) => { + sm.emit('error', err); + }); + + const ws = sm.pipe(outputStream); + return [new URL(path, baseURL).toString(), sm, ws]; + }, + }); + + // Pipe the index stream to a file + const indexStream = createWriteStream( + resolve(targetFolder, `./sitemap-index.xml`) + ); + sms.pipe(indexStream); + sms.end(); + await expect(finished(sms)).resolves.toBeUndefined(); + + await finished(indexStream); + + expect(existsSync(resolve(targetFolder, `./sitemap-index.xml`))).toBe(true); + expect(existsSync(resolve(targetFolder, `./sitemap-0.xml`))).toBe(false); + + // Read the index to make sure it was written + const indexText = readFileSync( + resolve(targetFolder, `./sitemap-index.xml`), + 'utf-8' + ); + expect(indexText).toContain(``); + expect(indexText).not.toContain(`${baseURL}sitemap-2`); + }); }); + +function writeData( + sms: SitemapStream | SitemapAndIndexStream, + data +): Promise { + if (!sms.write(data)) { + return new Promise((resolve) => { + sms.once('drain', resolve); + }); + } + return Promise.resolve(); +}