From 5d58593598aa3c1a2d2bdd923397b9e0eb58d8dc Mon Sep 17 00:00:00 2001 From: nielserik Date: Fri, 10 Jan 2025 13:27:06 +0100 Subject: [PATCH] MODHAADM-6 Experiments continued - minor tweak --- .../fileimport/InventoryBatchUpdating.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryBatchUpdating.java b/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryBatchUpdating.java index 9231114..9604015 100644 --- a/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryBatchUpdating.java +++ b/src/main/java/org/folio/harvesteradmin/service/fileimport/InventoryBatchUpdating.java @@ -1,5 +1,7 @@ package org.folio.harvesteradmin.service.fileimport; +import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; @@ -14,7 +16,7 @@ public class InventoryBatchUpdating implements RecordReceiver { private final JobHandler job; private JsonArray inventoryRecordSets = new JsonArray(); - // Blocking queue of one, acting as a turnstile for batches, to upsert them one at a time + // Blocking queue of one, acting as a turnstile for batches in order to persist them one at a time private final BlockingQueue turnstile = new ArrayBlockingQueue<>(1); private int batchCounter = 0; private final InventoryUpdateClient updateClient; @@ -48,12 +50,13 @@ public void put(String jsonRecord) { private void releaseBatch(BatchOfRecords batch) { try { - // stage next batch for upsert, will wait if a previous batch is still in the turnstile + // stage next batch for upsert (process will wait if a previous batch is still in the turnstile) turnstile.put(batch); - persistBatch(); - } catch (InterruptedException ie) { - System.out.println("Error: Queue put operation was interrupted."); - } + persistBatch().onComplete(na -> { + try { turnstile.take(); } + catch (InterruptedException e) { throw new RuntimeException(e);} + }); + } catch (InterruptedException ie) { throw new RuntimeException(ie);} } @Override @@ -66,7 +69,8 @@ public void endOfDocument() { * it must be in charge of when to invoke reporting. JobHandler will not * otherwise know when the last upsert of a source file of records is done, for example. */ - private void persistBatch() { + private Future persistBatch() { + Promise promise = Promise.promise(); BatchOfRecords batch = turnstile.peek(); if (batch != null) { if (batch.size() > 0) { @@ -76,21 +80,16 @@ private void persistBatch() { if (batch.isLastBatchOfFile()) { report(batch); } - try { - // Clear the gate for next batch - turnstile.take(); - } catch (InterruptedException ignored) {} + promise.complete(); }); } else { // we get here when the last set of records is exactly 100. We just need to report if (batch.isLastBatchOfFile()) { report(batch); } - try { - // Clear the gate for next batch - turnstile.take(); - } catch (InterruptedException ignored) {} + promise.complete(); } } + return promise.future(); } private void report(BatchOfRecords batch) {