Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breadcrumbs after update #494

Merged
merged 8 commits into from
Sep 28, 2023
13 changes: 13 additions & 0 deletions .changeset/tall-avocados-repeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
'@jpmorganchase/mosaic-core': patch
'@jpmorganchase/mosaic-plugins': patch
'@jpmorganchase/mosaic-types': patch
---

### Feature

Add new plugin lifecycle method `shouldUpdateNamespaceSources`.

This method is called when a source emits new pages and there is another source(s) that share the same namespace.

If `shouldUpdateNamespaceSources` returns `true` then the other source(s), i.e., not the source that triggered the initial update, will call `afterUpdate` again.
35 changes: 33 additions & 2 deletions packages/core/src/Source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,37 @@ export default class Source {
}
}

async requestNamespaceSourceUpdate(
updatedSourceFilesystem: IVolumeImmutable,
sharedFilesystem: IVolumeImmutable,
globalConfig: MutableData<unknown>
) {
const initTime = new Date().getTime();
const shouldInvokeAfterUpdate = await this.#pluginApi.shouldUpdateNamespaceSources(
updatedSourceFilesystem,
{
globalFilesystem: this.#globalFilesystem,
pageExtensions: this.#pageExtensions,
ignorePages: this.#ignorePages,
serialiser: this.serialiser,
config: this.config.asReadOnly(),
namespace: this.namespace
}
);
const timeTaken = new Date().getTime() - initTime;
if (timeTaken > 400) {
console.warn(
`Lifecycle phase 'shouldUpdateNamespaceSources' for source '${this.id.description}' took ${timeTaken}ms to complete. The method is async, so this may not be an accurate measurement of execution time, but consider optimising this method if it is performing intensive operations.`
);
}
if (shouldInvokeAfterUpdate === true) {
this.filesystem.unfreeze();
await this.invokeAfterUpdate(sharedFilesystem, globalConfig);
DavieReid marked this conversation as resolved.
Show resolved Hide resolved
this.filesystem.freeze();
this.filesystem.clearCache();
}
}

async use(
plugins: PluginModuleDefinition[] = [],
serialisers: SerialiserModuleDefinition[] = []
Expand Down Expand Up @@ -214,13 +245,13 @@ export default class Source {

if (foundWorkflows.length === 0) {
return {
error: `[Mosaic] Workflow ${name} not found for ${this.id.description.toString()}`
error: `[Mosaic][Source] workflow ${name} not found for ${this.id.description.toString()}`
};
}

if (foundWorkflows.length > 1) {
return {
error: `[Mosaic] Multiple workflows with "${name}" found for ${this.id.description.toString()}`
error: `[Mosaic][Source] multiple workflows with "${name}" found for ${this.id.description.toString()}`
};
}

Expand Down
51 changes: 43 additions & 8 deletions packages/core/src/SourceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import createConfig from './helpers/createConfig.js';
function logUpdateStatus(sourceId, initOrStartTime) {
if (initOrStartTime) {
console.debug(
`[Mosaic] Source '${sourceId.description}' received first docs snapshot ${
`[Mosaic][Source] '${sourceId.description}' received first docs snapshot ${
(new Date().getTime() - initOrStartTime) / 1000
}s after starting.`
);
} else {
console.debug(`[Mosaic] Source '${sourceId.description}' received updated docs`);
console.debug(`[Mosaic][Source] '${sourceId.description}' received updated docs`);
}
}

Expand Down Expand Up @@ -126,7 +126,7 @@ export default class SourceManager {
if (!sourceActive) {
reject(
new Error(
`Source '${source.id.description}' received a message before it was initialised.`
`[Mosaic][Source] '${source.id.description}' received a message before it was initialised.`
)
);
} else {
Expand All @@ -151,7 +151,6 @@ export default class SourceManager {
if (!sourceActive) {
return;
}

await source.invokeAfterUpdate(this.#sharedFilesystem, this.#globalConfig);

// After each async operation, we should check if anything has caused the Source to close
Expand All @@ -163,6 +162,19 @@ export default class SourceManager {
source.filesystem.clearCache();

await this.#updateSources(immutableSourceFilesystem, source);

// After each async operation, we should check if anything has caused the Source to close
if (!sourceActive) {
return;
}

await this.#updateNamespaceSources(immutableSourceFilesystem, source);

// After each async operation, we should check if anything has caused the Source to close
if (!sourceActive) {
return;
}

this.#invokeUpdateCallbacks(immutableSourceFilesystem, source);
} catch (e) {
console.warn(
Expand All @@ -185,18 +197,21 @@ export default class SourceManager {
source.onExit(() => {
if (!sourceActive) {
reject(
new Error(`Source '${source.id.description}' silently exited before initialising.`)
new Error(
`[Mosaic][Source] '${source.id.description}' silently exited before initialising.`
)
);
}
console.debug(`[Mosaic] Source '${source.id.description}' closed`);
console.debug(`[Mosaic][Source] '${source.id.description}' closed`);

this.#sources.delete(source.id);
sourceActive = false;
});

source.onStart(() => {
sourceActive = true;
console.debug(
`[Mosaic] Source '${source.id.description}' started in ${
`[Mosaic][Source] '${source.id.description}' started in ${
new Date().getTime() - initOrStartTime
}ms - awaiting first docs snapshot`
);
Expand All @@ -210,7 +225,7 @@ export default class SourceManager {
this.#handlers.forEach(callback => callback(filesystem, source));
}

#updateSources(immutableSourceFilesystem, source) {
#updateSources(immutableSourceFilesystem: IVolumeImmutable, source: Source) {
// Notify other frozen sources that something has changed. If a source's filesystem hasn't been frozen yet - there's no point
// in notifying it of another source changing, as it hasn't initialised or called `afterUpdate` for the first time yet anyway.
// No need to do them sequentially - we can fire them all off at once and wait for them all to finish in their own time
Expand All @@ -224,6 +239,26 @@ export default class SourceManager {
);
}

#updateNamespaceSources(immutableSourceFilesystem: IVolumeImmutable, source: Source) {
// Notify other frozen sources that share this sources namespace that something has changed.
return Promise.all(
Array.from(this.#sources.values()).map(existingSource => {
if (
existingSource !== source &&
existingSource.filesystem.frozen &&
existingSource.namespace === source.namespace
) {
return existingSource.requestNamespaceSourceUpdate(
immutableSourceFilesystem,
this.#sharedFilesystem,
this.#globalConfig
);
}
return existingSource;
})
);
}

listSources() {
return Array.from(this.#sources.values()).map((source, index) => ({
name: source.id.description,
Expand Down
6 changes: 4 additions & 2 deletions packages/core/src/__tests__/SourceManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ describe('GIVEN SourceManager', () => {
await expect(() =>
sourceManager.addSource({ name: 'source2', modulePath: 'source2-module' }, {})
).rejects.toThrow(
new Error("Source 'source2' received a message before it was initialised.")
new Error("[Mosaic][Source] 'source2' received a message before it was initialised.")
);
});

Expand Down Expand Up @@ -242,7 +242,9 @@ describe('GIVEN SourceManager', () => {
});
await expect(() =>
sourceManager.addSource({ name: 'source', modulePath: 'source-module' }, {})
).rejects.toThrow(new Error("Source 'source' silently exited before initialising."));
).rejects.toThrow(
new Error("[Mosaic][Source] 'source' silently exited before initialising.")
);
});
test('THEN the error should be reported in a rejected promise', async () => {
setTimeout(() => {
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/plugin/createPluginAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ function createProxyBaseAPI<ConfigData>(): Plugin<Page, ConfigData> {
},
$beforeSend() {
throw new Error('This is just for the interface on the Proxy and should never be invoked.');
},
shouldUpdateNamespaceSources() {
throw new Error('This is just for the interface on the Proxy and should never be invoked.');
}
};
}
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ export async function bindPluginMethods(
async $beforeSend(mutableFilesystem, args) {
const result = await pluginApi.$beforeSend(mutableFilesystem, args);
return result;
},
async shouldUpdateNamespaceSources(lastAfterUpdateReturn, args) {
const result = await pluginApi.shouldUpdateNamespaceSources(lastAfterUpdateReturn, args);
return result;
}
};
}
16 changes: 15 additions & 1 deletion packages/core/src/plugin/pluginRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,26 @@ export default async function pluginRunner(
plugin.options
);

if (result && lifecycleName !== '$afterSource' && lifecycleName !== 'shouldClearCache') {
if (
result &&
lifecycleName !== '$afterSource' &&
lifecycleName !== 'shouldClearCache' &&
lifecycleName !== 'shouldUpdateNamespaceSources'
) {
console.warn(
`[Mosaic] \`${lifecycleName}\` plugin should not return a value - this lifecycle phase expects mutation to occur directly on the filesystem instance. This will be ignored.`
);
}

transformedInput = result;
if (
(lifecycleName === 'shouldClearCache' ||
lifecycleName === 'shouldUpdateNamespaceSources') &&
result === true
) {
/** This lifecycle returns a boolean so if *any* plugin wants to clear the cache then we should do so */
break;
}
} catch (exception) {
const pluginName = path.posix.basename(
plugin.modulePath,
Expand Down Expand Up @@ -63,5 +76,6 @@ export default async function pluginRunner(
continue;
}
}

return { result: transformedInput, errors: pluginErrors };
}
4 changes: 2 additions & 2 deletions packages/core/src/worker/Source.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ if (isMainThread) {
)
.subscribe(async (pagesAndSymlinks: Buffer) => {
if (workerData.options.cache !== false) {
console.info(`[Mosaic] Saving cached filesystem of ${workerData.name}`);
console.info(`[Mosaic][Source] Saving cached filesystem of ${workerData.name}`);
await fs.promises.writeFile(cachePath, pagesAndSymlinks);
}

Expand All @@ -107,7 +107,7 @@ if (isMainThread) {
try {
if (await fs.promises.stat(cachePath)) {
const data = await fs.promises.readFile(cachePath);
console.info(`[Mosaic] Restoring cached filesystem for ${workerData.name}`);
console.info(`[Mosaic][Source] Restoring cached filesystem for ${workerData.name}`);
parentPort.postMessage(
{
type: 'init',
Expand Down
3 changes: 3 additions & 0 deletions packages/plugins/src/BreadcrumbsPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ const BreadcrumbsPlugin: PluginType<BreadcrumbsPluginPage, BreadcrumbsPluginOpti
}
}
}
},
async shouldUpdateNamespaceSources() {
return true;
}
};

Expand Down
24 changes: 24 additions & 0 deletions packages/types/src/Plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,28 @@ export type Plugin<
},
options?: TOptions
) => Promise<boolean>;
/**
* Plugin lifecycle method that triggers inside the main process everytime ANY source emits new pages.
* This method should return a boolean that will indicate if this source should force other namespace sources to re-run afterUpdate.
* Returning `undefined`, false or no value, will result in no update being triggered for namespace sources
* @param updatedSourceFilesystem Immutable filesystem for the source that changed
* @param param.config An immutable object for reading data from other lifecycle phases of all plugins for this source in the child process for this plugin
* @param param.serialiser A matching `Serialiser` for serialising/deserialising pages when reading/writing to the filesystem
* @param param.globalFilesystem Immutable union filesystem instance with all source's pages (and symlinks applied)
* @param param.namespace The namespace of the source running the plugin
* @param options The options passed in when declaring the plugin
* @returns {Promise<boolean>} A boolean indicating whether to clear the cache for this source
*/
shouldUpdateNamespaceSources?: (
updatedSourceFilesystem: IVolumeImmutable,
helpers: {
serialiser: Serialiser<TPage>;
config: ImmutableData<ConfigData>;
globalFilesystem: IUnionVolume;
pageExtensions: string[];
ignorePages: string[];
namespace: string;
},
options?: TOptions
) => Promise<boolean>;
};