Skip to content

Commit

Permalink
MODHAADM-6 Experiments continued
Browse files Browse the repository at this point in the history
 - minor tweaks and commenting
  • Loading branch information
nielserik committed Jan 10, 2025
1 parent 8ed341e commit 69e5314
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ public class InventoryBatchUpdating implements RecordReceiver {

private final JobHandler job;
private JsonArray inventoryRecordSets = new JsonArray();
private final BlockingQueue<BatchOfRecords> batchQueue = new ArrayBlockingQueue<>(1);

// Blocking queue of one, acting as a turnstile for batches, to upsert them one at a time
private final BlockingQueue<BatchOfRecords> turnstile = new ArrayBlockingQueue<>(1);
private int batchCounter = 0;
private final InventoryUpdateClient updateClient;

Expand Down Expand Up @@ -46,8 +48,8 @@ public void put(String jsonRecord) {

private void releaseBatch(BatchOfRecords batch) {
try {
// add batch to a queue of one (thus wait if a batch is in the queue already)
batchQueue.put(batch);
// stage next batch for upsert, will wait if a previous batch is still in the turnstile
turnstile.put(batch);
persistBatch();
} catch (InterruptedException ie) {
System.out.println("Error: Queue put operation was interrupted.");
Expand All @@ -65,7 +67,7 @@ public void endOfDocument() {
* otherwise know when the last upsert of a source file of records is done, for example.
*/
private void persistBatch() {
BatchOfRecords batch = batchQueue.peek();
BatchOfRecords batch = turnstile.peek();
if (batch != null) {
if (batch.size() > 0) {
updateClient.inventoryUpsert(batch.getUpsertRequestBody()).onComplete(json -> {
Expand All @@ -75,17 +77,17 @@ private void persistBatch() {
report(batch);
}
try {
// Open entrance for next batch
batchQueue.take();
// Clear the gate for next batch
turnstile.take();
} catch (InterruptedException ignored) {}
});
} else { // we get here when the last set of records is exactly 100. We just need to report
if (batch.isLastBatchOfFile()) {
report(batch);
}
try {
// Open entrance for next batch
batchQueue.take();
// Clear the gate for next batch
turnstile.take();
} catch (InterruptedException ignored) {}
}
}
Expand All @@ -101,16 +103,16 @@ private void report(BatchOfRecords batch) {
}
}

private final AtomicInteger batchQueueIdleChecks = new AtomicInteger(0);
private final AtomicInteger turnstileEmptyChecks = new AtomicInteger(0);
public boolean noPendingBatches(int idlingChecksThreshold) {
if (batchQueue.isEmpty()) {
if (batchQueueIdleChecks.incrementAndGet() > idlingChecksThreshold) {
System.out.println("ID-NE: BatchQueue has been idle for " + idlingChecksThreshold + " consecutive checks.");
batchQueueIdleChecks.set(0);
if (turnstile.isEmpty()) {
if (turnstileEmptyChecks.incrementAndGet() > idlingChecksThreshold) {
System.out.println("ID-NE: Turnstile has been idle for " + idlingChecksThreshold + " consecutive checks.");
turnstileEmptyChecks.set(0);
return true;
}
} else {
batchQueueIdleChecks.set(0);
turnstileEmptyChecks.set(0);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void add(InventoryMetrics delta) {
public String toString() {
StringBuilder str = new StringBuilder();
for (Entity entity : metrics.keySet()) {
str.append(entity + ": ");
str.append(entity).append(": ");
for (Transaction transaction : metrics.get(entity).keySet()) {
for (Outcome outcome : metrics.get(entity).get(transaction).keySet()) {
int count = this.metrics.get(entity).get(transaction).get(outcome);
Expand All @@ -115,47 +115,34 @@ public String toString() {
}

public String report() {
StringBuilder str = new StringBuilder();
str.append("Instance creates: "
+ metrics.get(INSTANCE).get(CREATE).get(COMPLETED) + ". "
+ " Failed: " + metrics.get(INSTANCE).get(CREATE).get(FAILED)
+ " Skipped: " + metrics.get(INSTANCE).get(CREATE).get(SKIPPED) + "\n");
str.append("Instance updates: "
+ metrics.get(INSTANCE).get(UPDATE).get(COMPLETED) + ". "
+ " Failed: " + metrics.get(INSTANCE).get(UPDATE).get(FAILED)
+ " Skipped: " + metrics.get(INSTANCE).get(UPDATE).get(SKIPPED) + "\n");
str.append("Instance deletes: "
+ metrics.get(INSTANCE).get(DELETE).get(COMPLETED) + ". "
+ " Failed: " + metrics.get(INSTANCE).get(DELETE).get(FAILED)
+ " Skipped: " + metrics.get(INSTANCE).get(DELETE).get(SKIPPED) + "\n");
str.append("Holdings records creates: "
+ metrics.get(HOLDINGS_RECORD).get(CREATE).get(COMPLETED) + ". "
+ " Failed: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(FAILED)
+ " Skipped: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(SKIPPED) + "\n");
str.append("Holdings records updates: "
+ metrics.get(HOLDINGS_RECORD).get(UPDATE).get(COMPLETED) + ". "
+ " Failed: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(FAILED)
+ " Skipped: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(SKIPPED)+ "\n");
str.append("Holdings records deletes: "
+ metrics.get(HOLDINGS_RECORD).get(DELETE).get(COMPLETED) + ". "
+ " Failed: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(FAILED)
+ " Skipped: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(SKIPPED)+ "\n");
str.append("Item creates: "
+ metrics.get(ITEM).get(CREATE).get(COMPLETED) + ". "
+ " Failed: " + metrics.get(ITEM).get(CREATE).get(FAILED)
+ " Skipped: " + metrics.get(ITEM).get(CREATE).get(SKIPPED)+ "\n");
str.append("Item updates: "
+ metrics.get(ITEM).get(UPDATE).get(COMPLETED) + ". "
+ " Failed: " + metrics.get(ITEM).get(UPDATE).get(FAILED)
+ " Skipped: " + metrics.get(ITEM).get(UPDATE).get(SKIPPED) + "\n");
str.append("Item deletes: "
+ metrics.get(ITEM).get(DELETE).get(COMPLETED) + ". "
+ " Failed: " + metrics.get(ITEM).get(DELETE).get(FAILED)
+ " Skipped: " + metrics.get(ITEM).get(DELETE).get(SKIPPED)+ "\n");
return str.toString();
}


return "Instance creates: " + metrics.get(INSTANCE).get(CREATE).get(COMPLETED) + ". " +
" Failed: " + metrics.get(INSTANCE).get(CREATE).get(FAILED) +
" Skipped: " + metrics.get(INSTANCE).get(CREATE).get(SKIPPED) + "\n" +
"Instance updates: " + metrics.get(INSTANCE).get(UPDATE).get(COMPLETED) + ". " +
" Failed: " + metrics.get(INSTANCE).get(UPDATE).get(FAILED) +
" Skipped: " + metrics.get(INSTANCE).get(UPDATE).get(SKIPPED) + "\n" +
"Instance deletes: " + metrics.get(INSTANCE).get(DELETE).get(COMPLETED) + ". " +
" Failed: " + metrics.get(INSTANCE).get(DELETE).get(FAILED) +
" Skipped: " + metrics.get(INSTANCE).get(DELETE).get(SKIPPED) + "\n" +
"Holdings records creates: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(COMPLETED) + ". " +
" Failed: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(FAILED) +
" Skipped: " + metrics.get(HOLDINGS_RECORD).get(CREATE).get(SKIPPED) + "\n" +
"Holdings records updates: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(COMPLETED) + ". " +
" Failed: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(FAILED) +
" Skipped: " + metrics.get(HOLDINGS_RECORD).get(UPDATE).get(SKIPPED) + "\n" +
"Holdings records deletes: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(COMPLETED) + ". " +
" Failed: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(FAILED) +
" Skipped: " + metrics.get(HOLDINGS_RECORD).get(DELETE).get(SKIPPED) + "\n" +
"Item creates: " + metrics.get(ITEM).get(CREATE).get(COMPLETED) + ". " +
" Failed: " + metrics.get(ITEM).get(CREATE).get(FAILED) +
" Skipped: " + metrics.get(ITEM).get(CREATE).get(SKIPPED) + "\n" +
"Item updates: " + metrics.get(ITEM).get(UPDATE).get(COMPLETED) + ". " +
" Failed: " + metrics.get(ITEM).get(UPDATE).get(FAILED) +
" Skipped: " + metrics.get(ITEM).get(UPDATE).get(SKIPPED) + "\n" +
"Item deletes: " + metrics.get(ITEM).get(DELETE).get(COMPLETED) + ". " +
" Failed: " + metrics.get(ITEM).get(DELETE).get(FAILED) +
" Skipped: " + metrics.get(ITEM).get(DELETE).get(SKIPPED);
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public boolean fileQueueDone(boolean possibly) {
return passive.get();
}

/**
* If there's a file in the processing slot but no activity in the inventory updater, the current job
* is assumed to be in a paused state, which could for example be due to a module restart.
* @return true if there's a file ostensibly processing but no activity detected in inventory updater
* for `idlingChecksThreshold` consecutive checks
*/
private boolean resumeHaltedProcessing() {
return fileQueue.processingSlotTaken() && inventoryUpdater.noPendingBatches(10);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public void reportFileQueueStats(boolean queueDone) {
System.out.println((queueDone ? "Done processing queue for job " : "Job ") + jobId + ": " + filesProcessed + " file(s) with " + recordsProcessed.get() +
" records processed in " + processingTimeAsString(processingTime) + " (" +
(recordsProcessed.get() * 1000L / processingTime) + " recs/s.)");
System.out.println(inventoryMetrics.report());
if (queueDone) {
System.out.println(inventoryMetrics.report());
}

}

Expand Down

0 comments on commit 69e5314

Please sign in to comment.