From 1253f10296e997f9f3d24057b9b5b54df2b7dabb Mon Sep 17 00:00:00 2001 From: Leif Walsh Date: Mon, 8 Dec 2014 21:14:50 -0500 Subject: [PATCH] incorporated com.codahale.metrics instead of hand-rolled reporting --- pom.xml | 48 +++++++ run.simple.bash | 18 +-- src/jmongosysbenchexecute.java | 244 +++++++++++++-------------------- src/jmongosysbenchload.java | 166 ++++------------------ 4 files changed, 181 insertions(+), 295 deletions(-) create mode 100644 pom.xml diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..a3721f9 --- /dev/null +++ b/pom.xml @@ -0,0 +1,48 @@ + + + 4.0.0 + + tokutek + sysbench-mongodb + 0.0.1-SNAPSHOT + MongoDB sysbench + + + target + target/classes + ${project.artifactId}-${project.version} + src + + + org.codehaus.mojo + exec-maven-plugin + 1.3.2 + + + + java + + + + + + + + + + io.dropwizard.metrics + metrics-core + 3.1.0 + + + org.slf4j + slf4j-log4j12 + 1.7.7 + + + org.mongodb + mongo-java-driver + 2.13.0-rc0 + + + diff --git a/run.simple.bash b/run.simple.bash index d7a644a..d84a662 100755 --- a/run.simple.bash +++ b/run.simple.bash @@ -20,22 +20,21 @@ else exit 1 fi -javac -cp $CLASSPATH:$PWD/src src/jmongosysbenchload.java -javac -cp $CLASSPATH:$PWD/src src/jmongosysbenchexecute.java - +mvn compile # load the data if [[ $DOLOAD = "yes" ]]; then echo Do load at $( date ) export LOG_NAME=mongoSysbenchLoad-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_LOADER_THREADS}.txt - export BENCHMARK_TSV=${LOG_NAME}.tsv + export BENCHMARK_CSV_DIR=mongoSysbenchLoad-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_LOADER_THREADS}-csv rm -f $LOG_NAME - rm -f $BENCHMARK_TSV + rm -rf $BENCHMARK_CSV_DIR + mkdir $BENCHMARK_CSV_DIR T="$(date +%s)" - java -cp $CLASSPATH:$PWD/src jmongosysbenchload $NUM_COLLECTIONS $DB_NAME $NUM_LOADER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_DOCUMENTS_PER_INSERT $NUM_INSERTS_PER_FEEDBACK $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_TSV $MONGO_COMPRESSION $MONGO_BASEMENT $WRITE_CONCERN $MONGO_SERVER $MONGO_PORT $USERNAME $PASSWORD + mvn exec:java -Dexec.mainClass=jmongosysbenchload -Dexec.args="$NUM_COLLECTIONS $DB_NAME $NUM_LOADER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_DOCUMENTS_PER_INSERT $NUM_INSERTS_PER_FEEDBACK $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_CSV_DIR $MONGO_COMPRESSION $MONGO_BASEMENT $WRITE_CONCERN $MONGO_SERVER $MONGO_PORT $USERNAME $PASSWORD" echo "" | tee -a $LOG_NAME T="$(($(date +%s)-T))" printf "`date` | sysbench loader duration = %02d:%02d:%02d:%02d\n" "$((T/86400))" "$((T/3600%24))" "$((T/60%60))" "$((T%60))" | tee -a $LOG_NAME @@ -47,13 +46,14 @@ fi if [[ $DOQUERY = "yes" ]]; then echo Do query at $( date ) export LOG_NAME=mongoSysbenchExecute-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_WRITER_THREADS}.txt - export BENCHMARK_TSV=${LOG_NAME}.tsv + export BENCHMARK_CSV_DIR=mongoSysbenchExecute-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_WRITER_THREADS}-csv rm -f $LOG_NAME - rm -f $BENCHMARK_TSV + rm -rf $BENCHMARK_CSV_DIR + mkdir $BENCHMARK_CSV_DIR T="$(date +%s)" - java -cp $CLASSPATH:$PWD/src jmongosysbenchexecute $NUM_COLLECTIONS $DB_NAME $NUM_WRITER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_TSV $SYSBENCH_AUTO_COMMIT $RUN_TIME_SECONDS $SYSBENCH_RANGE_SIZE $SYSBENCH_POINT_SELECTS $SYSBENCH_SIMPLE_RANGES $SYSBENCH_SUM_RANGES $SYSBENCH_ORDER_RANGES $SYSBENCH_DISTINCT_RANGES $SYSBENCH_INDEX_UPDATES $SYSBENCH_NON_INDEX_UPDATES $SYSBENCH_INSERTS $WRITE_CONCERN $MAX_TPS $MONGO_SERVER $MONGO_PORT $SEED $USERNAME $PASSWORD | tee -a $LOG_NAME + mvn exec:java -Dexec.mainClass=jmongosysbenchexecute -Dexec.args="$NUM_COLLECTIONS $DB_NAME $NUM_WRITER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_CSV_DIR $SYSBENCH_AUTO_COMMIT $RUN_TIME_SECONDS $SYSBENCH_RANGE_SIZE $SYSBENCH_POINT_SELECTS $SYSBENCH_SIMPLE_RANGES $SYSBENCH_SUM_RANGES $SYSBENCH_ORDER_RANGES $SYSBENCH_DISTINCT_RANGES $SYSBENCH_INDEX_UPDATES $SYSBENCH_NON_INDEX_UPDATES $SYSBENCH_INSERTS $WRITE_CONCERN $MAX_TPS $MONGO_SERVER $MONGO_PORT $SEED $USERNAME $PASSWORD" | tee -a $LOG_NAME echo "" | tee -a $LOG_NAME T="$(($(date +%s)-T))" printf "`date` | sysbench benchmark duration = %02d:%02d:%02d:%02d\n" "$((T/86400))" "$((T/3600%24))" "$((T/60%60))" "$((T%60))" | tee -a $LOG_NAME diff --git a/src/jmongosysbenchexecute.java b/src/jmongosysbenchexecute.java index bf35445..8516892 100644 --- a/src/jmongosysbenchexecute.java +++ b/src/jmongosysbenchexecute.java @@ -1,3 +1,8 @@ +import com.codahale.metrics.*; +import org.apache.log4j.BasicConfigurator; +import java.util.concurrent.TimeUnit; +import java.util.Locale; + //import com.mongodb.Mongo; import com.mongodb.MongoClient; import com.mongodb.MongoCredential; @@ -30,12 +35,14 @@ import java.util.concurrent.locks.ReentrantLock; public class jmongosysbenchexecute { - public static AtomicLong globalInserts = new AtomicLong(0); - public static AtomicLong globalDeletes = new AtomicLong(0); - public static AtomicLong globalUpdates = new AtomicLong(0); - public static AtomicLong globalPointQueries = new AtomicLong(0); - public static AtomicLong globalRangeQueries = new AtomicLong(0); - public static AtomicLong globalSysbenchTransactions = new AtomicLong(0); + static final MetricRegistry metrics = new MetricRegistry(); + private final Timer insertLatencies = metrics.timer(MetricRegistry.name("sysbench", "inserts")); + private final Timer deleteLatencies = metrics.timer(MetricRegistry.name("sysbench", "deletes")); + private final Timer updateLatencies = metrics.timer(MetricRegistry.name("sysbench", "updates")); + private final Timer pointQueryLatencies = metrics.timer(MetricRegistry.name("sysbench", "ptqueries")); + private final Timer rangeQueryLatencies = metrics.timer(MetricRegistry.name("sysbench", "rgqueries")); + private final Timer globalSysbenchTransactions = metrics.timer(MetricRegistry.name("sysbench", "tps")); + public static AtomicLong globalWriterThreads = new AtomicLong(0); public static Writer writer = null; @@ -78,6 +85,8 @@ public jmongosysbenchexecute() { } public static void main (String[] args) throws Exception { + BasicConfigurator.configure(); + if (args.length != 24) { logMe("*** ERROR : CONFIGURATION ISSUE ***"); logMe("jsysbenchexecute [number of collections] [database name] [number of writer threads] [documents per collection] [seconds feedback] "+ @@ -172,6 +181,19 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { DB db = m.getDB(dbName); + final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metrics) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + consoleReporter.start(10, TimeUnit.SECONDS); + + final CsvReporter csvReporter = CsvReporter.forRegistry(metrics) + .formatFor(Locale.US) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(new File(logFileName)); + csvReporter.start(1, TimeUnit.SECONDS); + // determine server type : mongo or tokumx DBObject checkServerCmd = new BasicDBObject(); CommandResult commandResult = db.command("buildInfo"); @@ -188,12 +210,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { logMe(" index technology = %s",indexTechnology); logMe("-------------------------------------------------------------------------------------------------"); - try { - writer = new BufferedWriter(new FileWriter(new File(logFileName))); - } catch (IOException e) { - e.printStackTrace(); - } - if ((!indexTechnology.toLowerCase().equals("tokumx")) && (!indexTechnology.toLowerCase().equals("mongo"))) { // unknown index technology, abort logMe(" *** Unknown Indexing Technology %s, shutting down",indexTechnology); @@ -213,24 +229,12 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { tWriterThreads[i].start(); } - Thread reporterThread = new Thread(t.new MyReporter()); - reporterThread.start(); - reporterThread.join(); - // wait for writer threads to terminate for (int i=0; i 0) - runEndMillis = t0 + (1000 * runSeconds); - - while ((System.currentTimeMillis() < runEndMillis) && (thisInserts < numMaxInserts)) - { - try { - Thread.sleep(100); - } catch (Exception e) { - e.printStackTrace(); - } - - long now = System.currentTimeMillis(); - - -// public static AtomicLong globalDeletes = new AtomicLong(0); -// public static AtomicLong globalUpdates = new AtomicLong(0); -// public static AtomicLong globalPointQueries = new AtomicLong(0); -// public static AtomicLong globalRangeQueries = new AtomicLong(0); - - - thisInserts = globalInserts.get(); - thisSysbenchTransactions = globalSysbenchTransactions.get(); - - if ((now > nextFeedbackMillis) && (secondsPerFeedback > 0)) - { - intervalNumber++; - nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1)); - - long elapsed = now - t0; - long thisIntervalMs = now - lastMs; - long thisWriterThreads = globalWriterThreads.get(); - - long thisIntervalSysbenchTransactions = thisSysbenchTransactions - lastSysbenchTransactions; - double thisIntervalSysbenchTransactionsPerSecond = thisIntervalSysbenchTransactions/(double)thisIntervalMs*1000.0; - double thisSysbenchTransactionsPerSecond = thisSysbenchTransactions/(double)elapsed*1000.0; - - long thisIntervalInserts = thisInserts - lastInserts; - double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0; - double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0; - - logMe("%,d seconds : cum tps=%,.2f : int tps=%,.2f : cum ips=%,.2f : int ips=%,.2f : writers=%,d", elapsed / 1000l, thisSysbenchTransactionsPerSecond, thisIntervalSysbenchTransactionsPerSecond, thisInsertsPerSecond, thisIntervalInsertsPerSecond, thisWriterThreads); - - try { - if (outputHeader) - { - writer.write("elap_secs\tcum_tps\tint_tps\tcum_ips\tint_ips\n"); - outputHeader = false; - } - - String statusUpdate = ""; - - statusUpdate = String.format("%d\t%.2f\t%.2f\t%.2f\t%.2f\n", elapsed / 1000l, thisSysbenchTransactionsPerSecond, thisIntervalSysbenchTransactionsPerSecond, thisInsertsPerSecond, thisIntervalInsertsPerSecond); - - writer.write(statusUpdate); - writer.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - - lastInserts = thisInserts; - lastSysbenchTransactions = thisSysbenchTransactions; - - lastMs = now; - } - } - - // shutdown all the writers - allDone = 1; - } - } - - public static void logMe(String format, Object... args) { System.out.println(Thread.currentThread() + String.format(format, args)); } diff --git a/src/jmongosysbenchload.java b/src/jmongosysbenchload.java index 420039e..fb8e46b 100644 --- a/src/jmongosysbenchload.java +++ b/src/jmongosysbenchload.java @@ -1,3 +1,8 @@ +import com.codahale.metrics.*; +import org.apache.log4j.BasicConfigurator; +import java.util.concurrent.TimeUnit; +import java.util.Locale; + //import com.mongodb.Mongo; import com.mongodb.MongoClient; import com.mongodb.MongoCredential; @@ -27,10 +32,11 @@ import java.util.concurrent.locks.ReentrantLock; public class jmongosysbenchload { - public static AtomicLong globalInserts = new AtomicLong(0); + static final MetricRegistry metrics = new MetricRegistry(); + private final Timer insertLatencies = metrics.timer(MetricRegistry.name("sysbench", "inserts")); + public static AtomicLong globalWriterThreads = new AtomicLong(0); - public static Writer writer = null; public static boolean outputHeader = true; public static int numCollections; @@ -56,6 +62,8 @@ public jmongosysbenchload() { } public static void main (String[] args) throws Exception { + BasicConfigurator.configure(); + if (args.length != 15) { logMe("*** ERROR : CONFIGURATION ISSUE ***"); logMe("jsysbenchload [number of collections] [database name] [number of writer threads] [documents per collection] [documents per insert] [inserts feedback] [seconds feedback] [log file name] [compression type] [basement node size (bytes)] [writeconcern] [server] [port] [username] [password]"); @@ -124,6 +132,19 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { DB db = m.getDB(dbName); + final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metrics) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + consoleReporter.start(10, TimeUnit.SECONDS); + + final CsvReporter csvReporter = CsvReporter.forRegistry(metrics) + .formatFor(Locale.US) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(new File(logFileName)); + csvReporter.start(1, TimeUnit.SECONDS); + // determine server type : mongo or tokumx DBObject checkServerCmd = new BasicDBObject(); CommandResult commandResult = db.command("buildInfo"); @@ -146,12 +167,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { logMe("--------------------------------------------------"); - try { - writer = new BufferedWriter(new FileWriter(new File(logFileName))); - } catch (IOException e) { - e.printStackTrace(); - } - if ((!indexTechnology.toLowerCase().equals("tokumx")) && (!indexTechnology.toLowerCase().equals("mongo"))) { // unknown index technology, abort logMe(" *** Unknown Indexing Technology %s, shutting down",indexTechnology); @@ -160,9 +175,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { jmongosysbenchload t = new jmongosysbenchload(); - Thread reporterThread = new Thread(t.new MyReporter()); - reporterThread.start(); - Thread[] tWriterThreads = new Thread[writerThreads]; for (int collectionNumber = 0; collectionNumber < numCollections; collectionNumber++) { @@ -215,17 +227,6 @@ else if ((myWriteConcern.toLowerCase().equals("safe"))) { // all the writers are finished allDone = 1; - if (reporterThread.isAlive()) - reporterThread.join(); - - try { - if (writer != null) { - writer.close(); - } - } catch (IOException e) { - e.printStackTrace(); - } - // m.dropDatabase("mydb"); m.close(); @@ -310,9 +311,13 @@ public void run() { aDocs[i]=doc; } - coll.insert(aDocs); - numInserts += documentsPerInsert; - globalInserts.addAndGet(documentsPerInsert); + final Timer.Context context = insertLatencies.time(); + try { + coll.insert(aDocs); + numInserts += documentsPerInsert; + } finally { + context.stop(); + } } } catch (Exception e) { @@ -340,117 +345,6 @@ public static String sysbenchString(java.util.Random rand, String thisMask) { return sb.toString(); } - - // reporting thread, outputs information to console and file - class MyReporter implements Runnable { - public void run() - { - long t0 = System.currentTimeMillis(); - long lastInserts = 0; - long lastMs = t0; - long intervalNumber = 0; - long nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1)); - long nextFeedbackInserts = lastInserts + insertsPerFeedback; - long thisInserts = 0; - - while (allDone == 0) - { - try { - Thread.sleep(100); - } catch (Exception e) { - e.printStackTrace(); - } - - long now = System.currentTimeMillis(); - thisInserts = globalInserts.get(); - if (((now > nextFeedbackMillis) && (secondsPerFeedback > 0)) || - ((thisInserts >= nextFeedbackInserts) && (insertsPerFeedback > 0))) - { - intervalNumber++; - nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1)); - nextFeedbackInserts = (intervalNumber + 1) * insertsPerFeedback; - - long elapsed = now - t0; - long thisIntervalMs = now - lastMs; - - long thisIntervalInserts = thisInserts - lastInserts; - double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0; - double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0; - - if (secondsPerFeedback > 0) - { - logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); - } else { - logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); - } - - try { - if (outputHeader) - { - writer.write("tot_inserts\telap_secs\tcum_ips\tint_ips\n"); - outputHeader = false; - } - - String statusUpdate = ""; - - if (secondsPerFeedback > 0) - { - statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); - } else { - statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); - } - writer.write(statusUpdate); - writer.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - - lastInserts = thisInserts; - - lastMs = now; - } - } - - // output final numbers... - long now = System.currentTimeMillis(); - thisInserts = globalInserts.get(); - intervalNumber++; - nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1)); - nextFeedbackInserts = (intervalNumber + 1) * insertsPerFeedback; - long elapsed = now - t0; - long thisIntervalMs = now - lastMs; - long thisIntervalInserts = thisInserts - lastInserts; - double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0; - double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0; - if (secondsPerFeedback > 0) - { - logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); - } else { - logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); - } - try { - if (outputHeader) - { - writer.write("tot_inserts\telap_secs\tcum_ips\tint_ips\n"); - outputHeader = false; - } - String statusUpdate = ""; - if (secondsPerFeedback > 0) - { - statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); - } else { - statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); - } - writer.write(statusUpdate); - writer.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - - } - } - - public static void logMe(String format, Object... args) { System.out.println(Thread.currentThread() + String.format(format, args)); }