Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue-423 - Propagate errors in SitemapAndIndexStream #424

Merged
merged 7 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 214 additions & 54 deletions lib/sitemap-index-stream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { WriteStream } from 'fs';
import { Transform, TransformOptions, TransformCallback } from 'stream';
import { IndexItem, SitemapItemLoose, ErrorLevel } from './types';
import { SitemapStream, stylesheetInclude } from './sitemap-stream';
import { element, otag, ctag } from './sitemap-xml';
import { WriteStream } from 'fs';

export enum IndexTagNames {
sitemap = 'sitemap',
Expand All @@ -16,17 +16,57 @@ const sitemapIndexTagStart =
'<sitemapindex xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">';
const closetag = '</sitemapindex>';

/**
* Options for the SitemapIndexStream
*/
export interface SitemapIndexStreamOptions extends TransformOptions {
/**
* Whether to output the lastmod date only (no time)
*
* @default false
*/
lastmodDateOnly?: boolean;

/**
* How to handle errors in passed in urls
*
* @default ErrorLevel.WARN
*/
level?: ErrorLevel;

/**
* URL to an XSL stylesheet to include in the XML
*/
xslUrl?: string;
}
const defaultStreamOpts: SitemapIndexStreamOptions = {};

/**
* `SitemapIndexStream` is a Transform stream that takes `IndexItem`s or sitemap URL strings and outputs a stream of sitemap index XML.
*
* It automatically handles the XML declaration and the opening and closing tags for the sitemap index.
*
* ⚠️ CAUTION: This object is `readable` and must be read (e.g. piped to a file or to /dev/null)
* before `finish` will be emitted. Failure to read the stream will result in hangs.
*
* @extends {Transform}
*/
export class SitemapIndexStream extends Transform {
lastmodDateOnly: boolean;
level: ErrorLevel;
xslUrl?: string;
private hasHeadOutput: boolean;

/**
* `SitemapIndexStream` is a Transform stream that takes `IndexItem`s or sitemap URL strings and outputs a stream of sitemap index XML.
*
* It automatically handles the XML declaration and the opening and closing tags for the sitemap index.
*
* ⚠️ CAUTION: This object is `readable` and must be read (e.g. piped to a file or to /dev/null)
* before `finish` will be emitted. Failure to read the stream will result in hangs.
*
* @param {SitemapIndexStreamOptions} [opts=defaultStreamOpts] - Stream options.
*/
constructor(opts = defaultStreamOpts) {
opts.objectMode = true;
super(opts);
Expand All @@ -36,18 +76,22 @@ export class SitemapIndexStream extends Transform {
this.xslUrl = opts.xslUrl;
}

private writeHeadOutput(): void {
this.hasHeadOutput = true;
let stylesheet = '';
if (this.xslUrl) {
stylesheet = stylesheetInclude(this.xslUrl);
}
this.push(xmlDec + stylesheet + sitemapIndexTagStart);
}

_transform(
item: IndexItem | string,
encoding: string,
callback: TransformCallback
): void {
if (!this.hasHeadOutput) {
this.hasHeadOutput = true;
let stylesheet = '';
if (this.xslUrl) {
stylesheet = stylesheetInclude(this.xslUrl);
}
this.push(xmlDec + stylesheet + sitemapIndexTagStart);
this.writeHeadOutput();
}
this.push(otag(IndexTagNames.sitemap));
if (typeof item === 'string') {
Expand All @@ -69,83 +113,199 @@ export class SitemapIndexStream extends Transform {
}

_flush(cb: TransformCallback): void {
if (!this.hasHeadOutput) {
this.writeHeadOutput();
}

this.push(closetag);
cb();
}
}

type getSitemapStream = (
type getSitemapStreamFunc = (
i: number
) => [IndexItem | string, SitemapStream, WriteStream];

/**
* Options for the SitemapAndIndexStream
*
* @extends {SitemapIndexStreamOptions}
*/
export interface SitemapAndIndexStreamOptions
extends SitemapIndexStreamOptions {
level?: ErrorLevel;
/**
* Max number of items in each sitemap XML file.
*
* When the limit is reached the current sitemap file will be closed,
* a wait for `finish` on the target write stream will happen,
* and a new sitemap file will be created.
*
* Range: 1 - 50,000
*
* @default 45000
*/
limit?: number;
getSitemapStream: getSitemapStream;

/**
* Callback for SitemapIndexAndStream that creates a new sitemap stream for a given sitemap index.
*
* Called when a new sitemap file is needed.
*
* The write stream is the destination where the sitemap was piped.
* SitemapAndIndexStream will wait for the `finish` event on each sitemap's
* write stream before moving on to the next sitemap. This ensures that the
* contents of the write stream will be fully written before being used
* by any following operations (e.g. uploading, reading contents for unit tests).
*
* @param i - The index of the sitemap file
* @returns A tuple containing the index item to be written into the sitemap index, the sitemap stream, and the write stream for the sitemap pipe destination
*/
getSitemapStream: getSitemapStreamFunc;
}
// const defaultSIStreamOpts: SitemapAndIndexStreamOptions = {};

/**
* `SitemapAndIndexStream` is a Transform stream that takes in sitemap items,
* writes them to sitemap files, adds the sitemap files to a sitemap index,
* and creates new sitemap files when the count limit is reached.
*
* It waits for the target stream of the current sitemap file to finish before
* moving on to the next if the target stream is returned by the `getSitemapStream`
* callback in the 3rd position of the tuple.
*
* ⚠️ CAUTION: This object is `readable` and must be read (e.g. piped to a file or to /dev/null)
* before `finish` will be emitted. Failure to read the stream will result in hangs.
*
* @extends {SitemapIndexStream}
*/
export class SitemapAndIndexStream extends SitemapIndexStream {
private i: number;
private getSitemapStream: getSitemapStream;
private currentSitemap: SitemapStream;
private currentSitemapPipeline?: WriteStream;
private idxItem: IndexItem | string;
private itemsWritten: number;
private getSitemapStream: getSitemapStreamFunc;
private currentSitemap?: SitemapStream;
private limit: number;
private currentSitemapPipeline?: WriteStream;

/**
* `SitemapAndIndexStream` is a Transform stream that takes in sitemap items,
* writes them to sitemap files, adds the sitemap files to a sitemap index,
* and creates new sitemap files when the count limit is reached.
*
* It waits for the target stream of the current sitemap file to finish before
* moving on to the next if the target stream is returned by the `getSitemapStream`
* callback in the 3rd position of the tuple.
*
* ⚠️ CAUTION: This object is `readable` and must be read (e.g. piped to a file or to /dev/null)
* before `finish` will be emitted. Failure to read the stream will result in hangs.
*
* @param {SitemapAndIndexStreamOptions} opts - Stream options.
*/
constructor(opts: SitemapAndIndexStreamOptions) {
opts.objectMode = true;
super(opts);
this.i = 0;
this.itemsWritten = 0;
this.getSitemapStream = opts.getSitemapStream;
[this.idxItem, this.currentSitemap, this.currentSitemapPipeline] =
this.getSitemapStream(0);
this.limit = opts.limit ?? 45000;
}

_writeSMI(item: SitemapItemLoose, callback: () => void): void {
this.i++;
if (!this.currentSitemap.write(item)) {
this.currentSitemap.once('drain', callback);
} else {
process.nextTick(callback);
}
}

_transform(
item: SitemapItemLoose,
encoding: string,
callback: TransformCallback
): void {
if (this.i === 0) {
this._writeSMI(item, () =>
super._transform(this.idxItem, encoding, callback)
);
} else if (this.i % this.limit === 0) {
const onFinish = () => {
const [idxItem, currentSitemap, currentSitemapPipeline] =
this.getSitemapStream(this.i / this.limit);
this.currentSitemap = currentSitemap;
this.currentSitemapPipeline = currentSitemapPipeline;
// push to index stream
this._writeSMI(item, () =>
// push to index stream
super._transform(idxItem, encoding, callback)
);
};
this.currentSitemapPipeline?.on('finish', onFinish);
this.currentSitemap.end(
!this.currentSitemapPipeline ? onFinish : undefined
);
if (this.itemsWritten % this.limit === 0) {
if (this.currentSitemap) {
const onFinish = new Promise<void>((resolve, reject) => {
this.currentSitemap?.on('finish', resolve);
this.currentSitemap?.on('error', reject);
this.currentSitemap?.end();
});

const onPipelineFinish = this.currentSitemapPipeline
? new Promise<void>((resolve, reject) => {
this.currentSitemapPipeline?.on('finish', resolve);
this.currentSitemapPipeline?.on('error', reject);
})
: Promise.resolve();

Promise.all([onFinish, onPipelineFinish])
.then(() => {
this.createSitemap(encoding);
this.writeItem(item, callback);
})
.catch(callback);
return;
} else {
this.createSitemap(encoding);
}
}

this.writeItem(item, callback);
}

private writeItem(item: SitemapItemLoose, callback: TransformCallback): void {
if (!this.currentSitemap) {
callback(new Error('No sitemap stream available'));
return;
}

if (!this.currentSitemap.write(item)) {
this.currentSitemap.once('drain', callback);
} else {
this._writeSMI(item, callback);
process.nextTick(callback);
}

// Increment the count of items written
this.itemsWritten++;
}

/**
* Called when the stream is finished.
* If there is a current sitemap, we wait for it to finish before calling the callback.
*
* @param cb
*/
_flush(cb: TransformCallback): void {
const onFinish = () => super._flush(cb);
this.currentSitemapPipeline?.on('finish', onFinish);
this.currentSitemap.end(
!this.currentSitemapPipeline ? onFinish : undefined
);
const onFinish = new Promise<void>((resolve, reject) => {
if (this.currentSitemap) {
this.currentSitemap.on('finish', resolve);
this.currentSitemap.on('error', reject);
this.currentSitemap.end();
} else {
resolve();
}
});

const onPipelineFinish = new Promise<void>((resolve, reject) => {
if (this.currentSitemapPipeline) {
this.currentSitemapPipeline.on('finish', resolve);
this.currentSitemapPipeline.on('error', reject);
// The pipeline (pipe target) will get it's end() call
// from the sitemap stream ending.
} else {
resolve();
}
});

Promise.all([onFinish, onPipelineFinish])
.then(() => {
super._flush(cb);
})
.catch((err) => {
cb(err);
});
}

private createSitemap(encoding: string): void {
const [idxItem, currentSitemap, currentSitemapPipeline] =
this.getSitemapStream(this.itemsWritten / this.limit);
currentSitemap.on('error', (err) => this.emit('error', err));
this.currentSitemap = currentSitemap;
this.currentSitemapPipeline = currentSitemapPipeline;
super._transform(idxItem, encoding, () => {
// We are not too fussed about waiting for the index item to be written
// we we'll wait for the file to finish at the end
// and index file write volume tends to be small in comprarison to sitemap
// writes.
// noop
});
}
}
Loading
Loading