diff --git a/src/main/java/net/openhft/chronicle/bytes/DistributedUniqueTimeProvider.java b/src/main/java/net/openhft/chronicle/bytes/DistributedUniqueTimeProvider.java index f71e3cab9f1..2d75ea64a7c 100644 --- a/src/main/java/net/openhft/chronicle/bytes/DistributedUniqueTimeProvider.java +++ b/src/main/java/net/openhft/chronicle/bytes/DistributedUniqueTimeProvider.java @@ -29,9 +29,16 @@ import java.io.File; /** - * A TimeProvider implementation that ensures unique timestamps across multiple systems using a predefined hostId. - * It extends SimpleCloseable and implements TimeProvider to provide functionality for managing the timestamps. - * This class manages unique timestamp distribution across different hosts/systems. + * Provides unique timestamps across multiple systems by incorporating a host identifier into the timestamp, for microsecond and nanosecond resolution timestamps. + * This class is particularly useful in distributed systems where clock synchronization and uniqueness + * across hosts are critical. It implements {@link TimeProvider}. + *

+ * NOTE: {@link #currentTimeMillis()} is not unique, it is just a call to the underlying provider as there isn't enough resolution to include the hostId. + *

+ * + * Each timestamp generated is guaranteed to be unique across all hosts participating in the system. + * The class uses a file-based mechanism to ensure that timestamps are not only unique across restarts + * but also across different JVM instances. */ public class DistributedUniqueTimeProvider extends SimpleCloseable implements TimeProvider { @@ -47,6 +54,13 @@ public class DistributedUniqueTimeProvider extends SimpleCloseable implements Ti private TimeProvider provider = SystemTimeProvider.INSTANCE; private int hostId; + /** + * Constructs a {@link DistributedUniqueTimeProvider} for a specified hostId. + * This constructor initializes a file-based backend which is used to store and deduplicate timestamps. + * + * @param hostId the identifier for the host, must be non-negative + * @param unmonitor if true, disables the monitoring of the file and byte stores used internally + */ private DistributedUniqueTimeProvider(@NonNegative int hostId, boolean unmonitor) { hostId(hostId); try { @@ -66,14 +80,22 @@ private DistributedUniqueTimeProvider(@NonNegative int hostId, boolean unmonitor } /** - * Returns an instance of DistributedUniqueTimeProvider. + * Provides a singleton instance of DistributedUniqueTimeProvider using the default hostId. + * This method is thread-safe and uses lazy initialization. * - * @return the single instance of DistributedUniqueTimeProvider + * @return the single, shared instance of DistributedUniqueTimeProvider */ public static DistributedUniqueTimeProvider instance() { return DistributedUniqueTimeProviderHolder.INSTANCE; } + /** + * Creates a new instance of DistributedUniqueTimeProvider for a specified hostId. + * This is useful in scenarios where multiple instances are required, each with a different hostId. + * + * @param hostId the host identifier for which the time provider is to be created, must be non-negative + * @return a new instance of DistributedUniqueTimeProvider configured with the given hostId + */ public static DistributedUniqueTimeProvider forHostId(@NonNegative int hostId) { return new DistributedUniqueTimeProvider(hostId, false); } @@ -105,6 +127,7 @@ protected void performClose() { bytes.release(this); file.releaseLast(); } + /** * Sets the hostId of the DistributedUniqueTimeProvider. The hostId is used to * create unique timestamps across different hosts. @@ -116,7 +139,7 @@ protected void performClose() { public DistributedUniqueTimeProvider hostId(@NonNegative int hostId) { // Check if the provided hostId is negative and throw an exception if it is if (hostId < 0) - throw new IllegalArgumentException("Invalid hostId: " + hostId); + throw new IllegalArgumentException("Host ID must be non-negative but was: " + hostId); // Assign the provided hostId modulo the maximum number of host IDs // to ensure it's within the valid range @@ -142,6 +165,11 @@ public DistributedUniqueTimeProvider provider(TimeProvider provider) { } /** + * NOTE: Calls to this method do not produce unique timestamps, rather just calls the underlying provider. + *

+ * Use {@link #currentTimeMicros()} or {@link #currentTimeNanos()} to generate unique timestamps, + * or use {@link net.openhft.chronicle.core.time.UniqueMicroTimeProvider#currentTimeMillis()} to generate unique timestamps. + *

* @return Ordinary millisecond timestamp */ @Override diff --git a/src/test/java/net/openhft/chronicle/bytes/DistributedUniqueTimeProviderTest.java b/src/test/java/net/openhft/chronicle/bytes/DistributedUniqueTimeProviderTest.java index 0055d6bedfc..de07e7883d1 100644 --- a/src/test/java/net/openhft/chronicle/bytes/DistributedUniqueTimeProviderTest.java +++ b/src/test/java/net/openhft/chronicle/bytes/DistributedUniqueTimeProviderTest.java @@ -19,14 +19,21 @@ import net.openhft.chronicle.core.Jvm; import net.openhft.chronicle.core.OS; -import net.openhft.chronicle.core.time.LongTime; -import net.openhft.chronicle.core.time.TimeProvider; +import net.openhft.chronicle.core.time.*; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; @@ -34,6 +41,16 @@ public class DistributedUniqueTimeProviderTest extends BytesTestCommon { + private DistributedUniqueTimeProvider timeProvider; + private SetTimeProvider setTimeProvider; + + @Before + public void setUp() { + timeProvider = DistributedUniqueTimeProvider.instance(); + setTimeProvider = new SetTimeProvider(SystemTimeProvider.INSTANCE.currentTimeNanos()); + timeProvider.provider(setTimeProvider); + } + static volatile long blackHole; @BeforeClass @@ -48,10 +65,9 @@ public static void checks() throws IOException { @Test public void currentTimeMicros() { - TimeProvider tp = DistributedUniqueTimeProvider.instance(); long last = 0; for (int i = 0; i < 100_000; i++) { - long time = tp.currentTimeMicros(); + long time = timeProvider.currentTimeMicros(); assertTrue(time > last); assertEquals(LongTime.toMicros(time), time); last = time; @@ -60,12 +76,11 @@ public void currentTimeMicros() { @Test public void currentTimeMicrosPerf() { - TimeProvider tp = DistributedUniqueTimeProvider.instance(); long start = System.currentTimeMillis(), end; int count = 0; do { for (int i = 0; i < 1000; i++) - blackHole = tp.currentTimeMicros(); + blackHole = ((TimeProvider) timeProvider).currentTimeMicros(); count += 1000; } while ((end = System.currentTimeMillis()) < start + 500); long rate = 1000L * count / (end - start); @@ -75,12 +90,11 @@ public void currentTimeMicrosPerf() { @Test public void currentTimeNanosPerf() { - TimeProvider tp = DistributedUniqueTimeProvider.instance(); long start = System.currentTimeMillis(), end; int count = 0; do { for (int i = 0; i < 1000; i++) - blackHole = tp.currentTimeNanos(); + blackHole = ((TimeProvider) timeProvider).currentTimeNanos(); count += 1000; } while ((end = System.currentTimeMillis()) < start + 500); long rate = 1000L * count / (end - start); @@ -90,13 +104,12 @@ public void currentTimeNanosPerf() { @Test public void currentTimeNanos() { - TimeProvider tp = DistributedUniqueTimeProvider.instance(); - long start = tp.currentTimeNanos(); + long start = ((TimeProvider) timeProvider).currentTimeNanos(); long last = start; int count = 0; long runTime = Jvm.isArm() ? 3_000_000_000L : 500_000_000L; for (; ; ) { - long now = tp.currentTimeNanos(); + long now = ((TimeProvider) timeProvider).currentTimeNanos(); assertEquals(LongTime.toNanos(now), now); if (now > start + runTime) break; @@ -140,12 +153,152 @@ public void concurrentTimeNanos() { @Test public void testMonotonicallyIncreasing() { - TimeProvider tp = DistributedUniqueTimeProvider.instance(); long last = 0; for (int i = 0; i < 10_000; i++) { - long now = DistributedUniqueTimeProvider.timestampFor(tp.currentTimeNanos()); + long now = DistributedUniqueTimeProvider.timestampFor(((TimeProvider) timeProvider).currentTimeNanos()); assertTrue(now > last); last = now; } } + + @Test + public void shouldProvideUniqueTimeAcrossThreadsMicros() throws InterruptedException { + final Set allGeneratedTimestamps = ConcurrentHashMap.newKeySet(); + final int numberOfThreads = 50; + final int factor = 50; + final int iterationsPerThread = 500; + final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); + final CountDownLatch latch = new CountDownLatch(numberOfThreads * factor); + + for (int i = 0; i < numberOfThreads * factor; i++) { + executor.execute(() -> { + try { + List threadTimeSet = new ArrayList<>(iterationsPerThread); + long lastTimestamp = 0; + for (int j = 0; j < iterationsPerThread; j++) { + + // there could be a race condition for the next two methods, but it shouldn't matter for this test + setTimeProvider.advanceNanos(j); + long currentTimeMicros = timeProvider.currentTimeMicros(); + + threadTimeSet.add(currentTimeMicros); + assertTrue("Timestamps should always increase", currentTimeMicros > lastTimestamp); + lastTimestamp = currentTimeMicros; + } + allGeneratedTimestamps.addAll(threadTimeSet); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executor.shutdown(); + + assertEquals("All timestamps across all threads and iterations should be unique", + numberOfThreads * iterationsPerThread * factor, allGeneratedTimestamps.size()); + } + + @Test + public void shouldProvideUniqueTimeAcrossThreadsNanos() throws InterruptedException { + final Set allGeneratedTimestamps = ConcurrentHashMap.newKeySet(); + final int numberOfThreads = 50; + final int factor = 50; + final int iterationsPerThread = 500; + final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); + final CountDownLatch latch = new CountDownLatch(numberOfThreads * factor); + + for (int i = 0; i < numberOfThreads * factor; i++) { + executor.execute(() -> { + try { + List threadTimeSet = new ArrayList<>(iterationsPerThread); + long lastTimestamp = 0; + for (int j = 0; j < iterationsPerThread; j++) { + + // there could be a race condition for the next two methods, but it shouldn't matter for this test + setTimeProvider.advanceNanos(j); + long currentTimeNanos = timeProvider.currentTimeNanos(); + + threadTimeSet.add(currentTimeNanos); + assertTrue("Timestamps should always be increasing", currentTimeNanos > lastTimestamp); + lastTimestamp = currentTimeNanos; + } + allGeneratedTimestamps.addAll(threadTimeSet); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executor.shutdown(); + + assertEquals("All timestamps across all threads and iterations should be unique", + numberOfThreads * iterationsPerThread * factor, allGeneratedTimestamps.size()); + } + + @Test + public void shouldAdvanceTimeWhenExceedingCallsPerSecond() { + final int iterations = 1_000_001; + long lastTimeMicros = 0; + + for (int i = 0; i < iterations; i++) { + setTimeProvider.advanceNanos(i); + long currentTimeMicros = timeProvider.currentTimeMicros(); + assertTrue("Each timestamp must be greater than the last", currentTimeMicros > lastTimeMicros); + lastTimeMicros = currentTimeMicros; + } + } + + @Test + public void currentTimeMillisShouldBeCorrect() { + int iterations = 1_000; + long lastTimeMillis = 0; + final long startTimeMillis = setTimeProvider.currentTimeMillis(); + + for (int i = 0; i < iterations; i++) { + setTimeProvider.advanceNanos(i); + long currentTimeMillis = timeProvider.currentTimeMillis(); + assertTrue(currentTimeMillis >= startTimeMillis); + assertTrue(currentTimeMillis <= startTimeMillis + iterations); + assertTrue("Millisecond timestamps must increase or be the same", currentTimeMillis >= lastTimeMillis); + lastTimeMillis = currentTimeMillis; + } + } + + @Test + public void currentTimeMicrosShouldBeCorrect() { + long lastTimeMicros = 0; + + for (int i = 0; i < 4_000; i++) { + setTimeProvider.advanceNanos(i); + long currentTimeMicros = timeProvider.currentTimeMicros(); + assertTrue("Microsecond timestamps must increase", currentTimeMicros > lastTimeMicros); + lastTimeMicros = currentTimeMicros; + } + } + + @Test + public void currentTimeMicrosShouldBeCorrectBackwards() { + long lastTimeMicros = 0; + + for (int i = 0; i < 4_000; i++) { + setTimeProvider.advanceNanos(-i); + long currentTimeMicros = timeProvider.currentTimeMicros(); + assertTrue("Microsecond timestamps must increase", currentTimeMicros > lastTimeMicros); + lastTimeMicros = currentTimeMicros; + } + } + + @Test + public void currentTimeNanosShouldBeCorrect() { + long lastTimeNanos = 0; + + for (int i = 0; i < 4_000; i++) { + setTimeProvider.advanceNanos(i); + long currentTimeNanos = timeProvider.currentTimeNanos(); + assertTrue("Nanosecond timestamps should increase", currentTimeNanos > lastTimeNanos); + lastTimeNanos = currentTimeNanos / 1000; + } + } }