Skip to content

Commit

Permalink
MODHAADM-6 Experiments continued
Browse files Browse the repository at this point in the history
 - minor tweak
  • Loading branch information
nielserik committed Jan 10, 2025
1 parent 69e5314 commit 5d58593
Showing 1 changed file with 14 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.folio.harvesteradmin.service.fileimport;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
Expand All @@ -14,7 +16,7 @@ public class InventoryBatchUpdating implements RecordReceiver {
private final JobHandler job;
private JsonArray inventoryRecordSets = new JsonArray();

// Blocking queue of one, acting as a turnstile for batches, to upsert them one at a time
// Blocking queue of one, acting as a turnstile for batches in order to persist them one at a time
private final BlockingQueue<BatchOfRecords> turnstile = new ArrayBlockingQueue<>(1);
private int batchCounter = 0;
private final InventoryUpdateClient updateClient;
Expand Down Expand Up @@ -48,12 +50,13 @@ public void put(String jsonRecord) {

private void releaseBatch(BatchOfRecords batch) {
try {
// stage next batch for upsert, will wait if a previous batch is still in the turnstile
// stage next batch for upsert (process will wait if a previous batch is still in the turnstile)
turnstile.put(batch);
persistBatch();
} catch (InterruptedException ie) {
System.out.println("Error: Queue put operation was interrupted.");
}
persistBatch().onComplete(na -> {
try { turnstile.take(); }
catch (InterruptedException e) { throw new RuntimeException(e);}
});
} catch (InterruptedException ie) { throw new RuntimeException(ie);}
}

@Override
Expand All @@ -66,7 +69,8 @@ public void endOfDocument() {
* it must be in charge of when to invoke reporting. JobHandler will not
* otherwise know when the last upsert of a source file of records is done, for example.
*/
private void persistBatch() {
private Future<Void> persistBatch() {
Promise<Void> promise = Promise.promise();
BatchOfRecords batch = turnstile.peek();
if (batch != null) {
if (batch.size() > 0) {
Expand All @@ -76,21 +80,16 @@ private void persistBatch() {
if (batch.isLastBatchOfFile()) {
report(batch);
}
try {
// Clear the gate for next batch
turnstile.take();
} catch (InterruptedException ignored) {}
promise.complete();
});
} else { // we get here when the last set of records is exactly 100. We just need to report
if (batch.isLastBatchOfFile()) {
report(batch);
}
try {
// Clear the gate for next batch
turnstile.take();
} catch (InterruptedException ignored) {}
promise.complete();
}
}
return promise.future();
}

private void report(BatchOfRecords batch) {
Expand Down

0 comments on commit 5d58593

Please sign in to comment.