From 43f9a011e3f8d1ef35a4e94e3eb292aec09d6eef Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Thu, 10 Oct 2024 16:14:37 -0400 Subject: [PATCH] Minor object store improvement - complete faster (#1237) * Minor object store improvement - complete faster * validation order * short circuit --- .../io/nats/client/impl/NatsFeatureBase.java | 2 +- .../io/nats/client/impl/NatsObjectStore.java | 32 +++++++++++++------ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsFeatureBase.java b/src/main/java/io/nats/client/impl/NatsFeatureBase.java index fae6ed4df..7f1490a5c 100644 --- a/src/main/java/io/nats/client/impl/NatsFeatureBase.java +++ b/src/main/java/io/nats/client/impl/NatsFeatureBase.java @@ -29,7 +29,7 @@ public class NatsFeatureBase { protected final NatsJetStream js; - protected final JetStreamManagement jsm; + protected final NatsJetStreamManagement jsm; protected String streamName; NatsFeatureBase(NatsConnection connection, FeatureOptions fo) throws IOException { diff --git a/src/main/java/io/nats/client/impl/NatsObjectStore.java b/src/main/java/io/nats/client/impl/NatsObjectStore.java index 30d697a3a..5d3118973 100644 --- a/src/main/java/io/nats/client/impl/NatsObjectStore.java +++ b/src/main/java/io/nats/client/impl/NatsObjectStore.java @@ -22,7 +22,6 @@ import java.io.*; import java.nio.file.Files; import java.security.NoSuchAlgorithmException; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -218,6 +217,7 @@ public ObjectInfo get(String objectName, OutputStream out) throws IOException, J Digester digester = new Digester(); long totalBytes = 0; long totalChunks = 0; + long expectedChunks = oi.getChunks(); // if there is one chunk, just go get the message directly and we're done. if (oi.getChunks() == 1) { @@ -236,30 +236,42 @@ public ObjectInfo get(String objectName, OutputStream out) throws IOException, J JetStreamSubscription sub = js.subscribe(rawChunkSubject(oi.getNuid()), PushSubscribeOptions.builder().stream(streamName).ordered(true).build()); - Message m = sub.nextMessage(Duration.ofSeconds(1)); + Message m = sub.nextMessage(jsm.getTimeout()); while (m != null) { + // track the byte count and chunks + long pending = m.metaData().pendingCount(); + if (expectedChunks != pending + (++totalChunks)) { + throw OsGetChunksMismatch.instance(); // short circuit, we already know there are not enough chunks. + } + byte[] data = m.getData(); + totalBytes += data.length; - // track the byte count and chunks // update the digest - // write the bytes to the output file - totalBytes += data.length; - totalChunks++; digester.update(data); + + // write the bytes to the output file out.write(data); // read until the subject is complete - m = sub.nextMessage(Duration.ofSeconds(1)); + if (pending == 0) { + break; + } + m = sub.nextMessage(jsm.getTimeout()); } - sub.unsubscribe(); + try { + sub.unsubscribe(); + } + catch (RuntimeException ignore) {} } - out.flush(); - if (totalBytes != oi.getSize()) { throw OsGetSizeMismatch.instance(); } if (totalChunks != oi.getChunks()) { throw OsGetChunksMismatch.instance(); } + if (totalBytes != oi.getSize()) { throw OsGetSizeMismatch.instance(); } if (!digester.matches(oi.getDigest())) { throw OsGetDigestMismatch.instance(); } + out.flush(); // moved after validation, no need if invalid + return oi; }