From 4d1428d5f3dc5bb638803dc813c74b78e219654e Mon Sep 17 00:00:00 2001 From: keiran-lawrey Date: Tue, 24 Sep 2024 21:58:14 +0100 Subject: [PATCH 01/12] Added lots of javadoc --- .../openhft/chronicle/queue/BufferMode.java | 22 +- .../queue/ChronicleHistoryReaderMain.java | 96 +- .../chronicle/queue/ChronicleQueue.java | 1 + .../chronicle/queue/ChronicleReaderMain.java | 136 ++- .../chronicle/queue/ChronicleWriterMain.java | 12 + .../chronicle/queue/CycleCalculator.java | 16 + .../queue/DefaultCycleCalculator.java | 20 +- .../chronicle/queue/ExcerptAppender.java | 1 + .../chronicle/queue/ExcerptCommon.java | 1 + .../chronicle/queue/ExcerptTailer.java | 1 + .../chronicle/queue/NoMessageHistory.java | 96 +- .../queue/QueueSystemProperties.java | 47 +- .../openhft/chronicle/queue/RollCycle.java | 95 +- .../openhft/chronicle/queue/RollCycles.java | 1 + .../chronicle/queue/TailerDirection.java | 27 +- .../openhft/chronicle/queue/TailerState.java | 24 +- .../chronicle/queue/impl/CommonStore.java | 25 +- .../chronicle/queue/impl/ExcerptContext.java | 3 +- .../queue/impl/RollingChronicleQueue.java | 97 +- .../queue/impl/RollingResourcesCache.java | 1 + .../queue/impl/StoreFileListener.java | 43 +- .../queue/impl/StoreFileListeners.java | 25 +- .../chronicle/queue/impl/TableStore.java | 69 +- .../chronicle/queue/impl/WireStore.java | 89 +- .../queue/impl/WireStoreFactory.java | 9 + .../chronicle/queue/impl/WireStorePool.java | 46 +- .../queue/impl/WireStoreSupplier.java | 49 +- .../queue/impl/single/AppendLock.java | 21 + .../queue/impl/single/AsyncBufferCreator.java | 42 +- .../queue/impl/single/BinarySearch.java | 58 +- .../queue/impl/single/DirectoryListing.java | 54 ++ .../single/FileSystemDirectoryListing.java | 70 +- .../impl/single/IllegalIndexException.java | 35 +- .../single/IndexNotAvailableException.java | 20 +- .../chronicle/queue/impl/single/Indexing.java | 69 +- .../queue/impl/single/InternalAppender.java | 31 +- .../queue/impl/single/MetaDataField.java | 15 + .../queue/impl/single/MetaDataKeys.java | 26 + .../queue/impl/single/MicroToucher.java | 41 +- .../single/MissingStoreFileException.java | 13 +- .../NamedTailerNotAvailableException.java | 54 +- .../queue/impl/single/NoOpCondition.java | 54 +- .../impl/single/NotComparableException.java | 20 +- .../impl/single/NotReachedException.java | 12 + .../queue/impl/single/PrecreatedFiles.java | 26 + .../queue/impl/single/Pretoucher.java | 30 +- .../queue/impl/single/ReadOnlyWriteLock.java | 37 + .../impl/single/ReferenceCountedCache.java | 58 +- .../impl/single/RollCycleEncodeSequence.java | 61 +- .../queue/impl/single/SCQIndexing.java | 384 +++++++- .../chronicle/queue/impl/single/SCQMeta.java | 64 +- .../chronicle/queue/impl/single/SCQRoll.java | 109 ++- .../chronicle/queue/impl/single/SCQTools.java | 55 +- .../queue/impl/single/ScanResult.java | 27 +- .../impl/single/SingleChronicleQueue.java | 874 ++++++++++++++--- .../single/SingleChronicleQueueBuilder.java | 839 +++++++++++++--- .../single/SingleChronicleQueueStore.java | 345 +++++-- .../queue/impl/single/StoreAppender.java | 915 +++++++++++++----- .../queue/impl/single/StoreTailer.java | 608 +++++++++++- .../impl/single/TableDirectoryListing.java | 85 +- .../single/TableDirectoryListingReadOnly.java | 46 +- .../impl/single/TableStoreWriteLock.java | 77 +- .../impl/single/ThreadLocalAppender.java | 18 + .../queue/impl/single/WriteLock.java | 61 +- .../impl/single/namedtailer/IndexUpdater.java | 38 +- .../namedtailer/IndexUpdaterFactory.java | 69 +- .../queue/impl/table/AbstractTSQueueLock.java | 75 +- .../chronicle/queue/impl/table/Metadata.java | 38 + .../queue/impl/table/ReadonlyTableStore.java | 88 +- .../queue/impl/table/SingleTableBuilder.java | 1 + .../queue/impl/table/SingleTableStore.java | 71 +- .../queue/impl/table/TableStoreIterator.java | 14 + .../queue/impl/table/UnlockMode.java | 13 +- .../queue/internal/AnalyticsHolder.java | 22 +- .../internal/domestic/QueueOffsetSpec.java | 94 +- .../internal/main/InternalBenchmarkMain.java | 153 ++- .../queue/internal/main/InternalDumpMain.java | 80 +- .../internal/main/InternalPingPongMain.java | 89 +- ...ternalRemovableRollFileCandidatesMain.java | 17 +- .../internal/main/InternalUnlockMain.java | 33 +- ...nalDummyMethodReaderQueueEntryHandler.java | 35 +- ...nternalMessageToTextQueueEntryHandler.java | 49 +- .../MessageCountingMessageConsumer.java | 35 +- .../reader/PatternFilterMessageConsumer.java | 33 +- ...AbstractTailerPollingQueueEntryReader.java | 36 +- .../CustomPluginQueueEntryReader.java | 33 +- .../MethodReaderQueueEntryReader.java | 50 +- .../VanillaQueueEntryReader.java | 33 +- .../queue/internal/util/InternalFileUtil.java | 14 +- .../internal/writer/ChronicleWriter.java | 61 +- .../internal/writer/ChronicleWriterMain.java | 50 +- .../chronicle/queue/main/BenchmarkMain.java | 18 +- .../chronicle/queue/main/DumpMain.java | 38 +- .../chronicle/queue/main/HistoryMain.java | 19 +- .../chronicle/queue/main/PingPongMain.java | 16 +- .../chronicle/queue/main/ReaderMain.java | 10 +- .../main/RemovableRollFileCandidatesMain.java | 10 +- .../chronicle/queue/main/UnlockMain.java | 11 + .../queue/reader/ChronicleHistoryReader.java | 159 ++- .../queue/reader/ChronicleReader.java | 384 ++++++-- .../queue/reader/ChronicleReaderPlugin.java | 25 +- .../queue/reader/ContentBasedLimiter.java | 18 +- .../chronicle/queue/reader/HistoryReader.java | 98 +- .../queue/reader/MessageConsumer.java | 14 +- .../queue/reader/QueueEntryHandler.java | 15 + .../queue/reader/QueueEntryReader.java | 9 +- .../chronicle/queue/reader/Reader.java | 96 +- .../comparator/BinarySearchComparator.java | 11 + .../queue/rollcycles/LargeRollCycles.java | 49 +- .../queue/rollcycles/LegacyRollCycles.java | 49 +- .../queue/rollcycles/RollCycleArithmetic.java | 73 +- .../queue/rollcycles/SparseRollCycles.java | 52 +- .../queue/rollcycles/TestRollCycles.java | 62 +- .../chronicle/queue/util/FileState.java | 14 +- .../chronicle/queue/util/FileUtil.java | 90 +- .../chronicle/queue/util/MicroTouched.java | 13 +- .../chronicle/queue/util/PretouchUtil.java | 47 +- .../queue/util/PretoucherFactory.java | 23 + .../chronicle/queue/util/ToolsUtil.java | 14 +- sync-test/20240924F.cq4 | Bin 0 -> 83886080 bytes 120 files changed, 7611 insertions(+), 1556 deletions(-) create mode 100644 sync-test/20240924F.cq4 diff --git a/src/main/java/net/openhft/chronicle/queue/BufferMode.java b/src/main/java/net/openhft/chronicle/queue/BufferMode.java index fb261cc6de..d43e6f7c9b 100644 --- a/src/main/java/net/openhft/chronicle/queue/BufferMode.java +++ b/src/main/java/net/openhft/chronicle/queue/BufferMode.java @@ -18,10 +18,26 @@ package net.openhft.chronicle.queue; +/** + * Enum representing the different buffer modes that can be used within Chronicle Queue. + * Each mode has a specific use case and behavior depending on the configuration. + */ public enum BufferMode { - None, // The default + /** + * The default buffer mode. + * No additional buffering or special handling is applied. + */ + None, // Default mode, no buffering - Copy, // used in conjunction with encryption + /** + * Buffer mode used in conjunction with encryption. + * Data is copied into a buffer before being processed. + */ + Copy, // Used when encryption is enabled to handle buffered copies - Asynchronous // used by chronicle-ring [ which is an enterprise product ] + /** + * Buffer mode used for asynchronous processing. + * This mode is specific to Chronicle Ring, an enterprise product. + */ + Asynchronous // Asynchronous buffering used by Chronicle Ring (enterprise feature) } diff --git a/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java b/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java index 6c100f88b7..98d041c095 100644 --- a/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java +++ b/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java @@ -27,36 +27,58 @@ import java.util.concurrent.TimeUnit; /** - * Reads @see MessageHistory from a chronicle and outputs histograms for + * The main class for reading {@link net.openhft.chronicle.queue.MessageHistory} from a Chronicle queue + * and generating histograms for: * - * - * @author Jerry Shea + * This class provides options to configure the reader via command line arguments and outputs the results + * to the console. */ public class ChronicleHistoryReaderMain { + /** + * Entry point of the application. + * Initializes the {@link ChronicleHistoryReaderMain} and passes command-line arguments. + * + * @param args Command-line arguments + */ public static void main(@NotNull String[] args) { new ChronicleHistoryReaderMain().run(args); } + /** + * Runs the ChronicleHistoryReader setup and execution. + * Parses command-line options and configures the {@link ChronicleHistoryReader}. + * + * @param args Command-line arguments + */ protected void run(String[] args) { - final Options options = options(); - final CommandLine commandLine = parseCommandLine(args, options); + final Options options = options(); // Initialize command line options + final CommandLine commandLine = parseCommandLine(args, options); // Parse command line arguments try (final ChronicleHistoryReader chronicleHistoryReader = chronicleHistoryReader()) { + // Setup and execute the history reader setup(commandLine, chronicleHistoryReader); chronicleHistoryReader.execute(); } } + /** + * Configures the {@link ChronicleHistoryReader} based on the command-line options. + * + * @param commandLine Parsed command-line options + * @param chronicleHistoryReader The history reader to configure + */ protected void setup(@NotNull final CommandLine commandLine, @NotNull final ChronicleHistoryReader chronicleHistoryReader) { - chronicleHistoryReader. - withMessageSink(System.out::println). - withProgress(commandLine.hasOption('p')). - withHistosByMethod(commandLine.hasOption('m')). - withBasePath(Paths.get(commandLine.getOptionValue('d'))); + // Set message sink to output to System.out + chronicleHistoryReader.withMessageSink(System.out::println) + .withProgress(commandLine.hasOption('p')) // Enable progress if '-p' is specified + .withHistosByMethod(commandLine.hasOption('m')) // Enable histograms by method if '-m' is specified + .withBasePath(Paths.get(commandLine.getOptionValue('d'))); // Set base path from the directory option + + // Optionally configure time unit, items to ignore, window duration, and summary output if (commandLine.hasOption('t')) chronicleHistoryReader.withTimeUnit(TimeUnit.valueOf(commandLine.getOptionValue('t'))); if (commandLine.hasOption('i')) @@ -67,36 +89,65 @@ protected void setup(@NotNull final CommandLine commandLine, @NotNull final Chro chronicleHistoryReader.withSummaryOutput(Integer.parseInt(commandLine.getOptionValue('u'))); } + /** + * Initializes a new instance of {@link ChronicleHistoryReader}. + * + * @return A new {@link ChronicleHistoryReader} instance + */ @NotNull protected ChronicleHistoryReader chronicleHistoryReader() { return new ChronicleHistoryReader(); } + /** + * Parses command-line arguments using Apache Commons CLI. + * If help is requested, it prints the help message and exits. + * + * @param args Command-line arguments + * @param options Available command-line options + * @return Parsed {@link CommandLine} object + */ protected CommandLine parseCommandLine(@NotNull final String[] args, final Options options) { - final CommandLineParser parser = new DefaultParser(); + final CommandLineParser parser = new DefaultParser(); // Initialize command-line parser CommandLine commandLine = null; + try { - commandLine = parser.parse(options, args); + commandLine = parser.parse(options, args); // Parse arguments + // If help option is selected, print help and exit if (commandLine.hasOption('h')) { printHelpAndExit(options, 0); } } catch (ParseException e) { + // If parsing fails, print help with an error message and exit printHelpAndExit(options, 1, e.getMessage()); } return commandLine; } + /** + * Prints help and exits the program. + * + * @param options Command-line options + * @param status Exit status + */ protected void printHelpAndExit(final Options options, int status) { printHelpAndExit(options, status, null); } + /** + * Prints help and exits the program, optionally with a message. + * + * @param options Command-line options + * @param status Exit status + * @param message Optional message to print before help + */ protected void printHelpAndExit(final Options options, int status, String message) { final PrintWriter writer = new PrintWriter(System.out); new HelpFormatter().printHelp( writer, - 180, + 180, // Line width for formatting help output this.getClass().getSimpleName(), message, options, @@ -105,21 +156,26 @@ protected void printHelpAndExit(final Options options, int status, String messag null, true ); - writer.flush(); - System.exit(status); + writer.flush(); // Ensure everything is printed + System.exit(status); // Exit with provided status } + /** + * Configures command-line options for the ChronicleHistoryReaderMain. + * + * @return Configured {@link Options} object + */ @NotNull protected Options options() { - final Options options = new Options(); + final Options options = new Options(); // Initialize options ChronicleReaderMain.addOption(options, "d", "directory", true, "Directory containing chronicle queue files", true); ChronicleReaderMain.addOption(options, "h", "help-message", false, "Print this help and exit", false); ChronicleReaderMain.addOption(options, "t", "time unit", true, "Time unit. Default nanos", false); ChronicleReaderMain.addOption(options, "i", "ignore", true, "How many items to ignore from start", false); ChronicleReaderMain.addOption(options, "w", "window", true, "Window duration in time unit. Instead of one output at the end, will output every window period", false); ChronicleReaderMain.addOption(options, "u", "histo offset", true, "Summary output. Instead of histograms, will show one value only, in CSV format. Set this to 0 for 50th, 1 for 90th etc., -1 for worst", false); - options.addOption(new Option("p", false, "Show progress")); - options.addOption(new Option("m", false, "By method")); + options.addOption(new Option("p", false, "Show progress")); // Add 'p' option for showing progress + options.addOption(new Option("m", false, "By method")); // Add 'm' option for histogram by method return options; } } diff --git a/src/main/java/net/openhft/chronicle/queue/ChronicleQueue.java b/src/main/java/net/openhft/chronicle/queue/ChronicleQueue.java index 730dc45193..5175d6eeec 100644 --- a/src/main/java/net/openhft/chronicle/queue/ChronicleQueue.java +++ b/src/main/java/net/openhft/chronicle/queue/ChronicleQueue.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package net.openhft.chronicle.queue; import net.openhft.chronicle.core.io.Closeable; diff --git a/src/main/java/net/openhft/chronicle/queue/ChronicleReaderMain.java b/src/main/java/net/openhft/chronicle/queue/ChronicleReaderMain.java index c0bac2bd4c..342049d98d 100644 --- a/src/main/java/net/openhft/chronicle/queue/ChronicleReaderMain.java +++ b/src/main/java/net/openhft/chronicle/queue/ChronicleReaderMain.java @@ -34,66 +34,118 @@ import static java.util.Arrays.stream; /** - * Display records in a Chronicle in a text form. + * Main class for reading and displaying records from a Chronicle Queue in text form. + * Provides several command-line options to control behavior such as including/excluding records + * based on regex, following a live queue, or displaying records in various formats. */ public class ChronicleReaderMain { + /** + * Entry point of the application. Initializes the {@link ChronicleReaderMain} instance and + * passes command-line arguments for execution. + * + * @param args Command-line arguments + */ public static void main(@NotNull String[] args) { new ChronicleReaderMain().run(args); } + /** + * Adds an option to the provided {@link Options} object for command-line parsing. + * + * @param options The options object to add the option to + * @param opt The short name of the option + * @param argName The name of the argument + * @param hasArg Whether the option takes an argument + * @param description Description of the option + * @param isRequired Whether the option is required + */ public static void addOption(final Options options, final String opt, final String argName, final boolean hasArg, final String description, final boolean isRequired) { - final Option option = new Option(opt, hasArg, description); + final Option option = new Option(opt, hasArg, description); // Create option with argument option.setArgName(argName); - option.setRequired(isRequired); - options.addOption(option); + option.setRequired(isRequired); // Mark as required or not + options.addOption(option); // Add option to options object } + /** + * Runs the Chronicle Reader with the provided command-line arguments. + * Configures the {@link ChronicleReader} and executes the reader. + * + * @param args Command-line arguments + */ protected void run(@NotNull String[] args) { - final Options options = options(); - final CommandLine commandLine = parseCommandLine(args, options); + final Options options = options(); // Initialize command-line options + final CommandLine commandLine = parseCommandLine(args, options); // Parse command-line options - final ChronicleReader chronicleReader = chronicleReader(); + final ChronicleReader chronicleReader = chronicleReader(); // Create ChronicleReader instance - configureReader(chronicleReader, commandLine); + configureReader(chronicleReader, commandLine); // Configure the reader based on options - chronicleReader.execute(); + chronicleReader.execute(); // Execute the reader to display records } + /** + * Creates and returns a new instance of {@link ChronicleReader}. + * + * @return A new instance of {@link ChronicleReader} + */ protected ChronicleReader chronicleReader() { - return new ChronicleReader(); + return new ChronicleReader(); // Create and return ChronicleReader instance } + /** + * Parses the command-line arguments using Apache Commons CLI. + * If the help option is provided, prints the help message and exits. + * + * @param args Command-line arguments + * @param options Command-line options available + * @return The parsed {@link CommandLine} object + */ protected CommandLine parseCommandLine(final @NotNull String[] args, final Options options) { - final CommandLineParser parser = new DefaultParser(); + final CommandLineParser parser = new DefaultParser(); // Command-line parser CommandLine commandLine = null; try { - commandLine = parser.parse(options, args); + commandLine = parser.parse(options, args); // Parse arguments + // Print help if 'h' option is provided if (commandLine.hasOption('h')) { printHelpAndExit(options, 0); } } catch (ParseException e) { + // On parsing error, print help with an error message printHelpAndExit(options, 1, e.getMessage()); } return commandLine; } + /** + * Prints help information and exits the application. + * + * @param options Command-line options + * @param status Exit status code + */ protected void printHelpAndExit(final Options options, int status) { printHelpAndExit(options, status, null); } + /** + * Prints help information along with an optional message and exits the application. + * + * @param options Command-line options + * @param status Exit status code + * @param message Optional message to display before help + */ protected void printHelpAndExit(final Options options, int status, String message) { final PrintWriter writer = new PrintWriter(System.out); new HelpFormatter().printHelp( writer, - 180, + 180, // Line width for formatted help output this.getClass().getSimpleName(), message, options, @@ -102,18 +154,27 @@ protected void printHelpAndExit(final Options options, int status, String messag null, true ); - writer.flush(); - System.exit(status); + writer.flush(); // Ensure all help is printed + System.exit(status); // Exit with the provided status } + /** + * Configures the {@link ChronicleReader} based on the command-line options. + * Supports various options like regex filtering, tailing the queue, and more. + * + * @param chronicleReader The ChronicleReader instance to configure + * @param commandLine Parsed command-line options + */ protected void configureReader(final ChronicleReader chronicleReader, final CommandLine commandLine) { + // Set up message sink; squash output to single line if 'l' option is provided final Consumer messageSink = commandLine.hasOption('l') ? s -> System.out.println(s.replaceAll("\n", "")) : System.out::println; - chronicleReader. - withMessageSink(messageSink). - withBasePath(Paths.get(commandLine.getOptionValue('d'))); + chronicleReader + .withMessageSink(messageSink) // Configure the message sink + .withBasePath(Paths.get(commandLine.getOptionValue('d'))); // Set base path for chronicle queue files + // Apply various optional configurations based on command-line options if (commandLine.hasOption('i')) { stream(commandLine.getOptionValues('i')).forEach(chronicleReader::withInclusionRegex); } @@ -121,40 +182,40 @@ protected void configureReader(final ChronicleReader chronicleReader, final Comm stream(commandLine.getOptionValues('e')).forEach(chronicleReader::withExclusionRegex); } if (commandLine.hasOption('f')) { - chronicleReader.tail(); + chronicleReader.tail(); // Enable tail mode if 'f' option is provided } if (commandLine.hasOption('m')) { - chronicleReader.historyRecords(Long.parseLong(commandLine.getOptionValue('m'))); + chronicleReader.historyRecords(Long.parseLong(commandLine.getOptionValue('m'))); // Limit history records } if (commandLine.hasOption('n')) { - chronicleReader.withStartIndex(Long.decode(commandLine.getOptionValue('n'))); + chronicleReader.withStartIndex(Long.decode(commandLine.getOptionValue('n'))); // Set start index } if (commandLine.hasOption('r')) { final String r = commandLine.getOptionValue('r'); - chronicleReader.asMethodReader(r.equals("null") ? "" : r); - chronicleReader.showMessageHistory(commandLine.hasOption('g')); + chronicleReader.asMethodReader(r.equals("null") ? "" : r); // Configure as method reader + chronicleReader.showMessageHistory(commandLine.hasOption('g')); // Show message history if 'g' is present } if (commandLine.hasOption('w')) { - chronicleReader.withWireType(WireType.valueOf(commandLine.getOptionValue('w'))); + chronicleReader.withWireType(WireType.valueOf(commandLine.getOptionValue('w'))); // Set wire type } if (commandLine.hasOption('s')) { - chronicleReader.suppressDisplayIndex(); + chronicleReader.suppressDisplayIndex(); // Suppress index display if 's' is present } if (commandLine.hasOption('z')) { System.setProperty(AbstractTimestampLongConverter.TIMESTAMP_LONG_CONVERTERS_ZONE_ID_SYSTEM_PROPERTY, - ZoneId.systemDefault().toString()); + ZoneId.systemDefault().toString()); // Use local timezone if 'z' is present } if (commandLine.hasOption('a')) { - chronicleReader.withArg(commandLine.getOptionValue('a')); + chronicleReader.withArg(commandLine.getOptionValue('a')); // Pass argument to binary search if 'a' is present } if (commandLine.hasOption('b')) { - chronicleReader.withBinarySearch(commandLine.getOptionValue('b')); + chronicleReader.withBinarySearch(commandLine.getOptionValue('b')); // Configure binary search } if (commandLine.hasOption('k')) { - chronicleReader.inReverseOrder(); + chronicleReader.inReverseOrder(); // Read the queue in reverse if 'k' is present } if (commandLine.hasOption('x')) { - chronicleReader.withMatchLimit(Long.parseLong(commandLine.getOptionValue('x'))); + chronicleReader.withMatchLimit(Long.parseLong(commandLine.getOptionValue('x'))); // Limit match results } if (commandLine.hasOption("cbl")) { final String cbl = commandLine.getOptionValue("cbl"); @@ -171,17 +232,23 @@ protected void configureReader(final ChronicleReader chronicleReader, final Comm } } if (commandLine.hasOption("cblArg")) { - chronicleReader.withLimiterArg(commandLine.getOptionValue("cblArg")); + chronicleReader.withLimiterArg(commandLine.getOptionValue("cblArg")); // Set content-based limiter argument } if (commandLine.hasOption("named")) { - chronicleReader.withTailerId(commandLine.getOptionValue("named")); + chronicleReader.withTailerId(commandLine.getOptionValue("named")); // Set named tailer ID } } + /** + * Configures the available command-line options for the {@link ChronicleReaderMain}. + * + * @return A configured {@link Options} object with all available options + */ @NotNull protected Options options() { - final Options options = new Options(); + final Options options = new Options(); // Create new Options object + // Add various command-line options addOption(options, "d", "directory", true, "Directory containing chronicle queue files", true); addOption(options, "i", "include-regex", true, "Display records containing this regular expression", false); addOption(options, "e", "exclude-regex", true, "Do not display records containing this regular expression", false); @@ -202,6 +269,7 @@ protected Options options() { addOption(options, "cbl", "content-based-limiter", true, "Specify a content-based limiter", false); addOption(options, "cblArg", "content-based-limiter-argument", true, "Specify an argument for use by the content-based limiter", false); addOption(options, "named", "named", true, "Named tailer ID", false); - return options; + + return options; // Return configured options } } diff --git a/src/main/java/net/openhft/chronicle/queue/ChronicleWriterMain.java b/src/main/java/net/openhft/chronicle/queue/ChronicleWriterMain.java index 90700f79d7..127c6ae5a3 100644 --- a/src/main/java/net/openhft/chronicle/queue/ChronicleWriterMain.java +++ b/src/main/java/net/openhft/chronicle/queue/ChronicleWriterMain.java @@ -20,9 +20,21 @@ import org.jetbrains.annotations.NotNull; +/** + * The entry point for writing records to a Chronicle Queue. + * This class delegates the writing task to the internal {@link net.openhft.chronicle.queue.internal.writer.ChronicleWriterMain}. + */ public class ChronicleWriterMain { + /** + * Main method for executing the ChronicleWriterMain. + * It delegates to the internal writer implementation to run the application with the given arguments. + * + * @param args Command-line arguments for the writer + * @throws Exception if an error occurs during execution + */ public static void main(@NotNull String[] args) throws Exception { + // Delegate the task to the internal ChronicleWriterMain to handle the actual writing process new net.openhft.chronicle.queue.internal.writer.ChronicleWriterMain().run(args); } } diff --git a/src/main/java/net/openhft/chronicle/queue/CycleCalculator.java b/src/main/java/net/openhft/chronicle/queue/CycleCalculator.java index ab10b60655..70e6bcd4ad 100644 --- a/src/main/java/net/openhft/chronicle/queue/CycleCalculator.java +++ b/src/main/java/net/openhft/chronicle/queue/CycleCalculator.java @@ -20,7 +20,23 @@ import net.openhft.chronicle.core.time.TimeProvider; +/** + * Functional interface representing a calculator for determining the current cycle based on + * a {@link RollCycle}, a {@link TimeProvider}, and an optional offset in milliseconds. + *

+ * This interface is intended to be used for customizing the cycle calculation logic in + * Chronicle Queue, particularly when working with different roll cycles or time-based patterns. + */ @FunctionalInterface public interface CycleCalculator { + + /** + * Calculates the current cycle based on the provided {@link RollCycle}, {@link TimeProvider}, and an offset in milliseconds. + * + * @param rollCycle The roll cycle that defines the periodicity of the data rolls + * @param timeProvider The time provider that supplies the current time + * @param offsetMillis The time offset in milliseconds, typically used for adjusting the cycle calculation + * @return The current cycle as an integer, calculated according to the given roll cycle and time + */ int currentCycle(final RollCycle rollCycle, final TimeProvider timeProvider, final long offsetMillis); } diff --git a/src/main/java/net/openhft/chronicle/queue/DefaultCycleCalculator.java b/src/main/java/net/openhft/chronicle/queue/DefaultCycleCalculator.java index 27314a7213..f7b76efc5c 100644 --- a/src/main/java/net/openhft/chronicle/queue/DefaultCycleCalculator.java +++ b/src/main/java/net/openhft/chronicle/queue/DefaultCycleCalculator.java @@ -20,11 +20,29 @@ import net.openhft.chronicle.core.time.TimeProvider; +/** + * Singleton enum implementation of {@link CycleCalculator} that provides a default mechanism + * for calculating the current cycle based on the provided {@link RollCycle} and {@link TimeProvider}. + *

+ * This enum ensures there is only one instance of the cycle calculator, represented by {@code INSTANCE}. + */ public enum DefaultCycleCalculator implements CycleCalculator { + /** + * The single instance of the {@code DefaultCycleCalculator}. + */ INSTANCE; + /** + * Calculates the current cycle by delegating to the provided {@link RollCycle}. + * Uses the {@link TimeProvider} and an optional offset in milliseconds to determine the current cycle. + * + * @param rollCycle The roll cycle that defines the periodicity of the data rolls + * @param timeProvider The time provider that supplies the current time + * @param offsetMillis The time offset in milliseconds, typically used for adjusting the cycle calculation + * @return The current cycle as an integer, calculated according to the given roll cycle and time + */ @Override public int currentCycle(final RollCycle rollCycle, final TimeProvider timeProvider, final long offsetMillis) { - return rollCycle.current(timeProvider, offsetMillis); + return rollCycle.current(timeProvider, offsetMillis); // Delegate the cycle calculation to RollCycle } } diff --git a/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java b/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java index e882544daa..f57cc2b7ee 100644 --- a/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java +++ b/src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package net.openhft.chronicle.queue; import net.openhft.chronicle.bytes.Bytes; diff --git a/src/main/java/net/openhft/chronicle/queue/ExcerptCommon.java b/src/main/java/net/openhft/chronicle/queue/ExcerptCommon.java index 9eec4465ca..190b6ce752 100644 --- a/src/main/java/net/openhft/chronicle/queue/ExcerptCommon.java +++ b/src/main/java/net/openhft/chronicle/queue/ExcerptCommon.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package net.openhft.chronicle.queue; import net.openhft.chronicle.core.io.Closeable; diff --git a/src/main/java/net/openhft/chronicle/queue/ExcerptTailer.java b/src/main/java/net/openhft/chronicle/queue/ExcerptTailer.java index ec3ca76056..6e6658641a 100644 --- a/src/main/java/net/openhft/chronicle/queue/ExcerptTailer.java +++ b/src/main/java/net/openhft/chronicle/queue/ExcerptTailer.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package net.openhft.chronicle.queue; import net.openhft.chronicle.core.annotation.SingleThreaded; diff --git a/src/main/java/net/openhft/chronicle/queue/NoMessageHistory.java b/src/main/java/net/openhft/chronicle/queue/NoMessageHistory.java index 54da9a330b..f883fee700 100644 --- a/src/main/java/net/openhft/chronicle/queue/NoMessageHistory.java +++ b/src/main/java/net/openhft/chronicle/queue/NoMessageHistory.java @@ -15,66 +15,136 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package net.openhft.chronicle.queue; import net.openhft.chronicle.wire.*; -// Is this being used? -@SuppressWarnings("deprecation") +/** + * A singleton implementation of the {@link MessageHistory} interface that represents a "no-op" or "empty" message history. + * This is used when message history tracking is not required. All methods in this class return default values, + * indicating no history is being stored. + *

+ * It suppresses warnings for deprecated methods, though this may need to be verified in the broader context to ensure + * it's still appropriate. + */ +@SuppressWarnings("deprecation") // Suppressing warnings related to deprecated methods public enum NoMessageHistory implements MessageHistory { + /** + * The single instance of {@code NoMessageHistory}, following the singleton pattern. + */ INSTANCE; + /** + * Returns the number of timings recorded in the message history. Always returns 0 since this implementation does not track timings. + * + * @return 0, indicating no timings are recorded + */ @Override public int timings() { - return 0; + return 0; // No timings recorded } + /** + * Returns the timing for the specified index. Always returns -1 as no timings are recorded in this implementation. + * + * @param n The index of the timing (ignored) + * @return -1, indicating no timing available + */ @Override public long timing(int n) { - return -1; + return -1; // No timing information available } + /** + * Returns the number of sources that processed the message. Always returns 0 as no source tracking is done. + * + * @return 0, indicating no sources processed the message + */ @Override public int sources() { - return 0; + return 0; // No sources recorded } + /** + * Returns the source ID for the specified index. Always returns -1 since source tracking is not done. + * + * @param n The index of the source ID (ignored) + * @return -1, indicating no source ID available + */ @Override public int sourceId(int n) { - return -1; + return -1; // No source ID available } + /** + * Determines if the source IDs end with the specified array of source IDs. Always returns false as no source IDs are tracked. + * + * @param sourceIds Array of source IDs to check (ignored) + * @return false, indicating the source IDs do not match + */ @Override public boolean sourceIdsEndsWith(int[] sourceIds) { - return false; + return false; // No source IDs to compare } + /** + * Returns the source index for the specified index. Always returns -1 as source index tracking is not done. + * + * @param n The index of the source (ignored) + * @return -1, indicating no source index available + */ @Override public long sourceIndex(int n) { - return -1; + return -1; // No source index available } + /** + * Resets the message history with the provided source ID and source index. + * This implementation ignores the reset as no history is stored. + * + * @param sourceId The source ID to reset (ignored) + * @param sourceIndex The source index to reset (ignored) + */ @Override public void reset(int sourceId, long sourceIndex) { - // ignored + // No-op: no history is stored, so reset is ignored } + /** + * Resets the message history. This implementation performs no action as no history is stored. + */ public void reset() { - // no-op + // No-op: nothing to reset } + /** + * Returns the ID of the last source that processed the message. Always returns -1 as no source tracking is done. + * + * @return -1, indicating no last source ID available + */ @Override public int lastSourceId() { - return -1; + return -1; // No last source ID available } + /** + * Returns the index of the last source that processed the message. Always returns -1 as no source tracking is done. + * + * @return -1, indicating no last source index available + */ @Override public long lastSourceIndex() { - return -1; + return -1; // No last source index available } + /** + * Indicates if the message history has been modified. Always returns false as no history is stored or tracked. + * + * @return false, indicating the history is not dirty + */ @Override public boolean isDirty() { - return false; + return false; // No history modifications to track } } diff --git a/src/main/java/net/openhft/chronicle/queue/QueueSystemProperties.java b/src/main/java/net/openhft/chronicle/queue/QueueSystemProperties.java index 05cbc3b95f..190bd8a523 100644 --- a/src/main/java/net/openhft/chronicle/queue/QueueSystemProperties.java +++ b/src/main/java/net/openhft/chronicle/queue/QueueSystemProperties.java @@ -20,39 +20,56 @@ import net.openhft.chronicle.core.Jvm; +/** + * A utility class for managing Chronicle Queue related system properties. + * This class defines constants for various system property keys and provides mechanisms + * for configuring aspects of Chronicle Queue behavior via system properties. + *

+ * The class cannot be instantiated and only contains static fields and methods. + */ public final class QueueSystemProperties { + // Private constructor to prevent instantiation of the utility class private QueueSystemProperties() { } /** - * Returns if Chronicle Queue shall assert certain index invariants on various - * occasions throughout the code. Setting this property to "", "yes" or "true" - * will enable this feature. Enabling the feature will slow down execution if - * assertions (-ea) are enabled. + * Indicates whether Chronicle Queue should assert certain index invariants at various points in the code. + *

+ * This system property can be used to enable index checking, which will slow down execution if assertions are enabled (-ea). + * The feature is enabled by setting the system property "queue.check.index" to one of the following values: "", "yes", or "true". *

- * System Property key: "queue.check.index" - * Default unset value: false + * System Property key: "queue.check.index"
+ * Default unset value: false
* Activation values : "", "yes", or "true" + * + * @see Jvm#getBoolean(String) for more details on how boolean properties are evaluated. */ public static boolean CHECK_INDEX = Jvm.getBoolean("queue.check.index"); /** - * Name of a system property used to specify the default roll cycle. + * The system property key used to specify the default roll cycle for a Chronicle Queue. *

- * System Property key: "net.openhft.queue.builder.defaultRollCycle" - * Fallback if unset : to {@link net.openhft.chronicle.queue.RollCycles#DEFAULT} - * Valid values : Class name of an entity implementing RollCycle such as "net.openhft.chronicle.queue.harness.WeeklyRollCycle" - * or enum value in class:name format such as "net.openhft.chronicle.queue.RollCycles:HOURLY" + * This property allows configuration of a custom roll cycle by setting the property to either: + *

+ *

+ * System Property key: "net.openhft.queue.builder.defaultRollCycle"
+ * Fallback if unset: {@link net.openhft.chronicle.queue.RollCycles#DEFAULT} */ public static final String DEFAULT_ROLL_CYCLE_PROPERTY = "net.openhft.queue.builder.defaultRollCycle"; /** - * Name of a system property used to specify the default epoch offset property. + * The system property key used to specify the default epoch offset for Chronicle Queue timestamps. + *

+ * This property can be set to any long value, representing the epoch offset in milliseconds. + * The default value is 0L if the property is not set. *

- * System Property key: "net.openhft.queue.builder.defaultEpoch" - * Default unset value: 0L - * Valid values : Any long value + * System Property key: "net.openhft.queue.builder.defaultEpoch"
+ * Default unset value: 0L
+ * Valid values: Any long value */ public static final String DEFAULT_EPOCH_PROPERTY = "net.openhft.queue.builder.defaultEpoch"; diff --git a/src/main/java/net/openhft/chronicle/queue/RollCycle.java b/src/main/java/net/openhft/chronicle/queue/RollCycle.java index b164059a11..343d40d573 100644 --- a/src/main/java/net/openhft/chronicle/queue/RollCycle.java +++ b/src/main/java/net/openhft/chronicle/queue/RollCycle.java @@ -15,61 +15,71 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package net.openhft.chronicle.queue; import net.openhft.chronicle.core.time.TimeProvider; import org.jetbrains.annotations.NotNull; +/** + * Interface defining the behavior of a roll cycle in Chronicle Queue. A roll cycle dictates how frequently data rolls over + * to a new file and manages the indexing of records within those cycles. + *

+ * Roll cycles are defined by a period of time, such as MINUTELY or DAILY, and they provide methods for managing the + * cycle duration, formatting, and indexing. + */ public interface RollCycle { /** - * Returns the format that is to be applied when file names are calculated for a new roll cycle. + * Returns the format to be applied when file names are calculated for a new roll cycle. *

- * For example, the following formats can be returned: + * For example, the following formats may be returned: *

- * Lexicographical order of formatted cycles must preserve chronological order, i.e. if {@code cycle1 < cycle2}, - * the same relation must be kept for their string representations. + * The lexicographical order of formatted cycles must preserve chronological order, meaning that if {@code cycle1 < cycle2}, + * the same relation must hold true for their string representations. * - * @return the format that is to be applied when file names are calculated for a new roll cycle + * @return the format that is applied when file names are calculated for a new roll cycle */ @NotNull String format(); /** - * Returns the length in milliseconds (i.e. the maximum duration) for a roll cycle. + * Returns the length in milliseconds for a roll cycle, representing the maximum duration for one cycle. *

- * For example, the following lengths can be returned: + * Examples of cycle lengths: *

* - * @return the length in milliseconds (i.e. the maximum duration) for a roll cycle + * @return the length in milliseconds for the roll cycle */ int lengthInMillis(); /** - * @return the default epoch offset if one is not set + * Returns the default epoch offset if none is set. The epoch offset allows users to define their own start time for the cycle. + * + * @return the default epoch offset, typically 0 */ default int defaultEpoch() { return 0; } /** - * @return the size of each index array, note: indexCount^2 is the maximum number of index queue entries. + * Returns the size of each index array. The value {@code indexCount^2} represents the maximum number of index queue entries. + * + * @return the size of each index array */ int defaultIndexCount(); /** * Returns the space between excerpts that are explicitly indexed. + * A larger value improves sequential write performance but slows down random access reads. Sequential read performance is not impacted. *

- * A higher number means higher sequential write performance but slower random access read. The sequential read performance is not affected by - * this property. - *

- * For example, the following default index spacing can be returned: + * Example values: *

*/ public enum TailerState { - END_OF_CYCLE, // Tailer has reached the end of the current cycle - FOUND_IN_CYCLE, // An entry was found in the current cycle - BEYOND_START_OF_CYCLE, // Tailer has moved beyond the start of the cycle - CYCLE_NOT_FOUND, // The requested cycle could not be found - NOT_REACHED_IN_CYCLE, // Tailer has not yet reached an entry in the cycle - UNINITIALISED // Tailer has not been initialized yet + END_OF_CYCLE, + FOUND_IN_CYCLE, + BEYOND_START_OF_CYCLE, + CYCLE_NOT_FOUND, + NOT_REACHED_IN_CYCLE, + UNINITIALISED } diff --git a/src/main/java/net/openhft/chronicle/queue/impl/CommonStore.java b/src/main/java/net/openhft/chronicle/queue/impl/CommonStore.java index 0fe2370f18..b8f82d0cd9 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/CommonStore.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/CommonStore.java @@ -33,10 +33,9 @@ * and dumping the contents of the store in a specific wire format. * *

This interface extends both {@link Demarshallable} and {@link WriteMarshallable}, allowing the store to - * be serialized and deserialized using Chronicle's wire formats.

+ * be serialized and deserialized using Chronicle's wire formats. */ public interface CommonStore extends Demarshallable, WriteMarshallable { - /** * Returns the file associated with this store, if any. * diff --git a/src/main/java/net/openhft/chronicle/queue/impl/ExcerptContext.java b/src/main/java/net/openhft/chronicle/queue/impl/ExcerptContext.java index 26b8da2633..ad608976a3 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/ExcerptContext.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/ExcerptContext.java @@ -14,6 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package net.openhft.chronicle.queue.impl; diff --git a/src/main/java/net/openhft/chronicle/queue/impl/RollingChronicleQueue.java b/src/main/java/net/openhft/chronicle/queue/impl/RollingChronicleQueue.java index 6c5d17afe3..6615e52d8d 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/RollingChronicleQueue.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/RollingChronicleQueue.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package net.openhft.chronicle.queue.impl; import net.openhft.chronicle.queue.ChronicleQueue; @@ -32,7 +31,7 @@ * additional methods for managing roll cycles, storing and retrieving data, and counting excerpts in a Chronicle Queue. * *

It defines the epoch offset, cycle management, and excerpt counting mechanisms, along with various queue - * parameters such as index count, spacing, and delta checkpoint intervals.

+ * parameters such as index count, spacing, and delta checkpoint intervals. */ public interface RollingChronicleQueue extends ChronicleQueue { @@ -90,10 +89,16 @@ public interface RollingChronicleQueue extends ChronicleQueue { int nextCycle(int currentCycle, @NotNull TailerDirection direction) throws ParseException; /** - * Calculates the number of excerpts (messages) between two specified index positions. + * The number of excerpts between the indexes, {@code fromIndex} inclusive, {@code toIndex} + * exclusive. + *

+ * When {@code fromIndex} and {@code toIndex} are in different cycles which are not adjacent, this + * operation can be expensive, as the index count for each intermediate cycle has to be found + * and calculated. As such, and in this situation, it's not recommended to call this method + * regularly in latency sensitive systems. * *

This operation can be expensive if the indexes are in different, non-adjacent cycles, - * as it may involve querying and calculating index counts for intermediate cycles.

+ * as it may involve querying and calculating index counts for intermediate cycles. * * @param fromIndex the starting index (inclusive). No validation is performed on this index. * @param toIndex the ending index (exclusive). No validation is performed on this index. diff --git a/src/main/java/net/openhft/chronicle/queue/impl/RollingResourcesCache.java b/src/main/java/net/openhft/chronicle/queue/impl/RollingResourcesCache.java index 6bea670fd3..38bbeab18e 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/RollingResourcesCache.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/RollingResourcesCache.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package net.openhft.chronicle.queue.impl; import net.openhft.chronicle.core.Jvm; diff --git a/src/main/java/net/openhft/chronicle/queue/impl/StoreFileListener.java b/src/main/java/net/openhft/chronicle/queue/impl/StoreFileListener.java index bff7e6f479..c0a6380cc0 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/StoreFileListener.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/StoreFileListener.java @@ -14,6 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package net.openhft.chronicle.queue.impl; @@ -26,14 +27,12 @@ * opened or closed, such as managing resources or triggering background processes. * *

The interface provides a default method for determining if the listener is active, and methods for handling - * file acquisition and release events. It also defines a {@code NO_OP} listener that performs no actions.

+ * file acquisition and release events. It also defines a {@code NO_OP} listener that performs no actions. * - *

Listeners implementing this interface may be notified asynchronously when files are acquired or released.

+ *

Listeners implementing this interface may be notified asynchronously when files are acquired or released. */ @FunctionalInterface public interface StoreFileListener { - - // A no-operation listener that performs no actions on file events. StoreFileListener NO_OP = StoreFileListeners.NO_OP; /** @@ -49,20 +48,20 @@ default boolean isActive() { * Called when a file is acquired. * *

This method is called asynchronously when a store file is acquired for use, allowing for any - * necessary handling of the file acquisition event. By default, this method does nothing.

+ * necessary handling of the file acquisition event. By default, this method does nothing. * * @param cycle the cycle associated with the acquired file. * @param file the {@link File} object representing the acquired file. */ default void onAcquired(int cycle, File file) { - // Default implementation does nothing + } /** * Called when a file is released. * *

This method is called asynchronously when a store file is released, allowing for any - * necessary handling of the file release event.

+ * necessary handling of the file release event. * * @param cycle the cycle associated with the released file. * @param file the {@link File} object representing the released file. diff --git a/src/main/java/net/openhft/chronicle/queue/impl/StoreFileListeners.java b/src/main/java/net/openhft/chronicle/queue/impl/StoreFileListeners.java index 4a76fa4b58..6888afb041 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/StoreFileListeners.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/StoreFileListeners.java @@ -26,7 +26,7 @@ * The {@code StoreFileListeners} enum provides predefined implementations of the {@link StoreFileListener} interface. * These listeners allow for different levels of actions when store files are acquired or released. * - *

This enum provides two implementations:

+ *

This enum provides two implementations: *

*/ public class InternalBenchmarkMain { - // Flags and configuration for the benchmark static volatile boolean running = true; - static int throughput = Integer.getInteger("throughput", 250); // Target throughput in MB/s - static int runtime = Integer.getInteger("runtime", 300); // Benchmark runtime in seconds - static String basePath = System.getProperty("path", OS.TMP); // Base path for the Chronicle Queue - static volatile long readerLoopTime = 0; // Reader loop time - static volatile long readerEndLoopTime = 0; // Reader end loop time - static int counter = 0; // Counter for iterations + static int throughput = Integer.getInteger("throughput", 250); // MB/s + static int runtime = Integer.getInteger("runtime", 300); // seconds + static String basePath = System.getProperty("path", OS.TMP); + static volatile long readerLoopTime = 0; + static volatile long readerEndLoopTime = 0; + static int counter = 0; - // Static block for enabling JVM safepoint logging static { System.setProperty("jvm.safepoint.enabled", "true"); } @@ -93,12 +91,11 @@ public static void main(String[] args) { * @param messageSize the size of each message in bytes */ static void benchmark(int messageSize) { - Histogram writeTime = new Histogram(32, 7); // Stores write latencies - Histogram transportTime = new Histogram(32, 7); // Stores transport latencies - Histogram readTime = new Histogram(32, 7); // Stores read latencies - String path = basePath + "/test-q-" + messageSize; // Path for the queue files + Histogram writeTime = new Histogram(32, 7); + Histogram transportTime = new Histogram(32, 7); + Histogram readTime = new Histogram(32, 7); + String path = basePath + "/test-q-" + messageSize; - // Create a new ChronicleQueue at the given path ChronicleQueue queue = createQueue(path); // Pretoucher will only work with Queue Enterprise in the path @@ -106,24 +103,24 @@ static void benchmark(int messageSize) { try (ExcerptAppender appender = queue.createAppender()) { Thread thread = Thread.currentThread(); while (!thread.isInterrupted()) { - appender.pretouch(); // Pre-touch the queue to pre-load memory pages - Jvm.pause(10); // Pause between touches + appender.pretouch(); + Jvm.pause(10); } } }); pretoucher.setDaemon(true); - pretoucher.start(); // Start the pre-touching thread + pretoucher.start(); - Histogram loopTime = new Histogram(); // Stores loop time measurements + Histogram loopTime = new Histogram(); // Start a thread to read from the queue Thread reader = new Thread(() -> { // try (ChronicleQueue queue2 = createQueue(path)) - ExcerptTailer tailer = queue.createTailer().toEnd(); // Create a tailer that starts at the end of the queue + ExcerptTailer tailer = queue.createTailer().toEnd(); long endLoop = System.nanoTime(); while (running) { - loopTime.sample((double) (System.nanoTime() - endLoop)); // Measure loop times - Jvm.safepoint(); // Trigger JVM safepoint + loopTime.sample((double) (System.nanoTime() - endLoop)); + Jvm.safepoint(); // readerLoopTime = System.nanoTime(); // if (readerLoopTime - readerEndLoopTime > 1000) @@ -137,30 +134,30 @@ static void benchmark(int messageSize) { // } finally { // readerEndLoopTime = System.nanoTime(); // } - Jvm.safepoint(); // Trigger JVM safepoint - endLoop = System.nanoTime(); // Update the loop time for the next iteration + Jvm.safepoint(); + endLoop = System.nanoTime(); } }); - reader.start(); // Start the reader thread - Jvm.pause(250); // Give the reader time to start + reader.start(); + Jvm.pause(250); // give the reader time to start long next = System.nanoTime(); - long end = (long) (next + runtime * 1e9); // End time for the benchmark + long end = (long) (next + runtime * 1e9); - ExcerptAppender appender = queue.createAppender(); // Create an appender to write to the queue + ExcerptAppender appender = queue.createAppender(); while (end > System.nanoTime()) { long start = System.nanoTime(); try (DocumentContext dc = appender.writingDocument(false)) { - writeMessage(dc.wire(), messageSize); // Write a message to the queue + writeMessage(dc.wire(), messageSize); } long written = System.nanoTime(); - long time = written - start; // Calculate write latency + long time = written - start; // System.out.println(time); - writeTime.sample(time); // Sample the write time + writeTime.sample(time); // Ensure the reader is keeping up with the writer long diff = writeTime.totalCount() - readTime.totalCount(); - Thread.yield(); // Yield to give the reader time to catch up - if (diff >= 200) { // If the difference is too large, log details + Thread.yield(); + if (diff >= 200) { // long rlt = readerLoopTime; // long delay = System.nanoTime() - rlt; System.out.println("diff=" + diff /* +" delay= " + delay*/); @@ -174,7 +171,7 @@ static void benchmark(int messageSize) { next += (long) (messageSize * 1e9 / (throughput * 1e6)); long delay = next - System.nanoTime(); if (delay > 0) - LockSupport.parkNanos(delay); // Pause to maintain target throughput + LockSupport.parkNanos(delay); } // Wait for the reader to catch up before shutting down @@ -208,7 +205,7 @@ static void benchmark(int messageSize) { * @param tailer The ExcerptTailer used to read from the queue */ private static void runInner(Histogram transportTime, Histogram readTime, ExcerptTailer tailer) { - Jvm.safepoint(); // Trigger JVM safepoint + Jvm.safepoint(); /*if (tailer.peekDocument()) { if (counter++ < 1000) { Jvm.safepoint(); @@ -220,21 +217,21 @@ private static void runInner(Histogram transportTime, Histogram readTime, Excerp else Jvm.safepoint(); counter = 0; - try (DocumentContext dc = tailer.readingDocument(false)) { // Read the next document + try (DocumentContext dc = tailer.readingDocument(false)) { Jvm.safepoint(); if (!dc.isPresent()) { return; } - long transport = System.nanoTime(); // Start measuring transport time + long transport = System.nanoTime(); Jvm.safepoint(); Wire wire = dc.wire(); Bytes bytes = wire.bytes(); - long start = readMessage(bytes); // Process the message from the bytes - long end = System.nanoTime(); // End of read operation - transportTime.sample((double) (transport - start)); // Sample transport time - readTime.sample((double) (end - transport)); // Sample read time + long start = readMessage(bytes); + long end = System.nanoTime(); + transportTime.sample((double) (transport - start)); + readTime.sample((double) (end - transport)); } - Jvm.safepoint(); // Trigger JVM safepoint + Jvm.safepoint(); } /** @@ -246,8 +243,8 @@ private static void runInner(Histogram transportTime, Histogram readTime, Excerp @NotNull private static ChronicleQueue createQueue(String path) { return ChronicleQueue.singleBuilder(path) - .blockSize(1 << 30) // Set the block size to 1GB - .pauserSupplier(Pauser::timedBusy) // Use a timed busy pauser + .blockSize(1 << 30) + .pauserSupplier(Pauser::timedBusy) .build(); } @@ -259,13 +256,13 @@ private static ChronicleQueue createQueue(String path) { */ private static long readMessage(Bytes bytes) { Jvm.safepoint(); - long start = bytes.readLong(); // Read the start time + long start = bytes.readLong(); long rp = bytes.readPosition(); long rl = bytes.readLimit(); long addr = bytes.addressForRead(rp); long addrEnd = bytes.addressForRead(rl); Memory memory = OS.memory(); - for (addr += 8; addr + 7 < addrEnd; addr += 8) // Read the rest of the message + for (addr += 8; addr + 7 < addrEnd; addr += 8) memory.readLong(addr); Jvm.safepoint(); return start; @@ -282,12 +279,12 @@ private static void writeMessage(Wire wire, int messageSize) { long wp = bytes.writePosition(); long addr = bytes.addressForWrite(wp); Memory memory = OS.memory(); - for (int i = 0; i < messageSize; i += 16) { // Write the message data + for (int i = 0; i < messageSize; i += 16) { memory.writeLong(addr + i, 0L); memory.writeLong(addr + i + 8, 0L); } bytes.writeSkip(messageSize); - bytes.writeLong(wp, System.nanoTime()); // Record the current time as the start time + bytes.writeLong(wp, System.nanoTime()); } } diff --git a/src/main/java/net/openhft/chronicle/queue/internal/main/InternalDumpMain.java b/src/main/java/net/openhft/chronicle/queue/internal/main/InternalDumpMain.java index 0741098db9..cd11cd29c1 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/main/InternalDumpMain.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/main/InternalDumpMain.java @@ -38,15 +38,12 @@ * The dump includes detailed binary structure of the files in a human-readable format. */ public class InternalDumpMain { - - // File system properties for dumping files private static final String FILE = System.getProperty("file"); private static final boolean SKIP_TABLE_STORE = Jvm.getBoolean("skipTableStoreDump"); private static final boolean UNALIGNED = Jvm.getBoolean("dumpUnaligned"); private static final int LENGTH = ", 0".length(); static { - // Register aliases for SingleChronicleQueueBuilder SingleChronicleQueueBuilder.addAliases(); } @@ -57,7 +54,7 @@ public class InternalDumpMain { * @throws FileNotFoundException if the provided file path is invalid */ public static void main(String[] args) throws FileNotFoundException { - dump(args[0]); // Dump the contents of the provided path + dump(args[0]); } /** @@ -69,10 +66,9 @@ public static void main(String[] args) throws FileNotFoundException { */ public static void dump(@NotNull String path) throws FileNotFoundException { File path2 = new File(path); - // PrintStream to the specified file, or stdout if no file specified PrintStream out = FILE == null ? System.out : new PrintStream(FILE); long upperLimit = Long.MAX_VALUE; - dump(path2, out, upperLimit); // Dump the file with an upper limit + dump(path2, out, upperLimit); } /** @@ -96,14 +92,14 @@ public static void dump(@NotNull File path, @NotNull PrintStream out, long upper System.exit(1); } - Arrays.sort(files); // Sort the files by name + Arrays.sort(files); for (File file : files) { out.println("## " + file); - dumpFile(file, out, upperLimit); // Dump each file + dumpFile(file, out, upperLimit); } } else if (path.getName().endsWith(SingleChronicleQueue.SUFFIX) || path.getName().endsWith(SingleTableStore.SUFFIX)) { - dumpFile(path, out, upperLimit); // Dump a single file + dumpFile(path, out, upperLimit); } } @@ -116,14 +112,14 @@ public static void dump(@NotNull File path, @NotNull PrintStream out, long upper * @param upperLimit Maximum number of bytes to dump */ private static void dumpFile(@NotNull File file, @NotNull PrintStream out, long upperLimit) { - Bytes buffer = Bytes.elasticByteBuffer(); // Temporary buffer for reading file contents + Bytes buffer = Bytes.elasticByteBuffer(); try (MappedBytes bytes = MappedBytes.mappedBytes(file, 4 << 20, OS.pageSize(), !OS.isWindows())) { - bytes.readLimit(bytes.realCapacity()); // Set the read limit to the file's actual capacity - StringBuilder sb = new StringBuilder(); // StringBuilder to hold the dumped output - WireDumper dumper = WireDumper.of(bytes, !UNALIGNED); // Create a WireDumper for reading the file + bytes.readLimit(bytes.realCapacity()); + StringBuilder sb = new StringBuilder(); + WireDumper dumper = WireDumper.of(bytes, !UNALIGNED); while (bytes.readRemaining() >= 4) { - sb.setLength(0); // Clear the StringBuilder - boolean last = dumper.dumpOne(sb, buffer); // Dump one entry into the StringBuilder + sb.setLength(0); + boolean last = dumper.dumpOne(sb, buffer); if (sb.indexOf("\nindex2index:") != -1 || sb.indexOf("\nindex:") != -1) { // Truncate trailing zeros for readability if (sb.indexOf(", 0\n]\n") == sb.length() - 6) { @@ -134,9 +130,9 @@ private static void dumpFile(@NotNull File file, @NotNull PrintStream out, long } } - out.println(sb); // Print the dumped entry + out.println(sb); - if (last) // Stop if it was the last entry + if (last) break; if (bytes.readPosition() > upperLimit) { out.println("# limit reached."); @@ -144,9 +140,9 @@ private static void dumpFile(@NotNull File file, @NotNull PrintStream out, long } } } catch (IOException ioe) { - err.println("Failed to read " + file + " " + ioe); // Handle I/O exceptions + err.println("Failed to read " + file + " " + ioe); } finally { - buffer.releaseLast(); // Release the buffer + buffer.releaseLast(); } } @@ -161,7 +157,7 @@ private static int indexOfLastZero(@NotNull CharSequence str) { int i = str.length() - 3; do { i -= LENGTH; - CharSequence charSequence = str.subSequence(i, i + 3); // Check if the sequence is ", 0" + CharSequence charSequence = str.subSequence(i, i + 3); if (!", 0".contentEquals(charSequence)) return i + LENGTH; } while (i > 3); diff --git a/src/main/java/net/openhft/chronicle/queue/internal/main/InternalPingPongMain.java b/src/main/java/net/openhft/chronicle/queue/internal/main/InternalPingPongMain.java index bdb14c69cb..ecb5db7303 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/main/InternalPingPongMain.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/main/InternalPingPongMain.java @@ -40,20 +40,14 @@ * for both operations. The benchmark runs for a defined duration, and results are printed at the end. */ public final class InternalPingPongMain { - - // Runtime duration in seconds, default is 30 seconds if not specified via system properties - static int runtime = Integer.getInteger("runtime", 30); - // Base directory path for the queue files, defaults to system temporary directory if not set + static int runtime = Integer.getInteger("runtime", 30); // seconds static String basePath = System.getProperty("path", OS.TMP); - // Atomic variables to track write timestamps and counts static AtomicLong writeTime = new AtomicLong(); static AtomicInteger writeCount = new AtomicInteger(); static AtomicInteger readCount = new AtomicInteger(); - // Atomic flag to indicate whether the benchmark is still running static AtomicBoolean running = new AtomicBoolean(true); static { - // Enable JVM safepoint tracking for latency measurement accuracy System.setProperty("jvm.safepoint.enabled", "true"); } @@ -78,9 +72,8 @@ public static void main(String[] args) { */ static void pingPong(int size) { String path = InternalPingPongMain.basePath + "/test-q-" + Time.uniqueId(); - Histogram readDelay = new Histogram(); // Histogram for read delays - Histogram readDelay2 = new Histogram(); // Another histogram for additional read latency analysis - + Histogram readDelay = new Histogram(); + Histogram readDelay2 = new Histogram(); try (ChronicleQueue queue = createQueue(path)) { // Thread responsible for reading from the queue @@ -90,52 +83,48 @@ static void pingPong(int size) { // Wait until there's a message to read while (readCount.get() == writeCount.get()) ; - long wakeTime = System.nanoTime(); // Record the time we started reading + long wakeTime = System.nanoTime(); while (running.get()) { try (DocumentContext dc = tailer.readingDocument(true)) { if (!dc.isPresent()) - continue; // Skip if there's no document present + continue; } break; } // Measure the time between when the write happened and the read started final long delay = wakeTime - writeTime.get(); final long time = System.nanoTime() - wakeTime; - readDelay2.sample(time); // Record the time it took to read the message - readDelay.sample(delay); // Record the delay before the read started + readDelay2.sample(time); + readDelay.sample(delay); if (time + delay > 20_000) System.out.println("td " + delay + " + " + time); - - if (readCount.get() == 100000) { // Reset histograms after a certain number of reads + if (readCount.get() == 100000) { System.out.println("reset"); readDelay.reset(); readDelay2.reset(); } - readCount.incrementAndGet(); // Increment the read count + readCount.incrementAndGet(); } }); - reader.setDaemon(true); // Mark the reader thread as a daemon - reader.start(); // Start the reader thread - Jvm.pause(100); // Pause to allow the reader to start up + reader.setDaemon(true); + reader.start(); + Jvm.pause(100); // Calculate the finish time based on the runtime property final long finish = System.currentTimeMillis() + runtime * 1000L; - final ExcerptAppender appender = queue.createAppender(); // Create an appender to write to the queue - - // Write messages to the queue until the runtime limit is reached + final ExcerptAppender appender = queue.createAppender(); while (System.currentTimeMillis() < finish) { - if (readCount.get() < writeCount.get()) { // Wait for the reader to catch up if necessary + if (readCount.get() < writeCount.get()) { Thread.yield(); continue; } try (DocumentContext dc = appender.writingDocument(false)) { - dc.wire().bytes().writeSkip(size); // Write a message of the specified size + dc.wire().bytes().writeSkip(size); } - writeCount.incrementAndGet(); // Increment the write count - writeTime.set(System.nanoTime()); // Record the time the write occurred + writeCount.incrementAndGet(); + writeTime.set(System.nanoTime()); } - running.set(false); // Stop the benchmark - + running.set(false); } // Output the histograms for the read delays @@ -154,6 +143,6 @@ static void pingPong(int size) { */ @NotNull private static ChronicleQueue createQueue(String path) { - return ChronicleQueue.single(path); // Create a single ChronicleQueue instance at the given path + return ChronicleQueue.single(path); } } diff --git a/src/main/java/net/openhft/chronicle/queue/internal/main/InternalRemovableRollFileCandidatesMain.java b/src/main/java/net/openhft/chronicle/queue/internal/main/InternalRemovableRollFileCandidatesMain.java index 7d9062d2d6..462033050f 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/main/InternalRemovableRollFileCandidatesMain.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/main/InternalRemovableRollFileCandidatesMain.java @@ -28,25 +28,19 @@ * If no directory is provided as an argument, the current directory is used by default. */ public final class InternalRemovableRollFileCandidatesMain { - /** - * Produces a list of removable roll file candidates from the given directory - * and prints their absolute paths to standard output, row by row. + * Produces a list of removable roll file candidates and prints + * their absolute path to standard out row-by-row. * - * @param args The directory to search for roll file candidates. - * If no directory is provided, the current directory ("./") is used. + * @param args the directory. If no directory is given, "." is assumed */ public static void main(String[] args) { final File dir; if (args.length == 0) { - // Use the current directory if no argument is provided dir = new File("."); } else { - // Use the provided directory dir = new File(args[0]); } - - // Find removable roll file candidates and print their absolute paths FileUtil.removableRollFileCandidates(dir) .map(File::getAbsolutePath) .forEach(System.out::println); diff --git a/src/main/java/net/openhft/chronicle/queue/internal/main/InternalUnlockMain.java b/src/main/java/net/openhft/chronicle/queue/internal/main/InternalUnlockMain.java index c2b10e57de..2a4fe239f7 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/main/InternalUnlockMain.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/main/InternalUnlockMain.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package net.openhft.chronicle.queue.internal.main; import net.openhft.chronicle.queue.impl.TableStore; @@ -36,8 +35,6 @@ * The class requires a queue directory as input and operates on the queue metadata file located within that directory. */ public final class InternalUnlockMain { - - // Adds aliases for the SingleChronicleQueueBuilder to support configuration shortcuts static { SingleChronicleQueueBuilder.addAliases(); } @@ -60,31 +57,24 @@ public static void main(String[] args) { */ private static void unlock(@NotNull String dir) { File path = new File(dir); - - // Validate that the provided path is a directory if (!path.isDirectory()) { System.err.println("Path argument must be a queue directory"); System.exit(1); } - // Path to the metadata file File storeFilePath = new File(path, QUEUE_METADATA_FILE); - // Check if the metadata file exists if (!storeFilePath.exists()) { System.err.println("Metadata file not found, nothing to unlock"); System.exit(1); } - // Load the table store from the metadata file in read-write mode - final TableStore store = SingleTableBuilder.binary(storeFilePath, Metadata.NoMeta.INSTANCE) - .readOnly(false) - .build(); + final TableStore store = SingleTableBuilder.binary(storeFilePath, Metadata.NoMeta.INSTANCE).readOnly(false).build(); - // Force unlock the appender lock + // appender lock (new TableStoreWriteLock(store, BusyTimedPauser::new, 0L, TableStoreWriteLock.APPEND_LOCK_KEY)).forceUnlock(); - // Force unlock the main write lock + // write lock (new TableStoreWriteLock(store, BusyTimedPauser::new, 0L)).forceUnlock(); System.out.println("Done"); diff --git a/src/main/java/net/openhft/chronicle/queue/internal/reader/InternalDummyMethodReaderQueueEntryHandler.java b/src/main/java/net/openhft/chronicle/queue/internal/reader/InternalDummyMethodReaderQueueEntryHandler.java index 916f4da462..9263af1319 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/reader/InternalDummyMethodReaderQueueEntryHandler.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/reader/InternalDummyMethodReaderQueueEntryHandler.java @@ -37,8 +37,8 @@ * This implementation is particularly useful when you need to process queue entries as a text representation. */ public final class InternalDummyMethodReaderQueueEntryHandler implements QueueEntryHandler { - private final Bytes textConversionTarget = Bytes.allocateElasticOnHeap(); // Bytes object for storing the text representation - private final WireType wireType; // The wire type for converting binary wire entries + private final Bytes textConversionTarget = Bytes.allocateElasticOnHeap(); + private final WireType wireType; /** * Constructs an {@code InternalDummyMethodReaderQueueEntryHandler} with the specified {@link WireType}. @@ -46,7 +46,7 @@ public final class InternalDummyMethodReaderQueueEntryHandler implements QueueEn * @param wireType The wire type to be used for converting entries, must not be null */ public InternalDummyMethodReaderQueueEntryHandler(@NotNull WireType wireType) { - this.wireType = requireNonNull(wireType); // Ensures that the wire type is not null + this.wireType = requireNonNull(wireType); } /** @@ -80,6 +80,6 @@ public void accept(final WireIn wireIn, final Consumer messageHandler) { */ @Override public void close() { - textConversionTarget.releaseLast(); // Release the memory used by the buffer + textConversionTarget.releaseLast(); } } diff --git a/src/main/java/net/openhft/chronicle/queue/internal/reader/InternalMessageToTextQueueEntryHandler.java b/src/main/java/net/openhft/chronicle/queue/internal/reader/InternalMessageToTextQueueEntryHandler.java index 6a0cb45b96..b984cf13db 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/reader/InternalMessageToTextQueueEntryHandler.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/reader/InternalMessageToTextQueueEntryHandler.java @@ -37,8 +37,8 @@ * This handler can be used to transform queue entries into a human-readable format such as JSON or YAML. */ public final class InternalMessageToTextQueueEntryHandler implements QueueEntryHandler { - private final Bytes textConversionTarget = Bytes.allocateElasticOnHeap(); // Buffer for storing converted text - private final WireType wireType; // The wire type for text conversion + private final Bytes textConversionTarget = Bytes.allocateElasticOnHeap(); + private final WireType wireType; /** * Constructs an {@code InternalMessageToTextQueueEntryHandler} with the specified {@link WireType}. @@ -46,7 +46,7 @@ public final class InternalMessageToTextQueueEntryHandler implements QueueEntryH * @param wireType The wire type used for converting binary data, must not be null */ public InternalMessageToTextQueueEntryHandler(WireType wireType) { - this.wireType = requireNonNull(wireType); // Ensures the wire type is not null + this.wireType = requireNonNull(wireType); } /** @@ -71,9 +71,8 @@ private static boolean isBinaryFormat(final byte dataFormatIndicator) { */ @Override public void accept(final WireIn wireIn, final Consumer messageHandler) { - final Bytes serialisedMessage = wireIn.bytes(); // Retrieve the serialized message bytes - final byte dataFormatIndicator = serialisedMessage.readByte(serialisedMessage.readPosition()); // Check the format - + final Bytes serialisedMessage = wireIn.bytes(); + final byte dataFormatIndicator = serialisedMessage.readByte(serialisedMessage.readPosition()); String text; if (isBinaryFormat(dataFormatIndicator)) { @@ -87,7 +86,7 @@ public void accept(final WireIn wireIn, final Consumer messageHandler) { text = serialisedMessage.toString(); } - messageHandler.accept(text); // Pass the converted or raw text to the message handler + messageHandler.accept(text); } /** @@ -95,6 +94,6 @@ public void accept(final WireIn wireIn, final Consumer messageHandler) { */ @Override public void close() { - textConversionTarget.releaseLast(); // Release the memory used by the buffer + textConversionTarget.releaseLast(); } } diff --git a/src/main/java/net/openhft/chronicle/queue/internal/reader/MessageCountingMessageConsumer.java b/src/main/java/net/openhft/chronicle/queue/internal/reader/MessageCountingMessageConsumer.java index 7580851f66..9c3c8fd9e1 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/reader/MessageCountingMessageConsumer.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/reader/MessageCountingMessageConsumer.java @@ -27,9 +27,9 @@ * This class is useful for scenarios where processing should stop after a certain number of messages have been consumed. */ public final class MessageCountingMessageConsumer implements MessageConsumer { - private final long matchLimit; // The maximum number of messages to consume before stopping - private final MessageConsumer wrappedConsumer; // The underlying message consumer to which messages are passed - private long matches = 0; // Counter for how many messages have been consumed + private final long matchLimit; + private final MessageConsumer wrappedConsumer; + private long matches = 0; /** * Constructs a {@code MessageCountingMessageConsumer} with the specified match limit and wrapped consumer. @@ -65,6 +65,6 @@ public boolean consume(long index, String message) { * @return {@code true} if the number of consumed messages equals or exceeds the match limit, {@code false} otherwise */ public boolean matchLimitReached() { - return matchLimit > 0 && matches >= matchLimit; // Return true if match limit is reached + return matchLimit > 0 && matches >= matchLimit; } } diff --git a/src/main/java/net/openhft/chronicle/queue/internal/reader/PatternFilterMessageConsumer.java b/src/main/java/net/openhft/chronicle/queue/internal/reader/PatternFilterMessageConsumer.java index 9616d4885c..4c734ba859 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/reader/PatternFilterMessageConsumer.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/reader/PatternFilterMessageConsumer.java @@ -33,9 +33,9 @@ */ public final class PatternFilterMessageConsumer implements MessageConsumer { - private final List patterns; // List of patterns to match against - private final boolean shouldBePresent; // True if the patterns should match, false if they should not - private final MessageConsumer nextMessageConsumer; // The next consumer in the chain for filtered messages + private final List patterns; + private final boolean shouldBePresent; + private final MessageConsumer nextMessageConsumer; /** * Constructs a {@code PatternFilterMessageConsumer} with the specified patterns, matching condition, @@ -64,7 +64,7 @@ public boolean consume(long index, String message) { for (Pattern pattern : patterns) { // Check if the message matches the pattern, based on the shouldBePresent flag if (shouldBePresent != pattern.matcher(message).find()) { - return false; // If it doesn't meet the condition, filter out the message + return false; } } // Pass the message to the next consumer if all patterns matched (or didn't, depending on the flag) diff --git a/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/AbstractTailerPollingQueueEntryReader.java b/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/AbstractTailerPollingQueueEntryReader.java index 5eb19da7f2..695220fddb 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/AbstractTailerPollingQueueEntryReader.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/AbstractTailerPollingQueueEntryReader.java @@ -34,8 +34,8 @@ */ public abstract class AbstractTailerPollingQueueEntryReader implements QueueEntryReader { - private final ExcerptTailer tailer; // The ExcerptTailer used to read from the queue - private final Function pollMethod; // Function for polling queue entries + private final ExcerptTailer tailer; + private final Function pollMethod; /** * Constructs an {@code AbstractTailerPollingQueueEntryReader} with the given tailer and polling method. @@ -59,10 +59,10 @@ protected AbstractTailerPollingQueueEntryReader(ExcerptTailer tailer, Function consumer.consume(documentContext.index(), value)); } } diff --git a/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/MethodReaderQueueEntryReader.java b/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/MethodReaderQueueEntryReader.java index 228ea5f1f6..afab01f2b0 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/MethodReaderQueueEntryReader.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/MethodReaderQueueEntryReader.java @@ -36,10 +36,10 @@ */ public final class MethodReaderQueueEntryReader implements QueueEntryReader { - private final ExcerptTailer tailer; // The ExcerptTailer used to read from the queue - private final MessageConsumer messageConsumer; // The message consumer that processes the read messages - private final MethodReader methodReader; // The method reader that reads method calls from the queue - private final Bytes bytes; // A buffer for holding the serialized message + private final ExcerptTailer tailer; + private final MessageConsumer messageConsumer; + private final MethodReader methodReader; + private final Bytes bytes; /** * Constructs a {@code MethodReaderQueueEntryReader} with the provided tailer, message consumer, wire type, and method reader interface. @@ -55,15 +55,12 @@ public MethodReaderQueueEntryReader(ExcerptTailer tailer, MessageConsumer messag Class methodReaderInterface, boolean showMessageHistory) { this.tailer = tailer; this.messageConsumer = messageConsumer; - bytes = Bytes.elasticHeapByteBuffer(256); // Allocate a buffer for holding serialized data + bytes = Bytes.elasticHeapByteBuffer(256); Wire wire = wireType.apply(bytes); if (wire instanceof TextWire) - ((TextWire) wire).useTextDocuments(); // Use text documents if it's a TextWire - - // Build the MethodWriter from the provided method reader interface + ((TextWire) wire).useTextDocuments(); MethodWriterBuilder mwb = wire.methodWriterBuilder(methodReaderInterface); - if (showMessageHistory) { - // If message history is enabled, log message history details + if (showMessageHistory) mwb.updateInterceptor((methodName, t) -> { MessageHistory messageHistory = MessageHistory.get(); // this is an attempt to recognise that no MH was read and instead the method reader called reset(...) on it @@ -71,8 +68,6 @@ public MethodReaderQueueEntryReader(ExcerptTailer tailer, MessageConsumer messag bytes.append(messageHistory + System.lineSeparator()); return true; }); - } - // Initialize the method reader methodReader = tailer.methodReader(mwb.build()); } @@ -86,11 +81,10 @@ public MethodReaderQueueEntryReader(ExcerptTailer tailer, MessageConsumer messag @Override public boolean read() { if (!methodReader.readOne()) { - return false; // No method call to read + return false; } - // Consume the read message and pass it to the message consumer messageConsumer.consume(tailer.lastReadIndex(), bytes.toString()); - bytes.clear(); // Clear the buffer for the next message + bytes.clear(); return true; } } diff --git a/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/VanillaQueueEntryReader.java b/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/VanillaQueueEntryReader.java index 1451bcfa86..5c48b2a36a 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/VanillaQueueEntryReader.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/reader/queueentryreaders/VanillaQueueEntryReader.java @@ -36,10 +36,10 @@ */ public final class VanillaQueueEntryReader implements QueueEntryReader { - private final ExcerptTailer tailer; // The ExcerptTailer used to read entries from the queue - private final Function pollMethod; // A function for polling the tailer - private final QueueEntryHandler messageConverter; // Converts the queue entry into a consumable message - private final MessageConsumer messageConsumer; // The consumer that processes the converted messages + private final ExcerptTailer tailer; + private final Function pollMethod; + private final QueueEntryHandler messageConverter; + private final MessageConsumer messageConsumer; /** * Constructs a {@code VanillaQueueEntryReader} with the given tailer, polling method, message converter, and message consumer. @@ -68,10 +68,9 @@ public VanillaQueueEntryReader(@NotNull ExcerptTailer tailer, @NotNull Function< public boolean read() { try (DocumentContext dc = pollMethod.apply(tailer)) { if (!dc.isPresent()) { - return false; // No entry available to read + return false; } - // Convert the wire format into a consumable message and pass it to the consumer messageConverter.accept(dc.wire(), val -> messageConsumer.consume(dc.index(), val)); return true; } diff --git a/src/main/java/net/openhft/chronicle/queue/internal/util/InternalFileUtil.java b/src/main/java/net/openhft/chronicle/queue/internal/util/InternalFileUtil.java index afaa76cf9a..87861a2db6 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/util/InternalFileUtil.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/util/InternalFileUtil.java @@ -50,7 +50,6 @@ public final class InternalFileUtil { private static final Comparator EARLIEST_FIRST = comparing(File::getName); private InternalFileUtil() { - // Utility class, no instances. } /** diff --git a/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriter.java b/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriter.java index af43faca0f..7c87707886 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriter.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriter.java @@ -34,13 +34,13 @@ *

* It reads data from a list of files and writes the contents to the queue, optionally using a method writer * if an interface class is provided. - *

+ * */ public class ChronicleWriter { - private Path basePath; // The base path of the Chronicle Queue - private String methodName; // The method name used to write each message - private List files; // List of files to read from and write to the queue - private Class writeTo; // The interface class for method writing + private Path basePath; + private String methodName; + private List files; + private Class writeTo; /** * Executes the process of reading from files and writing their contents to the Chronicle Queue. @@ -52,12 +52,12 @@ public void execute() throws IOException { final ExcerptAppender appender = queue.createAppender()) { for (final String file : files) { - final Object payload = Marshallable.fromFile(Object.class, file); // Load the file into a payload object + final Object payload = Marshallable.fromFile(Object.class, file); try (final DocumentContext dc = appender.writingDocument()) { if (writeTo != null) - dc.wire().write(methodName).marshallable((WriteMarshallable) payload); // Use method writer if interface is provided + dc.wire().write(methodName).marshallable((WriteMarshallable) payload); else - dc.wire().write(methodName).object(payload); // Write as a generic object if no method writer + dc.wire().write(methodName).object(payload); } } } @@ -78,16 +78,16 @@ public ChronicleWriter withBasePath(final Path path) { * Sets the interface class to use for writing through method calls. *

* This method allows writing through a method writer by specifying the name of an interface class. - *

+ * * * @param interfaceName The fully qualified name of the interface class * @return This {@code ChronicleWriter} instance for method chaining */ public ChronicleWriter asMethodWriter(String interfaceName) { try { - this.writeTo = Class.forName(interfaceName); // Load the interface class + this.writeTo = Class.forName(interfaceName); } catch (ClassNotFoundException e) { - throw Jvm.rethrow(e); // Handle class loading error + throw Jvm.rethrow(e); } return this; } diff --git a/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriterMain.java b/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriterMain.java index 981599526b..578ccca05b 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriterMain.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriterMain.java @@ -51,7 +51,7 @@ public void run(@NotNull String[] args) throws Exception { /** * Parses the command-line arguments using Apache Commons CLI. - *

If there are issues with parsing or required arguments are missing, it prints help and exits the program.

+ *

If there are issues with parsing or required arguments are missing, it prints help and exits the program. * * @param args Command-line arguments * @param options The defined options for command-line parsing @@ -108,15 +108,15 @@ private void printHelpAndExit(final Options options, int status, String message) * @param commandLine The parsed command-line options */ private void configure(final ChronicleWriter writer, final CommandLine commandLine) { - writer.withBasePath(Paths.get(commandLine.getOptionValue('d'))); // Set the base path for the Chronicle Queue - writer.withMethodName(commandLine.getOptionValue('m')); // Set the method name for writing + writer.withBasePath(Paths.get(commandLine.getOptionValue('d'))); + writer.withMethodName(commandLine.getOptionValue('m')); if (commandLine.hasOption('i')) { final String r = commandLine.getOptionValue('i'); - writer.asMethodWriter(r.equals("null") ? null : r); // Set the interface for method writer if provided + writer.asMethodWriter(r.equals("null") ? null : r); } - writer.withFiles(commandLine.getArgList()); // Set the files to be written to the queue + writer.withFiles(commandLine.getArgList()); } /** @@ -128,9 +128,9 @@ private void configure(final ChronicleWriter writer, final CommandLine commandLi private Options options() { final Options options = new Options(); - addOption(options, "m", "method", true, "Method name", true); // Required method name - addOption(options, "d", "directory", true, "Directory containing chronicle queue to write to", true); // Required queue directory - addOption(options, "i", "interface", true, "Interface to write via", false); // Optional interface for method writer + addOption(options, "m", "method", true, "Method name", true); + addOption(options, "d", "directory", true, "Directory containing chronicle queue to write to", true); + addOption(options, "i", "interface", true, "Interface to write via", false); return options; } } diff --git a/src/main/java/net/openhft/chronicle/queue/main/DumpMain.java b/src/main/java/net/openhft/chronicle/queue/main/DumpMain.java index 01fc47f3ea..57d13dac3d 100644 --- a/src/main/java/net/openhft/chronicle/queue/main/DumpMain.java +++ b/src/main/java/net/openhft/chronicle/queue/main/DumpMain.java @@ -27,7 +27,7 @@ /** * DumpMain is an entry point for dumping the contents of a Chronicle Queue file. - *

This class uses several system properties to configure the dumping process:

+ *

This class uses several system properties to configure the dumping process: *

    *
  • file: Specifies the file to be dumped
  • *
  • skipTableStoreDump: Set to true to skip dumping the TableStore
  • @@ -60,7 +60,7 @@ public static void dump(@NotNull String path) throws FileNotFoundException { /** * Dumps the contents of a Chronicle Queue file to the specified {@link PrintStream}. - *

    This method provides more fine-grained control over the output, including setting an upper limit for the dump.

    + *

    This method provides more fine-grained control over the output, including setting an upper limit for the dump. * * @param path The Chronicle Queue file to be dumped * @param out The {@link PrintStream} to which the dump will be written diff --git a/src/main/java/net/openhft/chronicle/queue/main/HistoryMain.java b/src/main/java/net/openhft/chronicle/queue/main/HistoryMain.java index 7a7485561b..c180a42c3c 100644 --- a/src/main/java/net/openhft/chronicle/queue/main/HistoryMain.java +++ b/src/main/java/net/openhft/chronicle/queue/main/HistoryMain.java @@ -30,6 +30,9 @@ *

  • Latencies for each component that has processed a message
  • *
  • Latencies between each component that has processed a message
  • *
+ * + * @author Jerry Shea + * */ public final class HistoryMain { diff --git a/src/main/java/net/openhft/chronicle/queue/main/ReaderMain.java b/src/main/java/net/openhft/chronicle/queue/main/ReaderMain.java index 6eb4055c22..fb440c7738 100644 --- a/src/main/java/net/openhft/chronicle/queue/main/ReaderMain.java +++ b/src/main/java/net/openhft/chronicle/queue/main/ReaderMain.java @@ -23,7 +23,7 @@ /** * ReaderMain is an entry point for displaying records from a Chronicle Queue in text format. - *

This class delegates the reading and display of queue records to {@link ChronicleReaderMain}.

+ *

This class delegates the reading and display of queue records to {@link ChronicleReaderMain}. */ public final class ReaderMain { diff --git a/src/main/java/net/openhft/chronicle/queue/main/RemovableRollFileCandidatesMain.java b/src/main/java/net/openhft/chronicle/queue/main/RemovableRollFileCandidatesMain.java index 32536a8680..1a77695346 100644 --- a/src/main/java/net/openhft/chronicle/queue/main/RemovableRollFileCandidatesMain.java +++ b/src/main/java/net/openhft/chronicle/queue/main/RemovableRollFileCandidatesMain.java @@ -22,7 +22,7 @@ /** * RemovableRollFileCandidatesMain is an entry point for producing a list of removable roll file candidates from a given directory. - *

This utility prints the absolute path of each removable file to the standard output, one file per row.

+ *

This utility prints the absolute path of each removable file to the standard output, one file per row. */ public final class RemovableRollFileCandidatesMain { diff --git a/src/main/java/net/openhft/chronicle/queue/main/UnlockMain.java b/src/main/java/net/openhft/chronicle/queue/main/UnlockMain.java index 6b205ba4ee..35090539ca 100644 --- a/src/main/java/net/openhft/chronicle/queue/main/UnlockMain.java +++ b/src/main/java/net/openhft/chronicle/queue/main/UnlockMain.java @@ -15,14 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package net.openhft.chronicle.queue.main; import net.openhft.chronicle.queue.internal.main.InternalUnlockMain; /** * UnlockMain is an entry point for unlocking resources or files used by a Chronicle Queue. - *

This utility handles the unlocking of locked resources, such as files, that may be in use by the queue.

+ *

This utility handles the unlocking of locked resources, such as files, that may be in use by the queue. */ public final class UnlockMain { diff --git a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReader.java b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReader.java index 04c945b203..7cf55ae1d4 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReader.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleHistoryReader.java @@ -51,25 +51,23 @@ public class ChronicleHistoryReader implements HistoryReader, Closeable { private static final int SUMMARY_OUTPUT_UNSET = -999; public static final String SEPARATOR = "_"; + protected Path basePath; + protected Consumer messageSink; + protected boolean progress = false; + protected TimeUnit timeUnit = TimeUnit.NANOSECONDS; + protected boolean histosByMethod = false; + protected Map histos = new LinkedHashMap<>(); + protected long ignore = 0; + protected long counter = 0; + protected long measurementWindowNanos = 0; + protected long firstTimeStampNanos = 0; + protected long lastWindowCount = 0; + protected int summaryOutputOffset = SUMMARY_OUTPUT_UNSET; + protected Long startIndex; + protected Supplier histoSupplier = () -> new Histogram(60, 4); + protected int lastHistosSize = 0; + protected ExcerptTailer tailer; - protected Path basePath; // The base directory path for the Chronicle Queue - protected Consumer messageSink; // Sink for processed messages - protected boolean progress = false; // Flag indicating whether to display progress - protected TimeUnit timeUnit = TimeUnit.NANOSECONDS; // Time unit for measurements - protected boolean histosByMethod = false; // Flag for method-specific histograms - protected Map histos = new LinkedHashMap<>(); // Histograms for timing measurements - protected long ignore = 0; // Number of initial messages to ignore - protected long counter = 0; // Counter for processed messages - protected long measurementWindowNanos = 0; // Time window for histogram measurements in nanoseconds - protected long firstTimeStampNanos = 0; // Timestamp of the first message in nanoseconds - protected long lastWindowCount = 0; // Number of messages in the last measurement window - protected int summaryOutputOffset = SUMMARY_OUTPUT_UNSET; // Offset for summary output - protected Long startIndex; // Starting index for reading messages - protected Supplier histoSupplier = () -> new Histogram(60, 4); // Supplier for creating histograms - protected int lastHistosSize = 0; // The size of histograms processed - protected ExcerptTailer tailer; // Tailer for reading from the Chronicle Queue - - // Static block to warn if resource tracing is enabled static { ToolsUtil.warnIfResourceTracing(); } diff --git a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReader.java b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReader.java index 6a66cb3ac8..ddb1e05865 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReader.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReader.java @@ -62,7 +62,6 @@ public class ChronicleReader implements Reader { private static final long UNSET_VALUE = Long.MIN_VALUE; - // Configuration fields for filtering, queue direction, and message processing private final List inclusionRegex = new ArrayList<>(); private final List exclusionRegex = new ArrayList<>(); private final Pauser pauser = Pauser.millis(1, 100); @@ -88,12 +87,10 @@ public class ChronicleReader implements Reader { private String limiterArg; private String tailerId = null; - // Warn if resource tracing is enabled at initialization static { ToolsUtil.warnIfResourceTracing(); } - // Checks if a given configuration value has been set private static boolean isSet(final long configValue) { return configValue != UNSET_VALUE; } @@ -110,7 +107,6 @@ public void execute() { boolean isFirstIteration = true; boolean retryLastOperation; boolean queueHasBeenModified; - do { try (final ChronicleQueue queue = createQueue(); final ExcerptTailer tailer = queue.createTailer(tailerId); @@ -192,7 +188,7 @@ private void readWhileNotInterrupted(ExcerptTailer tailer, MessageCountingMessag /** * Validates the arguments for the {@link ChronicleReader}. - *

Throws an {@link IllegalArgumentException} if a named tailer is used with a read-only queue.

+ *

Throws an {@link IllegalArgumentException} if a named tailer is used with a read-only queue. */ private void validateArgs() { if (tailerId != null && readOnly) @@ -201,11 +197,12 @@ private void validateArgs() { /** * Configures the content-based limiter if specified. - *

This method ensures that the content-based limiter is properly initialized before queue processing.

+ *

This method ensures that the content-based limiter is properly initialized before queue processing. */ private void configureContentBasedLimiter() { - if (contentBasedLimiter != null) - contentBasedLimiter.configure(this); // Configure limiter with the current reader instance + if (contentBasedLimiter != null) { + contentBasedLimiter.configure(this); + } } /** @@ -216,22 +213,22 @@ private void configureContentBasedLimiter() { */ private boolean shouldHaltReadingDueToContentBasedLimit(ExcerptTailer tailer) { if (contentBasedLimiter == null) { - return false; // No limiter configured, so no need to halt + return false; } - long originalIndex = tailer.index(); // Store current tailer index + long originalIndex = tailer.index(); try (final DocumentContext documentContext = tailer.readingDocument()) { if (documentContext.isPresent()) { - return contentBasedLimiter.shouldHaltReading(documentContext); // Delegate to limiter + return contentBasedLimiter.shouldHaltReading(documentContext); } return false; } finally { - tailer.moveToIndex(originalIndex); // Reset tailer index after reading + tailer.moveToIndex(originalIndex); } } /** * Creates a {@link QueueEntryReader} for processing entries in the queue. - *

This method chooses between a vanilla reader, a plugin-based reader, or a method reader based on the configuration.

+ *

This method chooses between a vanilla reader, a plugin-based reader, or a method reader based on the configuration. * * @param tailer The {@link ExcerptTailer} for reading queue entries * @param messageConsumer The {@link MessageConsumer} for processing queue entries @@ -255,11 +252,13 @@ private QueueEntryReader createQueueEntryReader(ExcerptTailer tailer, MessageCon * @return The head of the chain of message consumers */ private MessageConsumer createMessageConsumers() { - MessageConsumer tail = this::writeToSink; // Initialize consumer that writes to the message sink - if (!exclusionRegex.isEmpty()) - tail = new PatternFilterMessageConsumer(exclusionRegex, false, tail); // Add exclusion filters - if (!inclusionRegex.isEmpty()) - tail = new PatternFilterMessageConsumer(inclusionRegex, true, tail); // Add inclusion filters + MessageConsumer tail = this::writeToSink; + if (!exclusionRegex.isEmpty()) { + tail = new PatternFilterMessageConsumer(exclusionRegex, false, tail); + } + if (!inclusionRegex.isEmpty()) { + tail = new PatternFilterMessageConsumer(inclusionRegex, true, tail); + } return tail; } @@ -272,9 +271,9 @@ private MessageConsumer createMessageConsumers() { */ private boolean writeToSink(long index, String text) { if (displayIndex) - messageSink.accept("0x" + Long.toHexString(index) + ": "); // Display entry index + messageSink.accept("0x" + Long.toHexString(index) + ": "); if (!text.isEmpty()) - messageSink.accept(text); // Display entry text + messageSink.accept(text); return true; } @@ -398,18 +397,18 @@ public ChronicleReader historyRecords(final long maxHistoryRecords) { /** * Sets the method reader interface for reading queue entries. - *

If the provided interface name is empty, it uses a dummy handler; otherwise, it loads the class specified by the methodReaderInterface parameter.

+ *

If the provided interface name is empty, it uses a dummy handler; otherwise, it loads the class specified by the methodReaderInterface parameter. * * @param methodReaderInterface The fully qualified class name of the method reader interface * @return The current instance of {@link ChronicleReader} */ public ChronicleReader asMethodReader(@NotNull String methodReaderInterface) { if (methodReaderInterface.isEmpty()) { - entryHandlerFactory = () -> new InternalDummyMethodReaderQueueEntryHandler(wireType); // Use dummy handler if no interface is specified + entryHandlerFactory = () -> new InternalDummyMethodReaderQueueEntryHandler(wireType); } else try { - this.methodReaderInterface = Class.forName(methodReaderInterface); // Dynamically load the class for the interface + this.methodReaderInterface = Class.forName(methodReaderInterface); } catch (ClassNotFoundException e) { - throw Jvm.rethrow(e); // Handle class loading errors + throw Jvm.rethrow(e); } return this; } @@ -428,7 +427,7 @@ public ChronicleReader showMessageHistory(boolean showMessageHistory) { /** * Configures a binary search comparator for the reader. - *

This method dynamically loads a binary search class and allows it to configure itself by passing the current {@link ChronicleReader} instance.

+ *

This method dynamically loads a binary search class and allows it to configure itself by passing the current {@link ChronicleReader} instance. * * @param binarySearchClass The fully qualified class name of the binary search comparator * @return The current instance of {@link ChronicleReader} @@ -436,11 +435,12 @@ public ChronicleReader showMessageHistory(boolean showMessageHistory) { @Override public ChronicleReader withBinarySearch(@NotNull String binarySearchClass) { try { - Class clazz = Class.forName(binarySearchClass); // Dynamically load the binary search class - this.binarySearch = (BinarySearchComparator) clazz.getDeclaredConstructor().newInstance(); // Create an instance of the comparator - this.binarySearch.accept(this); // Allow the comparator to configure itself with this reader + Class clazz = Class.forName(binarySearchClass); + this.binarySearch = (BinarySearchComparator) clazz.getDeclaredConstructor().newInstance(); + // allow binary search to configure itself + this.binarySearch.accept(this); } catch (Exception e) { - throw Jvm.rethrow(e); // Handle any exception during class loading or instantiation + throw Jvm.rethrow(e); } return this; } @@ -572,8 +572,8 @@ public ChronicleReader withTailerId(String tailerId) { * @return {@code true} if the queue has been modified, {@code false} otherwise */ private boolean queueHasBeenModifiedSinceLastCheck(final long lastObservedTailIndex, ExcerptTailer tailer) { - long currentTailIndex = indexOfEnd(tailer); // Get the current end index of the queue - return currentTailIndex > lastObservedTailIndex; // Check if the queue has been modified + long currentTailIndex = indexOfEnd(tailer); + return currentTailIndex > lastObservedTailIndex; } /** @@ -585,20 +585,21 @@ private boolean queueHasBeenModifiedSinceLastCheck(final long lastObservedTailIn */ private void moveToSpecifiedPosition(final ChronicleQueue ic, final ExcerptTailer tailer, final boolean isFirstIteration) { if (isFirstIteration) { - // Set the reading direction (forward or backward) + + // Set the direction, if we're reading backwards, start at the end by default tailer.direction(tailerDirection); if (tailerDirection == BACKWARD) { - tailer.toEnd(); // Move to the end if reading in reverse order + tailer.toEnd(); } if (isSet(startIndex)) { - tryMoveToIndex(ic, tailer); // Move to the specified start index + tryMoveToIndex(ic, tailer); } else if (binarySearch != null) { - seekBinarySearch(tailer); // Use binary search to find the starting point + seekBinarySearch(tailer); } if (tailerDirection == FORWARD) { - moveTailerToEnd(tailer); // Move the tailer to the end if reading forward + moveTailerToEnd(tailer); } } } @@ -606,16 +607,16 @@ private void moveToSpecifiedPosition(final ChronicleQueue ic, final ExcerptTaile /** * Moves the {@link ExcerptTailer} to the end of the queue. *

If {@code maxHistoryRecords} is set, it moves the tailer to a specific number of entries from the end. - * Otherwise, if tailing is enabled, it simply moves the tailer to the end.

+ * Otherwise, if tailing is enabled, it simply moves the tailer to the end. * * @param tailer The {@link ExcerptTailer} to move */ private void moveTailerToEnd(ExcerptTailer tailer) { if (isSet(maxHistoryRecords)) { - tailer.toEnd(); // Move to the end of the queue - moveToIndexNFromTheEnd(tailer, maxHistoryRecords); // Move to a specific number of entries from the end + tailer.toEnd(); + moveToIndexNFromTheEnd(tailer, maxHistoryRecords); } else if (tailInputSource) { - tailer.toEnd(); // Move to the end if tailing input source + tailer.toEnd(); } } @@ -639,10 +640,10 @@ private void tryMoveToIndex(ChronicleQueue ic, ExcerptTailer tailer) { boolean firstTime = true; while (!tailer.moveToIndex(startIndex)) { if (firstTime) { - messageSink.accept("Waiting for startIndex " + Long.toHexString(startIndex)); // Notify if waiting + messageSink.accept("Waiting for startIndex " + Long.toHexString(startIndex)); firstTime = false; } - Jvm.pause(100); // Pause before retrying + Jvm.pause(100); } } @@ -652,19 +653,18 @@ private void tryMoveToIndex(ChronicleQueue ic, ExcerptTailer tailer) { * @param tailer The {@link ExcerptTailer} to move */ private void seekBinarySearch(ExcerptTailer tailer) { - TailerDirection originalDirection = tailer.direction(); // Store the original direction + TailerDirection originalDirection = tailer.direction(); tailer.direction(FORWARD); - final Wire key = binarySearch.wireKey(); // Get the search key - long rv = BinarySearch.search(tailer, key, binarySearch); // Perform binary search - + final Wire key = binarySearch.wireKey(); + long rv = BinarySearch.search(tailer, key, binarySearch); if (rv == -1) { - tailer.toStart(); // Move to the start if no match found + tailer.toStart(); } else if (rv < 0) { - scanToFirstEntryFollowingMatch(tailer, key, -rv); // Find the first entry following the match + scanToFirstEntryFollowingMatch(tailer, key, -rv); } else { - scanToFirstMatchingEntry(tailer, key, rv); // Find the first matching entry + scanToFirstMatchingEntry(tailer, key, rv); } - tailer.direction(originalDirection); // Restore original direction + tailer.direction(originalDirection); } /** @@ -676,16 +676,15 @@ private void seekBinarySearch(ExcerptTailer tailer) { */ private void scanToFirstMatchingEntry(ExcerptTailer tailer, Wire key, long matchingIndex) { long indexToMoveTo = matchingIndex; - tailer.direction(tailerDirection == FORWARD ? BACKWARD : FORWARD); // Switch direction to find the first match + tailer.direction(tailerDirection == FORWARD ? BACKWARD : FORWARD); tailer.moveToIndex(indexToMoveTo); - while (true) { try (DocumentContext dc = tailer.readingDocument()) { if (!dc.isPresent()) break; try { if (binarySearch.compare(dc.wire(), key) == 0) - indexToMoveTo = dc.index(); // Keep moving to the first matching index + indexToMoveTo = dc.index(); else break; } catch (NotComparableException e) { @@ -693,7 +692,7 @@ private void scanToFirstMatchingEntry(ExcerptTailer tailer, Wire key, long match } } } - tailer.moveToIndex(indexToMoveTo); // Move to the first matching index + tailer.moveToIndex(indexToMoveTo); } /** @@ -706,9 +705,8 @@ private void scanToFirstMatchingEntry(ExcerptTailer tailer, Wire key, long match */ private void scanToFirstEntryFollowingMatch(ExcerptTailer tailer, Wire key, long indexAdjacentMatch) { long indexToMoveTo = -1; - tailer.direction(tailerDirection); // Set the tailer direction + tailer.direction(tailerDirection); tailer.moveToIndex(indexAdjacentMatch); - while (true) { try (DocumentContext dc = tailer.readingDocument()) { if (!dc.isPresent()) @@ -716,7 +714,7 @@ private void scanToFirstEntryFollowingMatch(ExcerptTailer tailer, Wire key, long try { if ((tailer.direction() == TailerDirection.FORWARD && binarySearch.compare(dc.wire(), key) >= 0) || (tailer.direction() == BACKWARD && binarySearch.compare(dc.wire(), key) <= 0)) { - indexToMoveTo = dc.index(); // Move to the entry following the match + indexToMoveTo = dc.index(); break; } } catch (NotComparableException e) { @@ -725,7 +723,7 @@ private void scanToFirstEntryFollowingMatch(ExcerptTailer tailer, Wire key, long } } if (indexToMoveTo >= 0) { - tailer.moveToIndex(indexToMoveTo); // Move to the found index + tailer.moveToIndex(indexToMoveTo); } } @@ -736,7 +734,7 @@ private void scanToFirstEntryFollowingMatch(ExcerptTailer tailer, Wire key, long * @param numberOfEntriesFromEnd The number of entries from the end to move to */ private void moveToIndexNFromTheEnd(ExcerptTailer tailer, long numberOfEntriesFromEnd) { - tailer.direction(TailerDirection.BACKWARD).toEnd(); // Start from the end and move backwards + tailer.direction(TailerDirection.BACKWARD).toEnd(); for (int i = 0; i < numberOfEntriesFromEnd - 1; i++) { try (final DocumentContext documentContext = tailer.readingDocument()) { if (!documentContext.isPresent()) { @@ -744,7 +742,7 @@ private void moveToIndexNFromTheEnd(ExcerptTailer tailer, long numberOfEntriesFr } } } - tailer.direction(FORWARD); // Reset to forward direction + tailer.direction(FORWARD); } /** @@ -754,7 +752,7 @@ private void moveToIndexNFromTheEnd(ExcerptTailer tailer, long numberOfEntriesFr * @return The index of the last entry */ private long indexOfEnd(ExcerptTailer excerptTailer) { - return excerptTailer.toEnd().index(); // Get the end index of the queue + return excerptTailer.toEnd().index(); } /** @@ -770,8 +768,8 @@ private ChronicleQueue createQueue() { } return SingleChronicleQueueBuilder .binary(basePath.toFile()) - .readOnly(readOnly) // Configure read-only mode if applicable - .storeFileListener(NO_OP) // Set a no-op store file listener + .readOnly(readOnly) + .storeFileListener(NO_OP) .build(); } @@ -779,6 +777,6 @@ private ChronicleQueue createQueue() { * Stops the reader, halting any further processing of the queue. */ public void stop() { - running = false; // Set running to false to stop processing + running = false; } } diff --git a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReaderPlugin.java b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReaderPlugin.java index 5e8de34a40..ccea23028e 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReaderPlugin.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/ChronicleReaderPlugin.java @@ -26,14 +26,14 @@ * Plugin interface for handling documents read from the queue in {@code ChronicleReader}. *

This interface allows for custom handling of the documents, which can be particularly useful when working with non-textual * queues, such as those written in binary format. Implementing this plugin provides a way to process the raw {@link DocumentContext} - * from the queue.

+ * from the queue. */ public interface ChronicleReaderPlugin { /** * Handle the document from the queue that is read in {@code ChronicleReader}. *

Implement this method to define how a document should be processed when read from the queue. - * This method provides access to the raw {@link DocumentContext}.

+ * This method provides access to the raw {@link DocumentContext}. * * @param dc The document context representing the queue entry */ @@ -42,12 +42,12 @@ public interface ChronicleReaderPlugin { /** * Handle the document and optionally pass it back to the {@code ChronicleReader} as a text representation. *

This method allows for additional processing of the document and the ability to convert it to a string form using the - * provided {@link Consumer}. This is useful when inclusion filters or other processing steps need to be applied.

+ * provided {@link Consumer}. This is useful when inclusion filters or other processing steps need to be applied. * * @param dc The document context representing the queue entry * @param messageConsumer A consumer used to pass the text representation back to the {@code ChronicleReader} */ default void onReadDocument(DocumentContext dc, Consumer messageConsumer) { - onReadDocument(dc); // Default behavior calls the standard document handling method + onReadDocument(dc); } } diff --git a/src/main/java/net/openhft/chronicle/queue/reader/ContentBasedLimiter.java b/src/main/java/net/openhft/chronicle/queue/reader/ContentBasedLimiter.java index e2a5ff54a5..a5e4aeadac 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/ContentBasedLimiter.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/ContentBasedLimiter.java @@ -22,13 +22,13 @@ /** * Interface for signaling when to halt reading from a queue based on the content of a message. - *

This can be used to limit processing within the {@link ChronicleReader} based on specific conditions found in the messages.

+ *

This can be used to limit processing within the {@link ChronicleReader} based on specific conditions found in the messages. */ public interface ContentBasedLimiter { /** * Determines whether the {@link ChronicleReader} should stop processing further messages. - *

This method examines the content of the next message and decides if reading should halt before processing it.

+ *

This method examines the content of the next message and decides if reading should halt before processing it. * * @param documentContext The document context representing the next message to be processed * @return {@code true} to halt processing, {@code false} to continue processing the message @@ -37,7 +37,8 @@ public interface ContentBasedLimiter { /** * Configures the limiter with parameters before the reader begins processing. - *

This method allows the limiter to be customized using arguments provided via {@link Reader#limiterArg()}.

+ *

+ * This method allows the limiter to be customized using arguments provided via {@link Reader#limiterArg()}. * * @param reader The reader that is about to be executed, providing context and parameters for the limiter */ diff --git a/src/main/java/net/openhft/chronicle/queue/reader/HistoryReader.java b/src/main/java/net/openhft/chronicle/queue/reader/HistoryReader.java index aeb0106aae..f8c323639b 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/HistoryReader.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/HistoryReader.java @@ -33,7 +33,7 @@ * and collect latency histograms from the queue entries over time. *

This interface provides methods for configuring the reader, managing message sinks, * and accumulating histograms for performance analysis. It supports flexible options such as - * setting the base path, start index, measurement windows, and other performance metrics.

+ * setting the base path, start index, measurement windows, and other performance metrics. */ public interface HistoryReader { @@ -136,10 +136,11 @@ public interface HistoryReader { void outputData(); /** - * Creates and returns a new history reader that will use the queue located at the path - * provided later via {@link #withBasePath}. + * Creates and returns a new history reader that will use + * the queue located at {@link #withBasePath } provided later. * - * @return A new instance of {@link HistoryReader} + * @return a new history reader that will use + * the queue located at {@link #withBasePath } provided later */ static HistoryReader create() { return new ChronicleHistoryReader(); diff --git a/src/main/java/net/openhft/chronicle/queue/reader/MessageConsumer.java b/src/main/java/net/openhft/chronicle/queue/reader/MessageConsumer.java index 423052173d..cd0c60dc8e 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/MessageConsumer.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/MessageConsumer.java @@ -19,9 +19,9 @@ package net.openhft.chronicle.queue.reader; /** - * Represents a message consumer in a chain-of-responsibility pattern. - *

Message consumers can filter or transform input messages as they are passed through the chain. - * The final destination of the message is the sink, which is the last consumer in the pipeline.

+ * Message consumers make a chain-of-responsibility pattern, they + * can filter or transform the input. The sink will be the final + * consumer in the pipeline. */ public interface MessageConsumer { diff --git a/src/main/java/net/openhft/chronicle/queue/reader/QueueEntryHandler.java b/src/main/java/net/openhft/chronicle/queue/reader/QueueEntryHandler.java index 04b1bd3141..d8947a0d7f 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/QueueEntryHandler.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/QueueEntryHandler.java @@ -29,7 +29,7 @@ /** * Handles the processing of queue entries, converting them to text or other forms for consumption. *

Implements the {@link BiConsumer} interface to consume a {@link WireIn} object, which represents the serialized data, - * and a {@link Consumer} that processes the resulting string.

+ * and a {@link Consumer} that processes the resulting string. */ public interface QueueEntryHandler extends BiConsumer>, AutoCloseable { @@ -41,7 +41,7 @@ public interface QueueEntryHandler extends BiConsumer>, /** * Creates a {@link QueueEntryHandler} that converts messages to text based on the provided {@link WireType}. - *

This is useful when reading queues written in different formats such as binary, JSON, or text.

+ *

This is useful when reading queues written in different formats such as binary, JSON, or text. * * @param wireType The {@link WireType} used to interpret the data * @return A {@link QueueEntryHandler} that converts messages to text diff --git a/src/main/java/net/openhft/chronicle/queue/reader/QueueEntryReader.java b/src/main/java/net/openhft/chronicle/queue/reader/QueueEntryReader.java index 0ea838753c..393c050a2a 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/QueueEntryReader.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/QueueEntryReader.java @@ -21,7 +21,7 @@ /** * Interface for reading and processing entries from a queue. *

Implementations of this interface are responsible for reading the next available entry - * from the queue and processing it as necessary.

+ * from the queue and processing it as necessary. */ public interface QueueEntryReader { diff --git a/src/main/java/net/openhft/chronicle/queue/reader/Reader.java b/src/main/java/net/openhft/chronicle/queue/reader/Reader.java index 0199da378f..132a34bd80 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/Reader.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/Reader.java @@ -26,9 +26,10 @@ /** * The Reader interface provides methods for reading messages from a Chronicle Queue. - *

It allows for extensive customization of the reading process through various configuration methods, + *

+ * It allows for extensive customization of the reading process through various configuration methods, * including setting the base path, inclusion/exclusion filters, content-based limiters, and method reader interfaces. - * A new Reader can be created using the {@link #create()} method.

+ * A new Reader can be created using the {@link #create()} method. */ public interface Reader { @@ -60,7 +61,7 @@ public interface Reader { /** * Adds an inclusion regex for filtering messages. - *

Messages that match the inclusion regex will be processed.

+ *

Messages that match the inclusion regex will be processed. * * @param regex The inclusion regex. * @return The current instance of {@link Reader} @@ -69,7 +70,7 @@ public interface Reader { /** * Adds an exclusion regex for filtering messages. - *

Messages that match the exclusion regex will be filtered out.

+ *

Messages that match the exclusion regex will be filtered out. * * @param regex The exclusion regex. * @return The current instance of {@link Reader} @@ -133,7 +134,7 @@ public interface Reader { /** * Sets the method reader interface for this Reader. - *

If the provided interface name is empty, a dummy method reader will be created.

+ *

If the provided interface name is empty, a dummy method reader will be created. * * @param methodReaderInterface The fully qualified class name of the method reader interface. * @return The current instance of {@link Reader} diff --git a/src/main/java/net/openhft/chronicle/queue/reader/comparator/BinarySearchComparator.java b/src/main/java/net/openhft/chronicle/queue/reader/comparator/BinarySearchComparator.java index cf6e643051..f185c4a8b3 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/comparator/BinarySearchComparator.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/comparator/BinarySearchComparator.java @@ -27,7 +27,7 @@ /** * Interface for implementing a comparator used in binary search operations within the {@link Reader}. *

This interface extends {@link Comparator} to compare {@link Wire} objects, and {@link Consumer} to allow the comparator - * to configure itself using a {@link Reader} instance.

+ * to configure itself using a {@link Reader} instance. */ public interface BinarySearchComparator extends Comparator, Consumer { diff --git a/src/main/java/net/openhft/chronicle/queue/rollcycles/LargeRollCycles.java b/src/main/java/net/openhft/chronicle/queue/rollcycles/LargeRollCycles.java index df47b6ba53..1ef4064688 100644 --- a/src/main/java/net/openhft/chronicle/queue/rollcycles/LargeRollCycles.java +++ b/src/main/java/net/openhft/chronicle/queue/rollcycles/LargeRollCycles.java @@ -23,24 +23,23 @@ /** * Enum representing large roll cycles, designed to minimize file rolls but resulting in very large files. *

These roll cycles are typically used in scenarios where fewer rollovers are preferred, but the file sizes - * can grow quite large and may exceed typical limits.

+ * can grow quite large and may exceed typical limits. */ public enum LargeRollCycles implements RollCycle { - /** - * Roll cycle allowing up to 0xffffffff entries per hour, indexing every 64th entry. + * 0xffffffff entries per hour, indexing every 64th entry */ LARGE_HOURLY(/*----*/"yyyyMMdd-HH'L'", 60 * 60 * 1000, 8 << 10, 64), /** - * Roll cycle allowing up to 0x1fffffffff entries per day, indexing every 128th entry. + * 0x1fffffffff entries per day, indexing every 128th entry */ LARGE_DAILY(/*-----*/"yyyyMMdd'L'", 24 * 60 * 60 * 1000, MAX_INDEX_COUNT, 128), /** - * Roll cycle allowing up to 0x3ffffffffff entries per day, indexing every 256th entry. + * 0x3ffffffffff entries per day, indexing every 256th entry */ XLARGE_DAILY(/*----*/"yyyyMMdd'X'", 24 * 60 * 60 * 1000, MAX_INDEX_COUNT, 256), /** - * Roll cycle allowing up to 0xffffffffffff entries per day, with sparse indexing (every 1024th entry). + * 0xffffffffffff entries per day with sparse indexing (every 1024th entry) */ HUGE_DAILY(/*------*/"yyyyMMdd'H'", 24 * 60 * 60 * 1000, MAX_INDEX_COUNT, 1024), ; @@ -84,7 +83,7 @@ public int lengthInMillis() { /** * Returns the default size of the index array. - *

Note: {@code indexCount^2} is the maximum number of index queue entries.

+ *

Note: {@code indexCount^2} is the maximum number of index queue entries. * * @return The default index count */ diff --git a/src/main/java/net/openhft/chronicle/queue/rollcycles/LegacyRollCycles.java b/src/main/java/net/openhft/chronicle/queue/rollcycles/LegacyRollCycles.java index 0c7189f38a..41d030d73e 100644 --- a/src/main/java/net/openhft/chronicle/queue/rollcycles/LegacyRollCycles.java +++ b/src/main/java/net/openhft/chronicle/queue/rollcycles/LegacyRollCycles.java @@ -23,22 +23,19 @@ /** * Enum representing legacy roll cycles, kept for historical reasons. *

These roll cycles were used in older versions of Chronicle Queue and retain their original - * configurations for backward compatibility.

+ * configurations for backward compatibility. */ public enum LegacyRollCycles implements RollCycle { - /** - * Roll cycle allowing up to 0x4000000 entries per minute, indexing every 16th entry. + * 0x4000000 entries per minute, indexing every 16th entry */ MINUTELY(/*--------*/"yyyyMMdd-HHmm", 60 * 1000, 2 << 10, 16), /** - * Roll cycle allowing up to 0x10000000 entries per hour, indexing every 16th entry. - *

Maintained as 4K index count and 16 index spacing for historical reasons.

+ * 0x10000000 entries per hour, indexing every 16th entry, leave as 4K and 16 for historical reasons. */ HOURLY(/*----------*/"yyyyMMdd-HH", 60 * 60 * 1000, 4 << 10, 16), /** - * Roll cycle allowing up to 0xffffffff entries per day, indexing every 64th entry. - *

Maintained as 8K index count and 64 index spacing for historical reasons.

+ * 0xffffffff entries per day, indexing every 64th entry, leave as 8K and 64 for historical reasons. */ DAILY(/*-----------*/"yyyyMMdd", 24 * 60 * 60 * 1000, 8 << 10, 64), ; @@ -82,7 +79,7 @@ public int lengthInMillis() { /** * Returns the default size of the index array. - *

Note: {@code indexCount^2} is the maximum number of index queue entries.

+ *

Note: {@code indexCount^2} is the maximum number of index queue entries. * * @return The default index count */ diff --git a/src/main/java/net/openhft/chronicle/queue/rollcycles/SparseRollCycles.java b/src/main/java/net/openhft/chronicle/queue/rollcycles/SparseRollCycles.java index b6a45a4b47..47bd66086c 100644 --- a/src/main/java/net/openhft/chronicle/queue/rollcycles/SparseRollCycles.java +++ b/src/main/java/net/openhft/chronicle/queue/rollcycles/SparseRollCycles.java @@ -23,27 +23,23 @@ /** * Enum representing sparse roll cycles, primarily used for testing and benchmarking purposes. *

These roll cycles are designed to minimize indexing, making them useful for scenarios where - * indexing is either unnecessary or should be kept minimal to reduce overhead.

+ * indexing is either unnecessary or should be kept minimal to reduce overhead. */ public enum SparseRollCycles implements RollCycle { - /** - * Roll cycle allowing up to 0x20000000 entries per day, indexing every 8th entry. + * 0x20000000 entries per day, indexing every 8th entry */ SMALL_DAILY(/*-----*/"yyyyMMdd'S'", 24 * 60 * 60 * 1000, 8 << 10, 8), - /** - * Roll cycle allowing up to 0x3ffffffff entries per hour, with sparse indexing (every 1024th entry). + * 0x3ffffffff entries per hour with sparse indexing (every 1024th entry) */ LARGE_HOURLY_SPARSE("yyyyMMdd-HH'LS'", 60 * 60 * 1000, 4 << 10, 1024), - /** - * Roll cycle allowing up to 0x3ffffffffff entries per hour, with super-sparse indexing (every (2^20)th entry). + * 0x3ffffffffff entries per hour with super-sparse indexing (every (2^20)th entry) */ LARGE_HOURLY_XSPARSE("yyyyMMdd-HH'LX'", 60 * 60 * 1000, 2 << 10, 1 << 20), - /** - * Roll cycle allowing up to 0xffffffffffff entries per day, with super-sparse indexing (every (2^20)th entry). + * 0xffffffffffff entries per day with super-sparse indexing (every (2^20)th entry) */ HUGE_DAILY_XSPARSE("yyyyMMdd'HX'", 24 * 60 * 60 * 1000, 16 << 10, 1 << 20), ; @@ -87,7 +83,7 @@ public int lengthInMillis() { /** * Returns the default size of the index array. - *

Note: {@code indexCount^2} is the maximum number of index queue entries.

+ *

Note: {@code indexCount^2} is the maximum number of index queue entries. * * @return The default index count */ diff --git a/src/main/java/net/openhft/chronicle/queue/rollcycles/TestRollCycles.java b/src/main/java/net/openhft/chronicle/queue/rollcycles/TestRollCycles.java index b919a44b11..e69ee3f476 100644 --- a/src/main/java/net/openhft/chronicle/queue/rollcycles/TestRollCycles.java +++ b/src/main/java/net/openhft/chronicle/queue/rollcycles/TestRollCycles.java @@ -24,42 +24,35 @@ * Enum representing various test roll cycles, designed to reduce the size of a queue dump * when performing small tests. *

These roll cycles are intended for testing purposes only and are not suited for production use - * due to their limited capacity and reduced indexing granularity.

+ * due to their limited capacity and reduced indexing granularity. */ public enum TestRollCycles implements RollCycle { - /** - * Roll cycle allowing up to 0xffffffff entries per second. Only suitable for testing. + * 0xffffffff entries - Only good for testing */ TEST_SECONDLY(/*---*/"yyyyMMdd-HHmmss'T'", 1000, MAX_INDEX_COUNT, 4), - /** - * Roll cycle allowing up to 0x1000 entries per second. Only suitable for testing. + * 0x1000 entries - Only good for testing */ TEST4_SECONDLY(/*---*/"yyyyMMdd-HHmmss'T4'", 1000, 32, 4), - /** - * Roll cycle allowing up to 0x400 entries per hour. Only suitable for testing. + * 0x400 entries per hour - Only good for testing */ TEST_HOURLY(/*-----*/"yyyyMMdd-HH'T'", 60 * 60 * 1000, 16, 4), - /** - * Roll cycle allowing up to 0x40 entries per day. Only suitable for testing. + * 0x40 entries per day - Only good for testing */ TEST_DAILY(/*------*/"yyyyMMdd'T1'", 24 * 60 * 60 * 1000, 8, 1), - /** - * Roll cycle allowing up to 0x200 entries per day. Only suitable for testing. + * 0x200 entries per day - Only good for testing */ TEST2_DAILY(/*-----*/"yyyyMMdd'T2'", 24 * 60 * 60 * 1000, 16, 2), - /** - * Roll cycle allowing up to 0x1000 entries per day. Only suitable for testing. + * 0x1000 entries per day - Only good for testing */ TEST4_DAILY(/*-----*/"yyyyMMdd'T4'", 24 * 60 * 60 * 1000, 32, 4), - /** - * Roll cycle allowing up to 0x20000 entries per day. Only suitable for testing. + * 0x20000 entries per day - Only good for testing */ TEST8_DAILY(/*-----*/"yyyyMMdd'T8'", 24 * 60 * 60 * 1000, 128, 8), ; @@ -103,7 +96,7 @@ public int lengthInMillis() { /** * Returns the default size of the index array. - *

Note: {@code indexCount^2} is the maximum number of index queue entries.

+ *

Note: {@code indexCount^2} is the maximum number of index queue entries. * * @return The default index count */ diff --git a/src/main/java/net/openhft/chronicle/queue/util/FileUtil.java b/src/main/java/net/openhft/chronicle/queue/util/FileUtil.java index a86223a8f6..0657e1ad75 100644 --- a/src/main/java/net/openhft/chronicle/queue/util/FileUtil.java +++ b/src/main/java/net/openhft/chronicle/queue/util/FileUtil.java @@ -30,70 +30,91 @@ * Utility methods for handling files in connection with ChronicleQueue. *

Provides functions for identifying removable files, checking open file states, and determining file suffixes. * - *

Note: This utility class is final and cannot be instantiated. + * @author Per Minborg + * @since 5.17.34 */ public final class FileUtil { - // Private constructor to prevent instantiation private FileUtil() {} /** - * Returns a Stream of roll queue files that are likely removable from the given {@code baseDir} - * without affecting any queue process currently active in the {@code baseDir} and reading data sequentially. + * Returns a Stream of roll Queue files that are likely removable + * from the given {@code baseDir} without affecting any Queue + * process that is currently active in the given {@code baseDir} reading + * data sequentially. *

- * Files are returned in order of creation and can be removed successively in that order. If removal of a particular - * file fails, subsequent files must be left untouched. - *

Warning: This method is inherently non-deterministic, as new queue processes may join or leave at any time asynchronously. - *

Only sequential reading is supported, as random access tailers can read at any location at any time. - *

Example of how unused files can be removed: + * Files are returned in order of creation and can successively be removed + * in that order. If the removal of a particular file fails, then subsequent + * files must be untouched. + *

+ * WARNING: This method is inherently un-deterministic as new Queue processes may + * join or leave at any time asynchronously. Thus, it is not recommended to store + * results produced by this method for longer periods. + *

+ * Only sequential reading is supported because random access Tailers can read at + * any location at any time. + *

+ * Here is an example of how unused files can be removed: + * *

{@code
-     * for (File file : removableFileCandidates(baseDir).collect(Collectors.toList())) {
-     *     if (!file.delete()) {
-     *         break;
+     *     for (File file : removableFileCandidates(baseDir).collect(Collectors.toList())) {
+     *         if (!file.delete()) {
+     *             break;
+     *         }
      *     }
-     * }
      * }
* - * @param baseDir The directory containing queue file removal candidates - * @return A Stream of roll queue files that are likely removable from the given {@code baseDir} without affecting any active queue process - * @throws UnsupportedOperationException If the operation is not supported on the current platform (e.g., Windows) + * @param baseDir containing queue file removal candidates + * @return a Stream of roll Queue files that are likely removable + * from the given {@code baseDir} without affecting any Queue + * process that is currently active in the given {@code baseDir} + * reading data sequentially + * @throws UnsupportedOperationException if this operation is not + * supported for the current platform (e.g. Windows). */ @NotNull public static Stream removableRollFileCandidates(@NotNull File baseDir) { - return InternalFileUtil.removableRollFileCandidates(baseDir); // Delegate to internal utility + return InternalFileUtil.removableRollFileCandidates(baseDir); } /** - * Returns all files currently opened by any process, along with the PID of the process holding the file open. - *

Note: This method is currently supported only on Linux operating systems. + * Returns all files currently opened by any process, including the PID of the process holding the file open. + *

+ * Method is only supported currently on Linux operating systems. * - * @return A {@link Map} of absolute paths to open files on the system, mapped to the PID of the process holding the file open - * @throws UnsupportedOperationException If the operation is not supported by the operating system - * @throws IOException If an error occurs while traversing filesystem metadata for open files + * @return a {@link Map} of the absolute paths to all the open files on the system, mapped to the PID holding the file open + * @throws UnsupportedOperationException if getAllOpenFiles is not supported by the operating system + * @throws IOException if an error occurs while traversing filesystem metadata for open files */ public static Map getAllOpenFiles() throws IOException { - return InternalFileUtil.getAllOpenFiles(); // Delegate to internal utility + return InternalFileUtil.getAllOpenFiles(); } /** - * Checks if the provided {@code file} has the Chronicle Queue file suffix (".cq4"). + * Returns if the provided {@code file} has the Chronicle Queue file + * suffix. The current file suffix is ".cq4". * - * @param file The file to check - * @return {@code true} if the file has the Chronicle Queue suffix, {@code false} otherwise + * @param file to check + * @return if the provided {@code file} has the ChronicleQueue file + * suffix */ public static boolean hasQueueSuffix(@NotNull File file) { - return InternalFileUtil.hasQueueSuffix(file); // Delegate to internal utility + return InternalFileUtil.hasQueueSuffix(file); } /** - * Determines whether the given {@code file} is being used by any process (i.e., opened for reading or writing). - *

If the open state of the given {@code file} cannot be determined, {@code true} is returned by default. + * Returns if the given {@code file } is used by any process (i.e. + * has the file open for reading or writing). + *

+ * If the open state of the given {@code file} can not be determined, {@code true } + * is returned. * - * @param file The file to check - * @return {@code true} if the file is open by any process, or if its state cannot be determined - * @throws UnsupportedOperationException If the operation is not supported on the current platform (e.g., Windows) + * @param file to check + * @return if the given {@code file } is used by any process + * @throws UnsupportedOperationException if this operation is not + * supported for the current platform (e.g. Windows). */ public static FileState state(@NotNull File file) { - return InternalFileUtil.state(file); // Delegate to internal utility + return InternalFileUtil.state(file); } } diff --git a/src/main/java/net/openhft/chronicle/queue/util/MicroTouched.java b/src/main/java/net/openhft/chronicle/queue/util/MicroTouched.java index ce442ba798..9029f40129 100644 --- a/src/main/java/net/openhft/chronicle/queue/util/MicroTouched.java +++ b/src/main/java/net/openhft/chronicle/queue/util/MicroTouched.java @@ -23,7 +23,6 @@ *

Provides methods to perform tiny operations either on the current thread or in a background thread to improve performance consistency. */ public interface MicroTouched { - /** * Performs a tiny operation to improve jitter in the current thread. *

This method should be called in contexts where reducing jitter or improving performance consistency is desired. diff --git a/src/main/java/net/openhft/chronicle/queue/util/PretouchUtil.java b/src/main/java/net/openhft/chronicle/queue/util/PretouchUtil.java index d134e43f0d..cc8c81a99a 100644 --- a/src/main/java/net/openhft/chronicle/queue/util/PretouchUtil.java +++ b/src/main/java/net/openhft/chronicle/queue/util/PretouchUtil.java @@ -89,7 +89,7 @@ private static class PretouchFactoryEmpty implements PretoucherFactory { */ @Override public EventHandler createEventHandler(@NotNull final SingleChronicleQueue queue) { - return () -> false; // No-op event handler + return () -> false; } /** @@ -100,7 +100,7 @@ public EventHandler createEventHandler(@NotNull final SingleChronicleQueue queue */ @Override public Pretoucher createPretoucher(@NotNull final SingleChronicleQueue queue) { - return queue.createPretoucher(); // Create a simple pretoucher + return queue.createPretoucher(); } } } diff --git a/src/main/java/net/openhft/chronicle/queue/util/PretoucherFactory.java b/src/main/java/net/openhft/chronicle/queue/util/PretoucherFactory.java index 5a4c41ec52..af402223b0 100644 --- a/src/main/java/net/openhft/chronicle/queue/util/PretoucherFactory.java +++ b/src/main/java/net/openhft/chronicle/queue/util/PretoucherFactory.java @@ -31,7 +31,6 @@ * {@link SingleChronicleQueue}. */ public interface PretoucherFactory { - /** * Creates an {@link EventHandler} for the specified {@link SingleChronicleQueue}. *

The event handler can be used to periodically pretouch or handle other events related to the queue. diff --git a/src/main/java/net/openhft/chronicle/queue/util/ToolsUtil.java b/src/main/java/net/openhft/chronicle/queue/util/ToolsUtil.java index 7a89e768c9..ac86164d5d 100644 --- a/src/main/java/net/openhft/chronicle/queue/util/ToolsUtil.java +++ b/src/main/java/net/openhft/chronicle/queue/util/ToolsUtil.java @@ -22,11 +22,10 @@ /** * Utility class for tools-related functions, such as resource tracing warnings. - *

This class is final and cannot be instantiated.

+ *

This class is final and cannot be instantiated. */ public final class ToolsUtil { - // Private constructor to prevent instantiation private ToolsUtil() { } @@ -37,8 +36,8 @@ private ToolsUtil() { * properly set up in certain tool environments (e.g., when running shell scripts like {@code queue_reader.sh}). */ public static void warnIfResourceTracing() { - // Print a warning to System.err if resource tracing is enabled + // System.err (*not* logger as slf4j may not be set up e.g. when running queue_reader.sh) if (Jvm.isResourceTracing()) - System.err.println("Resource tracing is turned on - this will eventually cause an OutOfMemoryError (OOME)"); + System.err.println("Resource tracing is turned on - this will eventually die with OOME"); } } diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/MetadataDeletionTests.java b/src/test/java/net/openhft/chronicle/queue/impl/single/MetadataDeletionTests.java index 7fee47c9e0..93a0e3c286 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/MetadataDeletionTests.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/MetadataDeletionTests.java @@ -37,7 +37,7 @@ void singleCycleFile() { // Imagine that system has shut down, delete metadata boolean delete = new File(queuePath, "metadata.cq4t").delete(); - assertTrue(delete, "metadata file should be deleted"); + assertTrue(delete, "metadata file should be deleted"); // Verify it has really been deleted assertFalse(new File(queuePath, "metadata.cq4t").exists(), "metadata file should not exist"); From 39479692d4d5dba5cf8f7232f48f85434b0e15d5 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Fri, 8 Nov 2024 14:55:37 +0000 Subject: [PATCH 12/12] Introduce a named constant --- .../openhft/chronicle/queue/ChronicleHistoryReaderMain.java | 4 +++- .../java/net/openhft/chronicle/queue/ChronicleReaderMain.java | 4 +++- .../chronicle/queue/internal/writer/ChronicleWriterMain.java | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java b/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java index 6b54316044..b3f4cf73c0 100644 --- a/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java +++ b/src/main/java/net/openhft/chronicle/queue/ChronicleHistoryReaderMain.java @@ -40,6 +40,8 @@ */ public class ChronicleHistoryReaderMain { + private static final int HELP_OUTPUT_LINE_WIDTH = 180; + /** * Entry point of the application. * Initializes the {@link ChronicleHistoryReaderMain} and passes command-line arguments. @@ -146,7 +148,7 @@ protected void printHelpAndExit(final Options options, int status, String messag final PrintWriter writer = new PrintWriter(System.out); new HelpFormatter().printHelp( writer, - 180, // Line width for formatting help output + HELP_OUTPUT_LINE_WIDTH, this.getClass().getSimpleName(), message, options, diff --git a/src/main/java/net/openhft/chronicle/queue/ChronicleReaderMain.java b/src/main/java/net/openhft/chronicle/queue/ChronicleReaderMain.java index c8432974af..756f74a6ad 100644 --- a/src/main/java/net/openhft/chronicle/queue/ChronicleReaderMain.java +++ b/src/main/java/net/openhft/chronicle/queue/ChronicleReaderMain.java @@ -40,6 +40,8 @@ */ public class ChronicleReaderMain { + private static final int HELP_OUTPUT_LINE_WIDTH = 180; + /** * Entry point of the application. Initializes the {@link ChronicleReaderMain} instance and * passes command-line arguments for execution. @@ -145,7 +147,7 @@ protected void printHelpAndExit(final Options options, int status, String messag final PrintWriter writer = new PrintWriter(System.out); new HelpFormatter().printHelp( writer, - 180, // Line width for formatted help output + HELP_OUTPUT_LINE_WIDTH, this.getClass().getSimpleName(), message, options, diff --git a/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriterMain.java b/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriterMain.java index 578ccca05b..5a4ca3f6b4 100644 --- a/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriterMain.java +++ b/src/main/java/net/openhft/chronicle/queue/internal/writer/ChronicleWriterMain.java @@ -32,6 +32,8 @@ */ public class ChronicleWriterMain { + private static final int HELP_OUTPUT_LINE_WIDTH = 180; + /** * Runs the ChronicleWriter based on the provided command-line arguments. * @@ -88,7 +90,7 @@ private void printHelpAndExit(final Options options, int status, String message) final PrintWriter writer = new PrintWriter(System.out); new HelpFormatter().printHelp( writer, - 180, + HELP_OUTPUT_LINE_WIDTH, this.getClass().getSimpleName() + " files..", message, options,