diff --git a/config.bash b/config.bash index c030e16..5ad2764 100644 --- a/config.bash +++ b/config.bash @@ -6,16 +6,18 @@ export DB_NAME=sbtest # database username on DB_NAME # Use USERNAME=none # to login to mongodb without using credentials. -export USERNAME=myuser +#export USERNAME=myuser # database password to use for USERNAME -export PASSWORD=mypass +#export PASSWORD=mypass # name of the server to connect to export MONGO_SERVER=localhost # port of the server to connect to -export MONGO_PORT=27017 +#export MONGO_PORT=27017 + +export MONGO_PORT=$(docker port mongos1 27017/tcp | cut -f2 -d":") # Use "yes" to load the collections DOLOAD=yes diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..25fcac0 --- /dev/null +++ b/install.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.13.0/mongo-java-driver-2.13.0.jar + +export CLASSPATH=$PWD/mongo-java-driver-2.13.0.jar:$CLASSPATH + +exit 0 diff --git a/run.sharded.bash b/run.sharded.bash new file mode 100755 index 0000000..4fde77c --- /dev/null +++ b/run.sharded.bash @@ -0,0 +1,61 @@ +#!/bin/bash + +# simple script to run against running MongoDB/TokuMX server localhost:(default port) + +# enable passing different config files + +if [ ! $1 ]; +then + FILE="config.bash" +else + FILE=$1 +fi + +if [ -f $FILE ]; +then + echo "Loading config from $FILE....." + source $FILE +else + echo "Unable to read config $FILE" + exit 1 +fi + +javac -cp $CLASSPATH:$PWD/src src/jmongosysbenchload_sharded.java +javac -cp $CLASSPATH:$PWD/src src/jmongosysbenchexecute.java + + +# load the data + +if [[ $DOLOAD = "yes" ]]; then + echo Do load at $( date ) + export LOG_NAME=mongoSysbenchLoad-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_LOADER_THREADS}.txt + export BENCHMARK_TSV=${LOG_NAME}.tsv + + rm -f $LOG_NAME + rm -f $BENCHMARK_TSV + + T="$(date +%s)" + java -cp $CLASSPATH:$PWD/src jmongosysbenchload_sharded $NUM_COLLECTIONS $DB_NAME $NUM_LOADER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_DOCUMENTS_PER_INSERT $NUM_INSERTS_PER_FEEDBACK $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_TSV $MONGO_COMPRESSION $MONGO_BASEMENT $WRITE_CONCERN $MONGO_SERVER $MONGO_PORT "$USERNAME" "$PASSWORD" + echo "" | tee -a $LOG_NAME + T="$(($(date +%s)-T))" + printf "`date` | sysbench loader duration = %02d:%02d:%02d:%02d\n" "$((T/86400))" "$((T/3600%24))" "$((T/60%60))" "$((T%60))" | tee -a $LOG_NAME +fi + + +# execute the benchmark + +if [[ $DOQUERY = "yes" ]]; then + echo Do query at $( date ) + export LOG_NAME=mongoSysbenchExecute-${NUM_COLLECTIONS}-${NUM_DOCUMENTS_PER_COLLECTION}-${NUM_WRITER_THREADS}.txt + export BENCHMARK_TSV=${LOG_NAME}.tsv + + rm -f $LOG_NAME + rm -f $BENCHMARK_TSV + + T="$(date +%s)" + java -cp $CLASSPATH:$PWD/src jmongosysbenchexecute $NUM_COLLECTIONS $DB_NAME $NUM_WRITER_THREADS $NUM_DOCUMENTS_PER_COLLECTION $NUM_SECONDS_PER_FEEDBACK $BENCHMARK_TSV $SYSBENCH_AUTO_COMMIT $RUN_TIME_SECONDS $SYSBENCH_RANGE_SIZE $SYSBENCH_POINT_SELECTS $SYSBENCH_SIMPLE_RANGES $SYSBENCH_SUM_RANGES $SYSBENCH_ORDER_RANGES $SYSBENCH_DISTINCT_RANGES $SYSBENCH_INDEX_UPDATES $SYSBENCH_NON_INDEX_UPDATES $SYSBENCH_INSERTS $WRITE_CONCERN $MAX_TPS $MONGO_SERVER $MONGO_PORT $SEED "$USERNAME" "$PASSWORD" | tee -a $LOG_NAME + echo "" | tee -a $LOG_NAME + T="$(($(date +%s)-T))" + printf "`date` | sysbench benchmark duration = %02d:%02d:%02d:%02d\n" "$((T/86400))" "$((T/3600%24))" "$((T/60%60))" "$((T%60))" | tee -a $LOG_NAME +fi + diff --git a/src/jmongosysbenchload_sharded.java b/src/jmongosysbenchload_sharded.java new file mode 100644 index 0000000..24f7511 --- /dev/null +++ b/src/jmongosysbenchload_sharded.java @@ -0,0 +1,474 @@ +//import com.mongodb.Mongo; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; + +import com.mongodb.BasicDBObject; +import com.mongodb.CommandResult; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; +import com.mongodb.WriteConcern; + +public class jmongosysbenchload_sharded { + public static AtomicLong globalInserts = new AtomicLong(0); + public static AtomicLong globalWriterThreads = new AtomicLong(0); + + public static Writer writer = null; + public static boolean outputHeader = true; + + public static int numCollections; + public static String dbName; + public static int writerThreads; + public static Integer numMaxInserts; + public static int documentsPerInsert; + public static long insertsPerFeedback; + public static long secondsPerFeedback; + public static String compressionType; + public static int basementSize; + public static String logFileName; + public static String indexTechnology; + public static String myWriteConcern; + public static String serverName; + public static int serverPort; + public static String userName; + public static String passWord; + + public static int allDone = 0; + + public jmongosysbenchload_sharded() { + } + + public static void main (String[] args) throws Exception { + if (args.length != 15) { + logMe("*** ERROR : CONFIGURATION ISSUE ***"); + logMe("jsysbenchload [number of collections] [database name] [number of writer threads] [documents per collection] [documents per insert] [inserts feedback] [seconds feedback] [log file name] [compression type] [basement node size (bytes)] [writeconcern] [server] [port] [username] [password]"); + System.exit(1); + } + + numCollections = Integer.valueOf(args[0]); + dbName = args[1]; + writerThreads = Integer.valueOf(args[2]); + numMaxInserts = Integer.valueOf(args[3]); + documentsPerInsert = Integer.valueOf(args[4]); + insertsPerFeedback = Long.valueOf(args[5]); + secondsPerFeedback = Long.valueOf(args[6]); + logFileName = args[7]; + compressionType = args[8]; + basementSize = Integer.valueOf(args[9]); + myWriteConcern = args[10]; + serverName = args[11]; + serverPort = Integer.valueOf(args[12]); + userName = args[13]; + passWord = args[14]; + + WriteConcern myWC = new WriteConcern(); + if (myWriteConcern.toLowerCase().equals("fsync_safe")) { + myWC = WriteConcern.FSYNC_SAFE; + } + else if ((myWriteConcern.toLowerCase().equals("none"))) { + myWC = WriteConcern.NONE; + } + else if ((myWriteConcern.toLowerCase().equals("normal"))) { + myWC = WriteConcern.NORMAL; + } + else if ((myWriteConcern.toLowerCase().equals("replicas_safe"))) { + myWC = WriteConcern.REPLICAS_SAFE; + } + else if ((myWriteConcern.toLowerCase().equals("safe"))) { + myWC = WriteConcern.SAFE; + } + else { + logMe("*** ERROR : WRITE CONCERN ISSUE ***"); + logMe(" write concern %s is not supported",myWriteConcern); + System.exit(1); + } + + logMe("Application Parameters"); + logMe("--------------------------------------------------"); + logMe(" %d collections",numCollections); + logMe(" database name = %s",dbName); + logMe(" %d writer thread(s)",writerThreads); + logMe(" %,d documents per collection",numMaxInserts); + logMe(" Documents Per Insert = %d",documentsPerInsert); + logMe(" Feedback every %,d seconds(s)",secondsPerFeedback); + logMe(" Feedback every %,d inserts(s)",insertsPerFeedback); + logMe(" logging to file %s",logFileName); + logMe(" write concern = %s",myWriteConcern); + logMe(" Server:Port = %s:%d",serverName,serverPort); + logMe(" Username = %s",userName); + + MongoClientOptions clientOptions = new MongoClientOptions.Builder().connectionsPerHost(2048).socketTimeout(60000).writeConcern(myWC).build(); + ServerAddress srvrAdd = new ServerAddress(serverName,serverPort); + + // Credential login is optional. + MongoClient m; + if (userName.isEmpty() || userName.equalsIgnoreCase("none")) { + m = new MongoClient(srvrAdd); + } else { + MongoCredential credential = MongoCredential.createCredential(userName, dbName, passWord.toCharArray()); + m = new MongoClient(srvrAdd, Arrays.asList(credential)); + } + + logMe("mongoOptions | " + m.getMongoOptions().toString()); + logMe("mongoWriteConcern | " + m.getWriteConcern().toString()); + + DB db = m.getDB(dbName); + + CommandResult result = m.getDB("admin").command(new BasicDBObject("enablesharding", "sbtest")); + System.out.println("Enabling sharding on test db: " + result); + + // determine server type : mongo or tokumx + DBObject checkServerCmd = new BasicDBObject(); + CommandResult commandResult = db.command("buildInfo"); + + // check if tokumxVersion exists, otherwise assume mongo + if (commandResult.toString().contains("tokumxVersion")) { + indexTechnology = "tokumx"; + } + else + { + indexTechnology = "mongo"; + } + + logMe(" index technology = %s",indexTechnology); + + if (indexTechnology.toLowerCase().equals("tokumx")) { + logMe(" + compression type = %s",compressionType); + logMe(" + basement node size (bytes) = %d",basementSize); + } + + logMe("--------------------------------------------------"); + + try { + writer = new BufferedWriter(new FileWriter(new File(logFileName))); + } catch (IOException e) { + e.printStackTrace(); + } + + if ((!indexTechnology.toLowerCase().equals("tokumx")) && (!indexTechnology.toLowerCase().equals("mongo"))) { + // unknown index technology, abort + logMe(" *** Unknown Indexing Technology %s, shutting down",indexTechnology); + System.exit(1); + } + + jmongosysbenchload_sharded t = new jmongosysbenchload_sharded(); + + Thread reporterThread = new Thread(t.new MyReporter()); + reporterThread.start(); + + Thread[] tWriterThreads = new Thread[writerThreads]; + + for (int collectionNumber = 0; collectionNumber < numCollections; collectionNumber++) { + // if necessary, wait for an available slot for this loader + boolean waitingForSlot = true; + while (globalWriterThreads.get() >= writerThreads) { + if (waitingForSlot) { + logMe(" collection %d is waiting for an available loader slot",collectionNumber+1); + waitingForSlot = false; + } + try { + Thread.sleep(5000); + } catch (Exception e) { + e.printStackTrace(); + } + } + + // start the loader + for (int i=0; i 0) { + try { + Thread.sleep(5000); + } catch (Exception e) { + e.printStackTrace(); + } + } + + // wait for writer threads to terminate + //for (int i=0; i nextFeedbackMillis) && (secondsPerFeedback > 0)) || + ((thisInserts >= nextFeedbackInserts) && (insertsPerFeedback > 0))) + { + intervalNumber++; + nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1)); + nextFeedbackInserts = (intervalNumber + 1) * insertsPerFeedback; + + long elapsed = now - t0; + long thisIntervalMs = now - lastMs; + + long thisIntervalInserts = thisInserts - lastInserts; + double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0; + double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0; + + if (secondsPerFeedback > 0) + { + logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); + } else { + logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); + } + + try { + if (outputHeader) + { + writer.write("tot_inserts\telap_secs\tcum_ips\tint_ips\n"); + outputHeader = false; + } + + String statusUpdate = ""; + + if (secondsPerFeedback > 0) + { + statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); + } else { + statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); + } + writer.write(statusUpdate); + writer.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + + lastInserts = thisInserts; + + lastMs = now; + } + } + + // output final numbers... + long now = System.currentTimeMillis(); + thisInserts = globalInserts.get(); + intervalNumber++; + nextFeedbackMillis = t0 + (1000 * secondsPerFeedback * (intervalNumber + 1)); + nextFeedbackInserts = (intervalNumber + 1) * insertsPerFeedback; + long elapsed = now - t0; + long thisIntervalMs = now - lastMs; + long thisIntervalInserts = thisInserts - lastInserts; + double thisIntervalInsertsPerSecond = thisIntervalInserts/(double)thisIntervalMs*1000.0; + double thisInsertsPerSecond = thisInserts/(double)elapsed*1000.0; + if (secondsPerFeedback > 0) + { + logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); + } else { + logMe("%,d inserts : %,d seconds : cum ips=%,.2f : int ips=%,.2f", intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); + } + try { + if (outputHeader) + { + writer.write("tot_inserts\telap_secs\tcum_ips\tint_ips\n"); + outputHeader = false; + } + String statusUpdate = ""; + if (secondsPerFeedback > 0) + { + statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",thisInserts, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); + } else { + statusUpdate = String.format("%d\t%d\t%.2f\t%.2f\n",intervalNumber * insertsPerFeedback, elapsed / 1000l, thisInsertsPerSecond, thisIntervalInsertsPerSecond); + } + writer.write(statusUpdate); + writer.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + } + + + public static void logMe(String format, Object... args) { + System.out.println(Thread.currentThread() + String.format(format, args)); + } +}