chore(broker): Categorise opportunity caches (#670)
lukehesluke authored Mar 26, 2024
1 parent d7ac312 commit 8e312a9
Showing 4 changed files with 160 additions and 119 deletions.
4 changes: 3 additions & 1 deletion packages/openactive-broker-microservice/src/broker-config.js
@@ -16,7 +16,9 @@ const WAIT_FOR_HARVEST = VALIDATE_ONLY ? false : config.get('broker.waitForHarve
const VERBOSE = config.get('broker.verbose');
const OUTPUT_PATH = config.get('broker.outputPath');
const IS_RUNNING_IN_CI = config.has('ci') ? config.get('ci') : false;
-// TODO: move this property to the root of the config
+// TODO: move this property to the root of the config as it is used in both
+// broker and the integration tests. Broker should only access config from
+// either the root or within `.broker`
const USE_RANDOM_OPPORTUNITIES = config.get('integrationTests.useRandomOpportunities');

const HARVEST_START_TIME = (new Date()).toISOString();
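For context, the reads above imply a node-config layout along these lines — a minimal sketch in which the `exampleConfig` name and all values are illustrative, not taken from this commit:

// Sketch of the implied config shape (illustrative values only).
const exampleConfig = {
  ci: false, // read via config.has('ci') ? config.get('ci') : false
  broker: {
    verbose: false, // config.get('broker.verbose')
    outputPath: './output/', // config.get('broker.outputPath')
  },
  integrationTests: {
    // Per the TODO above, this flag would move to the root, since both Broker
    // and the integration tests read it:
    useRandomOpportunities: true, // config.get('integrationTests.useRandomOpportunities')
  },
};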
111 changes: 57 additions & 54 deletions packages/openactive-broker-microservice/src/core.js
@@ -15,7 +15,7 @@ const { harvestRPDE, createFeedContext, progressFromContext } = require('@openac
const { partial } = require('lodash');
const path = require('path');

-const { OpportunityIdCache } = require('./util/opportunity-id-cache');
+const { CriteriaOrientedOpportunityIdCache } = require('./util/criteria-oriented-opportunity-id-cache');
const { logError, logErrorDuringHarvest, log, logCharacter } = require('./util/log');
const {
PORT,
@@ -146,7 +146,7 @@ function getStatusRoute(req, res) {
children: `${childOrphans} of ${totalChildren} (${percentageChildOrphans}%)`,
},
totalOpportunitiesHarvested: totalOpportunities,
-buckets: DO_NOT_FILL_BUCKETS ? null : mapToObjectSummary(state.opportunityIdCache),
+buckets: DO_NOT_FILL_BUCKETS ? null : mapToObjectSummary(state.criteriaOrientedOpportunityIdCache),
});
}

@@ -163,14 +163,14 @@ async function getValidationErrorsRoute(req, res) {
* @param {import('express').Response} res
*/
function deleteOpportunityCacheRoute(req, res) {
-state.parentOpportunityMap.clear();
-state.parentOpportunityRpdeMap.clear();
-state.opportunityMap.clear();
-state.opportunityRpdeMap.clear();
-state.rowStoreMap.clear();
-state.parentIdIndex.clear();
+state.opportunityCache.parentMap.clear();
+state.opportunityHousekeepingCaches.parentOpportunityRpdeMap.clear();
+state.opportunityCache.childMap.clear();
+state.opportunityHousekeepingCaches.opportunityRpdeMap.clear();
+state.opportunityItemRowCache.store.clear();
+state.opportunityItemRowCache.parentIdIndex.clear();

-state.opportunityIdCache = OpportunityIdCache.create();
+state.criteriaOrientedOpportunityIdCache = CriteriaOrientedOpportunityIdCache.create();

res.status(204).send();
}
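Taken together, the clears above outline the categorisation this commit introduces: three purpose-grouped cache objects plus the renamed criteria-oriented cache. A hedged sketch of the resulting `state` shape, inferred only from the accesses visible in this diff (the initializers are illustrative):

// Illustrative sketch of the categorised state (inferred, not copied from the commit).
const state = {
  // Raw JSON-LD for parent and child opportunities, used for merging:
  opportunityCache: {
    parentMap: new Map(), // parent `@id` -> parent item data
    childMap: new Map(), // child `@id` -> child item data
  },
  // RPDE bookkeeping needed to apply feed updates/deletes:
  opportunityHousekeepingCaches: {
    parentOpportunityRpdeMap: new Map(), // feed item identifier -> parent `@id`
    opportunityRpdeMap: new Map(), // feed item identifier -> child `@id`
    parentOpportunitySubEventMap: new Map(), // parent `@id` -> array of subEvent `@id`s
  },
  // Per-child rows backing orphan stats and parent "touch" propagation:
  opportunityItemRowCache: {
    store: new Map(), // child `@id` -> row
    parentIdIndex: new Map(), // parent `@id` -> Set of child `@id`s
  },
  // Bookable opportunity IDs bucketed per criteria/flow/type (see below):
  criteriaOrientedOpportunityIdCache: CriteriaOrientedOpportunityIdCache.create(),
};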
Expand Down Expand Up @@ -386,7 +386,7 @@ async function renderValidationErrorsHtml(validatorWorkerPool) {
* @param {string} id The `@id` of the JSON-LD object
*/
function renderOpenValidatorHref(id) {
-const cachedResponse = state.opportunityMap.get(id) || state.parentOpportunityMap.get(id);
+const cachedResponse = state.opportunityCache.childMap.get(id) || state.opportunityCache.parentMap.get(id);
if (cachedResponse) {
const jsonString = JSON.stringify(cachedResponse, null, 2);
return `https://validator.openactive.io/?validationMode=${ITEM_VALIDATION_MODE}#/json/${Base64.encodeURI(jsonString)}`;
@@ -443,7 +443,7 @@ function withOpportunityRpdeHeaders(getHeadersFn) {
* @returns {any}
*/
function getRandomBookableOpportunity({ sellerId, bookingFlow, opportunityType, criteriaName, testDatasetIdentifier }) {
-const typeBucket = OpportunityIdCache.getTypeBucket(state.opportunityIdCache, {
+const typeBucket = CriteriaOrientedOpportunityIdCache.getTypeBucket(state.criteriaOrientedOpportunityIdCache, {
criteriaName, bookingFlow, opportunityType,
});
const sellerCompartment = typeBucket.contents.get(sellerId);
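Only `getTypeBucket` and the bucket's `contents` map (seller `@id` → Set of opportunity `@id`s) are visible in this diff; the nesting below is a hypothetical sketch of how the cache might be organised internally, suggested by the lookup keys:

// Hypothetical internal layout (assumption, not from this commit):
// criteriaName -> bookingFlow -> opportunityType -> { contents: Map<sellerId, Set<opportunityId>> }
class CriteriaOrientedOpportunityIdCacheSketch {
  static create() {
    return new Map();
  }

  static getTypeBucket(cache, { criteriaName, bookingFlow, opportunityType }) {
    const flowMap = getOrSet(cache, criteriaName, () => new Map());
    const typeMap = getOrSet(flowMap, bookingFlow, () => new Map());
    // `contents` maps seller `@id` -> Set of opportunity `@id`s, as used above.
    return getOrSet(typeMap, opportunityType, () => ({ contents: new Map() }));
  }
}

// Hypothetical helper, not in this commit:
function getOrSet(map, key, makeDefault) {
  if (!map.has(key)) map.set(key, makeDefault());
  return map.get(key);
}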
Expand Down Expand Up @@ -490,7 +490,7 @@ function getRandomBookableOpportunity({ sellerId, bookingFlow, opportunityType,
* @param {string} args.bookingFlow
*/
function assertOpportunityCriteriaNotFound({ opportunityType, criteriaName, bookingFlow }) {
-const typeBucket = OpportunityIdCache.getTypeBucket(state.opportunityIdCache, {
+const typeBucket = CriteriaOrientedOpportunityIdCache.getTypeBucket(state.criteriaOrientedOpportunityIdCache, {
criteriaName, opportunityType, bookingFlow,
});

@@ -515,15 +515,15 @@ function releaseOpportunityLocks(testDatasetIdentifier) {
* @param {string} childOpportunityId
*/
function getOpportunityMergedWithParentById(childOpportunityId) {
-const opportunity = state.opportunityMap.get(childOpportunityId);
+const opportunity = state.opportunityCache.childMap.get(childOpportunityId);
if (!opportunity) {
return null;
}
if (!jsonLdHasReferencedParent(opportunity)) {
return opportunity;
}
-const superEvent = state.parentOpportunityMap.get(/** @type {string} */(opportunity.superEvent));
-const facilityUse = state.parentOpportunityMap.get(/** @type {string} */(opportunity.facilityUse));
+const superEvent = state.opportunityCache.parentMap.get(/** @type {string} */(opportunity.superEvent));
+const facilityUse = state.opportunityCache.parentMap.get(/** @type {string} */(opportunity.facilityUse));
if (superEvent || facilityUse) {
const mergedContexts = getMergedJsonLdContext(opportunity, superEvent, facilityUse);
delete opportunity['@context'];
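A hedged usage sketch of this merge, with illustrative data — only the context-merging step is visible in this hunk, so the described result is presumed:

// Illustrative data only (not from this commit):
state.opportunityCache.parentMap.set('https://example.com/session-series/1', {
  '@context': 'https://openactive.io/',
  '@type': 'SessionSeries',
  '@id': 'https://example.com/session-series/1',
});
state.opportunityCache.childMap.set('https://example.com/scheduled-sessions/1', {
  '@context': 'https://openactive.io/',
  '@type': 'ScheduledSession',
  '@id': 'https://example.com/scheduled-sessions/1',
  superEvent: 'https://example.com/session-series/1',
});
// Presumably yields the ScheduledSession with `superEvent` expanded to the
// cached SessionSeries and the merged `@context` hoisted to the top level:
const merged = getOpportunityMergedWithParentById('https://example.com/scheduled-sessions/1');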
Expand Down Expand Up @@ -744,7 +744,7 @@ function millisToMinutesAndSeconds(millis) {
}

function getOrphanJson() {
-const rows = Array.from(state.rowStoreMap.values()).filter((x) => x.jsonLdParentId !== null);
+const rows = Array.from(state.opportunityItemRowCache.store.values()).filter((x) => x.jsonLdParentId !== null);
return {
children: {
matched: rows.filter((x) => !x.waitingForParentToBeIngested).length,
Expand Down Expand Up @@ -774,10 +774,10 @@ function getOrphanJson() {
* @returns {OrphanStats}
*/
function getOrphanStats() {
-const childRows = Array.from(state.rowStoreMap.values()).filter((x) => x.jsonLdParentId !== null);
+const childRows = Array.from(state.opportunityItemRowCache.store.values()).filter((x) => x.jsonLdParentId !== null);
const childOrphans = childRows.filter((x) => x.waitingForParentToBeIngested).length;
const totalChildren = childRows.length;
-const totalOpportunities = Array.from(state.rowStoreMap.values()).filter((x) => !x.waitingForParentToBeIngested).length;
+const totalOpportunities = Array.from(state.opportunityItemRowCache.store.values()).filter((x) => !x.waitingForParentToBeIngested).length;
const percentageChildOrphans = totalChildren > 0 ? ((childOrphans / totalChildren) * 100).toFixed(2) : '0';
return {
childOrphans,
Expand Down Expand Up @@ -999,8 +999,8 @@ async function ingestParentOpportunityPage(rpdePage, feedIdentifier, isInitialHa
// Store the parent opportunity data in the maps
const jsonLdId = item.data['@id'] || item.data.id;

-state.parentOpportunityRpdeMap.set(feedItemIdentifier, jsonLdId);
-state.parentOpportunityMap.set(jsonLdId, item.data);
+state.opportunityHousekeepingCaches.parentOpportunityRpdeMap.set(feedItemIdentifier, jsonLdId);
+state.opportunityCache.parentMap.set(jsonLdId, item.data);

// If there are subEvents then we have a basic "small provider" SessionSeries feed. This is not
// recommended, but we support it anyway here. We do this by converting each of the embedded
@@ -1031,19 +1031,19 @@ }
}
} else {
// State = deleted
-const jsonLdId = state.parentOpportunityRpdeMap.get(feedItemIdentifier);
+const jsonLdId = state.opportunityHousekeepingCaches.parentOpportunityRpdeMap.get(feedItemIdentifier);

// If we had subEvents for this item, then we must be sure to delete the associated opportunityItems
// that were made for them:
-if (state.parentOpportunitySubEventMap.get(jsonLdId)) {
-for (const subEventId of state.parentOpportunitySubEventMap.get(jsonLdId)) {
+if (state.opportunityHousekeepingCaches.parentOpportunitySubEventMap.get(jsonLdId)) {
+for (const subEventId of state.opportunityHousekeepingCaches.parentOpportunitySubEventMap.get(jsonLdId)) {
deleteChildOpportunityItem(subEventId);
}
}

-state.parentOpportunityRpdeMap.delete(feedItemIdentifier);
-state.parentOpportunityMap.delete(jsonLdId);
-state.parentOpportunitySubEventMap.delete(jsonLdId);
+state.opportunityHousekeepingCaches.parentOpportunityRpdeMap.delete(feedItemIdentifier);
+state.opportunityCache.parentMap.delete(jsonLdId);
+state.opportunityHousekeepingCaches.parentOpportunitySubEventMap.delete(jsonLdId);
}
}
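The conversion referenced by the truncated comment further up (embedded `subEvent`s on a parent being turned into standalone child items) happens in a collapsed region of this diff; a purely hypothetical sketch of that idea, in which the `kind` value and item shape are assumptions:

// Hypothetical sketch only — the real conversion code is collapsed above.
// Each embedded subEvent is treated as if it had arrived from a child feed:
for (const subEvent of item.data.subEvent || []) {
  const subEventId = subEvent['@id'] || subEvent.id;
  await storeChildOpportunityItem({
    id: subEventId,
    state: 'updated',
    kind: 'ScheduledSession', // assumed kind
    modified: item.modified,
    data: { ...subEvent, superEvent: jsonLdId }, // link back to the parent
  });
}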

@@ -1062,23 +1062,23 @@ async function ingestParentOpportunityPage(rpdePage, feedIdentifier, isInitialHa
* @param {string} jsonLdId
*/
function updateParentOpportunitySubEventMap(item, jsonLdId) {
-const oldSubEventIds = state.parentOpportunitySubEventMap.get(jsonLdId);
+const oldSubEventIds = state.opportunityHousekeepingCaches.parentOpportunitySubEventMap.get(jsonLdId);
const newSubEventIds = item.data.subEvent.map((subEvent) => subEvent['@id'] || subEvent.id).filter((x) => x);

if (!oldSubEventIds) {
if (newSubEventIds.length > 0) {
-state.parentOpportunitySubEventMap.set(jsonLdId, newSubEventIds);
+state.opportunityHousekeepingCaches.parentOpportunitySubEventMap.set(jsonLdId, newSubEventIds);
}
} else {
for (const subEventId of oldSubEventIds) {
if (!newSubEventIds.includes(subEventId)) {
deleteChildOpportunityItem(subEventId);
-state.parentOpportunitySubEventMap.get(jsonLdId).filter((x) => x !== subEventId);
+state.opportunityHousekeepingCaches.parentOpportunitySubEventMap.get(jsonLdId).filter((x) => x !== subEventId);
}
}
for (const subEventId of newSubEventIds) {
if (!oldSubEventIds.includes(subEventId)) {
-state.parentOpportunitySubEventMap.get(jsonLdId).push(subEventId);
+state.opportunityHousekeepingCaches.parentOpportunitySubEventMap.get(jsonLdId).push(subEventId);
}
}
}
@@ -1095,15 +1095,15 @@ async function ingestChildOpportunityPage(rpdePage, feedIdentifier, isInitialHar
for (const item of rpdePage.items) {
const feedItemIdentifier = feedPrefix + item.id;
if (item.state === 'deleted') {
-const jsonLdId = state.opportunityRpdeMap.get(feedItemIdentifier);
-state.opportunityMap.delete(jsonLdId);
-state.opportunityRpdeMap.delete(feedItemIdentifier);
+const jsonLdId = state.opportunityHousekeepingCaches.opportunityRpdeMap.get(feedItemIdentifier);
+state.opportunityCache.childMap.delete(jsonLdId);
+state.opportunityHousekeepingCaches.opportunityRpdeMap.delete(feedItemIdentifier);

deleteChildOpportunityItem(jsonLdId);
} else {
const jsonLdId = item.data['@id'] || item.data.id;
-state.opportunityRpdeMap.set(feedItemIdentifier, jsonLdId);
-state.opportunityMap.set(jsonLdId, item.data);
+state.opportunityHousekeepingCaches.opportunityRpdeMap.set(feedItemIdentifier, jsonLdId);
+state.opportunityCache.childMap.set(jsonLdId, item.data);

await storeChildOpportunityItem(item);
}
@@ -1123,16 +1123,16 @@ async function touchChildOpportunityItems(parentIds) {

// Get IDs of all opportunities which are children of the specified parents.
parentIds.forEach((parentId) => {
-if (state.parentIdIndex.has(parentId)) {
-state.parentIdIndex.get(parentId).forEach((jsonLdId) => {
+if (state.opportunityItemRowCache.parentIdIndex.has(parentId)) {
+state.opportunityItemRowCache.parentIdIndex.get(parentId).forEach((jsonLdId) => {
opportunitiesToUpdate.add(jsonLdId);
});
}
});

await Promise.all([...opportunitiesToUpdate].map(async (jsonLdId) => {
-if (state.rowStoreMap.has(jsonLdId)) {
-const row = state.rowStoreMap.get(jsonLdId);
+if (state.opportunityItemRowCache.store.has(jsonLdId)) {
+const row = state.opportunityItemRowCache.store.get(jsonLdId);
row.feedModified = Date.now() + 1000; // 1 second in the future
row.waitingForParentToBeIngested = false;
await processRow(row);
@@ -1146,13 +1146,13 @@
* @param {string} jsonLdId
*/
function deleteChildOpportunityItem(jsonLdId) {
-const row = state.rowStoreMap.get(jsonLdId);
+const row = state.opportunityItemRowCache.store.get(jsonLdId);
if (row) {
-const idx = state.parentIdIndex.get(row.jsonLdParentId);
+const idx = state.opportunityItemRowCache.parentIdIndex.get(row.jsonLdParentId);
if (idx) {
idx.delete(jsonLdId);
}
-state.rowStoreMap.delete(jsonLdId);
+state.opportunityItemRowCache.store.delete(jsonLdId);
}
}

@@ -1177,20 +1177,20 @@ async function storeChildOpportunityItem(item) {
jsonLd: item.data,
jsonLdType: item.data['@type'] || item.data.type,
jsonLdParentId: !jsonLdHasReferencedParent(item.data) ? null : item.data.superEvent || item.data.facilityUse,
-waitingForParentToBeIngested: jsonLdHasReferencedParent(item.data) && !(state.parentOpportunityMap.has(item.data.superEvent) || state.parentOpportunityMap.has(item.data.facilityUse)),
+waitingForParentToBeIngested: jsonLdHasReferencedParent(item.data) && !(state.opportunityCache.parentMap.has(item.data.superEvent) || state.opportunityCache.parentMap.has(item.data.facilityUse)),
};

if (row.jsonLdId == null) {
throw new FatalError(`RPDE item '${item.id}' of kind '${item.kind}' does not have an @id. All items in the feeds must have an @id within the "data" property.`);
}
// Associate the child with its parent
if (row.jsonLdParentId != null) {
-if (!state.parentIdIndex.has(row.jsonLdParentId)) state.parentIdIndex.set(row.jsonLdParentId, new Set());
-state.parentIdIndex.get(row.jsonLdParentId).add(row.jsonLdId);
+if (!state.opportunityItemRowCache.parentIdIndex.has(row.jsonLdParentId)) state.opportunityItemRowCache.parentIdIndex.set(row.jsonLdParentId, new Set());
+state.opportunityItemRowCache.parentIdIndex.get(row.jsonLdParentId).add(row.jsonLdId);
}

// Cache it
-state.rowStoreMap.set(row.jsonLdId, row);
+state.opportunityItemRowCache.store.set(row.jsonLdId, row);

// If child and parent both exist, notify any listeners, etc
if (!row.waitingForParentToBeIngested) {
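A hedged JSDoc sketch of the row shape stored in `opportunityItemRowCache.store`, reconstructed from the fields this diff touches (the real typedef may have more fields):

/**
 * Illustrative typedef, reconstructed from usage in this diff.
 * @typedef {object} OpportunityItemRow
 * @property {string} jsonLdId - the child opportunity's `@id`
 * @property {any} jsonLd - raw JSON-LD data for the child
 * @property {string} jsonLdType - the item's `@type`, e.g. `ScheduledSession`
 * @property {string | null} jsonLdParentId - the `superEvent`/`facilityUse` `@id`, if referenced
 * @property {boolean} waitingForParentToBeIngested - true until the parent has been harvested
 * @property {number} feedModified - bumped into the future when the parent is "touched"
 */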
Expand Down Expand Up @@ -1249,7 +1249,7 @@ async function processRow(row) {
data: row.jsonLd,
};
} else {
-const parentOpportunity = state.parentOpportunityMap.get(row.jsonLdParentId);
+const parentOpportunity = state.opportunityCache.parentMap.get(row.jsonLdParentId);
const mergedContexts = getMergedJsonLdContext(row.jsonLd, parentOpportunity);

const parentOpportunityWithoutContext = {
@@ -1322,7 +1322,7 @@ async function processOpportunityItem(item) {
}),
}))) {
for (const bookingFlow of bookingFlows) {
-const typeBucket = OpportunityIdCache.getTypeBucket(state.opportunityIdCache, {
+const typeBucket = CriteriaOrientedOpportunityIdCache.getTypeBucket(state.criteriaOrientedOpportunityIdCache, {
criteriaName, opportunityType, bookingFlow,
});
if (!typeBucket.contents.has(sellerId)) typeBucket.contents.set(sellerId, new Set());
Expand Down Expand Up @@ -1376,10 +1376,7 @@ function monitorOrdersPage(orderFeedType, bookingPartnerIdentifier) {
/* Note that the Orders RpdePageProcessor does NOT use validateItemsFn i.e. Orders feed items are not validated.
The reasoning being that the feed _should_ be empty in controlled mode as previously created Orders will have been
deleted via the Test Interface.
-TODO: Validate items in Orders feed as there will be some in there in the following use cases:
-- Random mode
-- Controlled mode but the Booking Partner is one that is also used outside of Test Suite (though this use case is
-not recommended). */
+TODO implement validation (https://github.com/openactive/openactive-test-suite/issues/666) */
return async (rpdePage) => {
for (const item of rpdePage.items) {
if (item.id) {
@@ -1403,8 +1400,14 @@ async function startPolling() {
mkdirp(OUTPUT_PATH),
]);

-// Limit validator to 5 minutes if WAIT_FOR_HARVEST is set
-// TODO: explain why
+/* Limit validator to 5 minutes if WAIT_FOR_HARVEST is set. If
+WAIT_FOR_HARVEST is set, then the integration tests are waiting for Broker to
+finish harvesting before they start. Validation of a potentially large feed's
+worth of data (e.g. in random mode) is computationally more expensive than
+just fetching the feed and so lags behind. Ideally, Broker validates the feed
+as much as possible, but the primary purpose of Test Suite is to check the
+outcomes of different scenarios. And so, we put a max validation time on
+Broker when WAIT_FOR_HARVEST is set. */
const validatorTimeoutMs = WAIT_FOR_HARVEST ? 1000 * 60 * 5 : null;
const validatorWorkerPool = new ValidatorWorkerPool(validatorTimeoutMs);
validatorWorkerPool.run();