diff --git a/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryBatchUpdating.java b/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryBatchUpdating.java index 762ca1f..9231114 100644 --- a/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryBatchUpdating.java +++ b/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryBatchUpdating.java @@ -13,7 +13,9 @@ public class InventoryBatchUpdating implements RecordReceiver { private final JobHandler job; private JsonArray inventoryRecordSets = new JsonArray(); - private final BlockingQueue batchQueue = new ArrayBlockingQueue<>(1); + + // Blocking queue of one, acting as a turnstile for batches, to upsert them one at a time + private final BlockingQueue turnstile = new ArrayBlockingQueue<>(1); private int batchCounter = 0; private final InventoryUpdateClient updateClient; @@ -46,8 +48,8 @@ public void put(String jsonRecord) { private void releaseBatch(BatchOfRecords batch) { try { - // add batch to a queue of one (thus wait if a batch is in the queue already) - batchQueue.put(batch); + // stage next batch for upsert, will wait if a previous batch is still in the turnstile + turnstile.put(batch); persistBatch(); } catch (InterruptedException ie) { System.out.println("Error: Queue put operation was interrupted."); @@ -65,7 +67,7 @@ public void endOfDocument() { * otherwise know when the last upsert of a source file of records is done, for example. */ private void persistBatch() { - BatchOfRecords batch = batchQueue.peek(); + BatchOfRecords batch = turnstile.peek(); if (batch != null) { if (batch.size() > 0) { updateClient.inventoryUpsert(batch.getUpsertRequestBody()).onComplete(json -> { @@ -75,8 +77,8 @@ private void persistBatch() { report(batch); } try { - // Open entrance for next batch - batchQueue.take(); + // Clear the gate for next batch + turnstile.take(); } catch (InterruptedException ignored) {} }); } else { // we get here when the last set of records is exactly 100. We just need to report @@ -84,8 +86,8 @@ private void persistBatch() { report(batch); } try { - // Open entrance for next batch - batchQueue.take(); + // Clear the gate for next batch + turnstile.take(); } catch (InterruptedException ignored) {} } } @@ -101,16 +103,16 @@ private void report(BatchOfRecords batch) { } } - private final AtomicInteger batchQueueIdleChecks = new AtomicInteger(0); + private final AtomicInteger turnstileEmptyChecks = new AtomicInteger(0); public boolean noPendingBatches(int idlingChecksThreshold) { - if (batchQueue.isEmpty()) { - if (batchQueueIdleChecks.incrementAndGet() > idlingChecksThreshold) { - System.out.println("ID-NE: BatchQueue has been idle for " + idlingChecksThreshold + " consecutive checks."); - batchQueueIdleChecks.set(0); + if (turnstile.isEmpty()) { + if (turnstileEmptyChecks.incrementAndGet() > idlingChecksThreshold) { + System.out.println("ID-NE: Turnstile has been idle for " + idlingChecksThreshold + " consecutive checks."); + turnstileEmptyChecks.set(0); return true; } } else { - batchQueueIdleChecks.set(0); + turnstileEmptyChecks.set(0); } return false; } diff --git a/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryMetrics.java b/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryMetrics.java index f06fdf2..97bffe4 100644 --- a/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryMetrics.java +++ b/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryMetrics.java @@ -101,7 +101,7 @@ public void add(InventoryMetrics delta) { public String toString() { StringBuilder str = new StringBuilder(); for (Entity entity : metrics.keySet()) { - str.append(entity + ": "); + str.append(entity).append(": "); for (Transaction transaction : metrics.get(entity).keySet()) { for (Outcome outcome : metrics.get(entity).get(transaction).keySet()) { int count = this.metrics.get(entity).get(transaction).get(outcome); @@ -115,47 +115,34 @@ public String toString() { } public String report() { - StringBuilder str = new StringBuilder(); - str.append("Instance creates: " - + metrics.get(INSTANCE).get(CREATE).get(COMPLETED) + ". " - + " Failed: " + metrics.get(INSTANCE).get(CREATE).get(FAILED) - + " Skipped: " + metrics.get(INSTANCE).get(CREATE).get(SKIPPED) + "\n"); - str.append("Instance updates: " - + metrics.get(INSTANCE).get(UPDATE).get(COMPLETED) + ". " - + " Failed: " + metrics.get(INSTANCE).get(UPDATE).get(FAILED) - + " Skipped: " + metrics.get(INSTANCE).get(UPDATE).get(SKIPPED) + "\n"); - str.append("Instance deletes: " - + metrics.get(INSTANCE).get(DELETE).get(COMPLETED) + ". " - + " Failed: " + metrics.get(INSTANCE).get(DELETE).get(FAILED) - + " Skipped: " + metrics.get(INSTANCE).get(DELETE).get(SKIPPED) + "\n"); - str.append("Holdings records creates: " - + metrics.get(HOLDINGS_RECORD).get(CREATE).get(COMPLETED) + ". " - + " Failed: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(FAILED) - + " Skipped: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(SKIPPED) + "\n"); - str.append("Holdings records updates: " - + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(COMPLETED) + ". " - + " Failed: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(FAILED) - + " Skipped: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(SKIPPED)+ "\n"); - str.append("Holdings records deletes: " - + metrics.get(HOLDINGS_RECORD).get(DELETE).get(COMPLETED) + ". " - + " Failed: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(FAILED) - + " Skipped: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(SKIPPED)+ "\n"); - str.append("Item creates: " - + metrics.get(ITEM).get(CREATE).get(COMPLETED) + ". " - + " Failed: " + metrics.get(ITEM).get(CREATE).get(FAILED) - + " Skipped: " + metrics.get(ITEM).get(CREATE).get(SKIPPED)+ "\n"); - str.append("Item updates: " - + metrics.get(ITEM).get(UPDATE).get(COMPLETED) + ". " - + " Failed: " + metrics.get(ITEM).get(UPDATE).get(FAILED) - + " Skipped: " + metrics.get(ITEM).get(UPDATE).get(SKIPPED) + "\n"); - str.append("Item deletes: " - + metrics.get(ITEM).get(DELETE).get(COMPLETED) + ". " - + " Failed: " + metrics.get(ITEM).get(DELETE).get(FAILED) - + " Skipped: " + metrics.get(ITEM).get(DELETE).get(SKIPPED)+ "\n"); - return str.toString(); - } - - + return "Instance creates: " + metrics.get(INSTANCE).get(CREATE).get(COMPLETED) + ". " + + " Failed: " + metrics.get(INSTANCE).get(CREATE).get(FAILED) + + " Skipped: " + metrics.get(INSTANCE).get(CREATE).get(SKIPPED) + "\n" + + "Instance updates: " + metrics.get(INSTANCE).get(UPDATE).get(COMPLETED) + ". " + + " Failed: " + metrics.get(INSTANCE).get(UPDATE).get(FAILED) + + " Skipped: " + metrics.get(INSTANCE).get(UPDATE).get(SKIPPED) + "\n" + + "Instance deletes: " + metrics.get(INSTANCE).get(DELETE).get(COMPLETED) + ". " + + " Failed: " + metrics.get(INSTANCE).get(DELETE).get(FAILED) + + " Skipped: " + metrics.get(INSTANCE).get(DELETE).get(SKIPPED) + "\n" + + "Holdings records creates: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(COMPLETED) + ". " + + " Failed: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(FAILED) + + " Skipped: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(SKIPPED) + "\n" + + "Holdings records updates: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(COMPLETED) + ". " + + " Failed: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(FAILED) + + " Skipped: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(SKIPPED) + "\n" + + "Holdings records deletes: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(COMPLETED) + ". " + + " Failed: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(FAILED) + + " Skipped: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(SKIPPED) + "\n" + + "Item creates: " + metrics.get(ITEM).get(CREATE).get(COMPLETED) + ". " + + " Failed: " + metrics.get(ITEM).get(CREATE).get(FAILED) + + " Skipped: " + metrics.get(ITEM).get(CREATE).get(SKIPPED) + "\n" + + "Item updates: " + metrics.get(ITEM).get(UPDATE).get(COMPLETED) + ". " + + " Failed: " + metrics.get(ITEM).get(UPDATE).get(FAILED) + + " Skipped: " + metrics.get(ITEM).get(UPDATE).get(SKIPPED) + "\n" + + "Item deletes: " + metrics.get(ITEM).get(DELETE).get(COMPLETED) + ". " + + " Failed: " + metrics.get(ITEM).get(DELETE).get(FAILED) + + " Skipped: " + metrics.get(ITEM).get(DELETE).get(SKIPPED); } +} diff --git a/src/main/java/org/folio/harvesteradmin/service/fileimport/JobHandler.java b/src/main/java/org/folio/harvesteradmin/service/fileimport/JobHandler.java index 947a110..23871f5 100644 --- a/src/main/java/org/folio/harvesteradmin/service/fileimport/JobHandler.java +++ b/src/main/java/org/folio/harvesteradmin/service/fileimport/JobHandler.java @@ -68,6 +68,12 @@ public boolean fileQueueDone(boolean possibly) { return passive.get(); } + /** + * If there's a file in the processing slot but no activity in the inventory updater, the current job + * is assumed to be in a paused state, which could for example be due to a module restart. + * @return true if there's a file ostensibly processing but no activity detected in inventory updater + * for `idlingChecksThreshold` consecutive checks + */ private boolean resumeHaltedProcessing() { return fileQueue.processingSlotTaken() && inventoryUpdater.noPendingBatches(10); } diff --git a/src/main/java/org/folio/harvesteradmin/service/fileimport/Reporting.java b/src/main/java/org/folio/harvesteradmin/service/fileimport/Reporting.java index 3ec7170..cc76c9a 100644 --- a/src/main/java/org/folio/harvesteradmin/service/fileimport/Reporting.java +++ b/src/main/java/org/folio/harvesteradmin/service/fileimport/Reporting.java @@ -68,7 +68,9 @@ public void reportFileQueueStats(boolean queueDone) { System.out.println((queueDone ? "Done processing queue for job " : "Job ") + jobId + ": " + filesProcessed + " file(s) with " + recordsProcessed.get() + " records processed in " + processingTimeAsString(processingTime) + " (" + (recordsProcessed.get() * 1000L / processingTime) + " recs/s.)"); - System.out.println(inventoryMetrics.report()); + if (queueDone) { + System.out.println(inventoryMetrics.report()); + } }