From 4c66cde07c0fb7bfc8d859a0d299b6c2aa4b178e Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Thu, 10 Aug 2023 14:08:37 +0200 Subject: [PATCH] further integrate the previous merge * this commit also moved some packages --- .../cc/config/elements/StorageConfig.java | 9 +- .../iguana/cc/controller/MainController.java | 2 +- .../stresstest => }/metrics/Metric.java | 2 +- .../metrics/MetricManager.java | 2 +- .../iguana/cc/metrics/ModelWritingMetric.java | 20 ++ .../aksw/iguana/cc/metrics/QueryMetric.java | 9 + .../aksw/iguana/cc/metrics/TaskMetric.java | 9 + .../aksw/iguana/cc/metrics/WorkerMetric.java | 9 + .../impl/AggregatedExecutionStatistics.java | 92 +++++++ .../stresstest => }/metrics/impl/AvgQPS.java | 24 +- .../metrics/impl/EachExecutionStatistic.java | 49 ++++ .../org/aksw/iguana/cc/metrics/impl/NoQ.java | 38 +++ .../aksw/iguana/cc/metrics/impl/NoQPH.java | 48 ++++ .../stresstest => }/metrics/impl/PAvgQPS.java | 24 +- .../stresstest => }/metrics/impl/PQPS.java | 17 +- .../org/aksw/iguana/cc/metrics/impl/QMPH.java | 50 ++++ .../stresstest => }/metrics/impl/QPS.java | 17 +- .../iguana/cc/query/handler/QueryHandler.java | 8 +- .../cc/query/selector/QuerySelector.java | 2 + .../selector/impl/LinearQuerySelector.java | 14 +- .../stresstest => }/storage/Storage.java | 2 +- .../storage/StorageManager.java | 2 +- .../storage/impl/CSVStorage.java | 24 +- .../storage/impl/RDFFileStorage.java | 20 +- .../storage/impl/TriplestoreStorage.java | 56 ++-- .../java/org/aksw/iguana/cc/suite/Suite.java | 19 +- .../java/org/aksw/iguana/cc/tasks/Task.java | 5 +- .../{stresstest => impl}/Stresstest.java | 71 ++++- .../tasks/impl/StresstestResultProcessor.java | 253 ++++++++++++++++++ .../tasks/stresstest/StresstestMetadata.java | 24 -- .../stresstest/StresstestResultProcessor.java | 230 ---------------- .../metrics/ModelWritingMetric.java | 20 -- .../tasks/stresstest/metrics/QueryMetric.java | 9 - .../tasks/stresstest/metrics/TaskMetric.java | 10 - .../stresstest/metrics/WorkerMetric.java | 10 - .../impl/AggregatedExecutionStatistics.java | 101 ------- .../metrics/impl/EachExecutionStatistic.java | 52 ---- .../cc/tasks/stresstest/metrics/impl/NoQ.java | 43 --- .../tasks/stresstest/metrics/impl/NoQPH.java | 53 ---- .../tasks/stresstest/metrics/impl/QMPH.java | 54 ---- .../storage/TripleBasedStorage.java | 29 -- .../org/aksw/iguana/cc/worker/HttpWorker.java | 83 +++++- .../aksw/iguana/cc/worker/WorkerMetadata.java | 10 - .../cc/worker/impl/SPARQLProtocolWorker.java | 22 +- .../org/aksw/iguana/commons/rdf/IONT.java | 2 +- .../org/aksw/iguana/commons/rdf/IPROP.java | 2 +- .../org/aksw/iguana/commons/rdf/IRES.java | 44 ++- .../aksw/iguana/commons/time/TimeUtils.java | 6 + .../cc/config/elements/StorageConfigTest.java | 2 +- .../aksw/iguana/cc/tasks/MockupStorage.java | 2 +- .../cc/tasks/storage/impl/CSVStorageTest.java | 18 +- .../tasks/storage/impl/NTFileStorageTest.java | 71 ----- .../storage/impl/RDFFileStorageTest.java | 4 +- .../storage/impl/TriplestoreStorageTest.java | 2 +- .../worker/impl/SPARQLProtocolWorkerTest.java | 15 +- 55 files changed, 901 insertions(+), 914 deletions(-) rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/metrics/Metric.java (91%) rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/metrics/MetricManager.java (84%) create mode 100644 src/main/java/org/aksw/iguana/cc/metrics/ModelWritingMetric.java create mode 100644 src/main/java/org/aksw/iguana/cc/metrics/QueryMetric.java create mode 100644 src/main/java/org/aksw/iguana/cc/metrics/TaskMetric.java create mode 100644 src/main/java/org/aksw/iguana/cc/metrics/WorkerMetric.java create mode 100644 src/main/java/org/aksw/iguana/cc/metrics/impl/AggregatedExecutionStatistics.java rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/metrics/impl/AvgQPS.java (55%) create mode 100644 src/main/java/org/aksw/iguana/cc/metrics/impl/EachExecutionStatistic.java create mode 100644 src/main/java/org/aksw/iguana/cc/metrics/impl/NoQ.java create mode 100644 src/main/java/org/aksw/iguana/cc/metrics/impl/NoQPH.java rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/metrics/impl/PAvgQPS.java (59%) rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/metrics/impl/PQPS.java (66%) create mode 100644 src/main/java/org/aksw/iguana/cc/metrics/impl/QMPH.java rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/metrics/impl/QPS.java (61%) rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/storage/Storage.java (86%) rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/storage/StorageManager.java (95%) rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/storage/impl/CSVStorage.java (95%) rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/storage/impl/RDFFileStorage.java (82%) rename src/main/java/org/aksw/iguana/cc/{tasks/stresstest => }/storage/impl/TriplestoreStorage.java (69%) rename src/main/java/org/aksw/iguana/cc/tasks/{stresstest => impl}/Stresstest.java (52%) create mode 100644 src/main/java/org/aksw/iguana/cc/tasks/impl/StresstestResultProcessor.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/StresstestMetadata.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/StresstestResultProcessor.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/ModelWritingMetric.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/QueryMetric.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/TaskMetric.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/WorkerMetric.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/AggregatedExecutionStatistics.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/EachExecutionStatistic.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/NoQ.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/NoQPH.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/QMPH.java delete mode 100644 src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/TripleBasedStorage.java delete mode 100644 src/main/java/org/aksw/iguana/cc/worker/WorkerMetadata.java delete mode 100644 src/test/java/org/aksw/iguana/cc/tasks/storage/impl/NTFileStorageTest.java diff --git a/src/main/java/org/aksw/iguana/cc/config/elements/StorageConfig.java b/src/main/java/org/aksw/iguana/cc/config/elements/StorageConfig.java index 65d716a66..7e72d45ed 100644 --- a/src/main/java/org/aksw/iguana/cc/config/elements/StorageConfig.java +++ b/src/main/java/org/aksw/iguana/cc/config/elements/StorageConfig.java @@ -1,11 +1,9 @@ package org.aksw.iguana.cc.config.elements; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.aksw.iguana.cc.worker.impl.SPARQLProtocolWorker; -import org.aksw.iguana.rp.storage.impl.RDFFileStorage; -import org.aksw.iguana.rp.storage.impl.TriplestoreStorage; +import org.aksw.iguana.cc.storage.impl.RDFFileStorage; +import org.aksw.iguana.cc.storage.impl.TriplestoreStorage; /** * Storage Configuration class @@ -16,7 +14,8 @@ property = "type") @JsonSubTypes({ @JsonSubTypes.Type(value = TriplestoreStorage.Config.class, name = "triplestore"), - @JsonSubTypes.Type(value = RDFFileStorage.Config.class, name = "RDF file"), + @JsonSubTypes.Type(value = RDFFileStorage.Config.class, name = "rdf"), + @JsonSubTypes.Type(value = CSVStorage.Config.class, name = "csv") }) public interface StorageConfig {} diff --git a/src/main/java/org/aksw/iguana/cc/controller/MainController.java b/src/main/java/org/aksw/iguana/cc/controller/MainController.java index 3a955ebc3..bc1b4d953 100644 --- a/src/main/java/org/aksw/iguana/cc/controller/MainController.java +++ b/src/main/java/org/aksw/iguana/cc/controller/MainController.java @@ -63,7 +63,7 @@ public static void main(String[] argc) throws IOException { } // TODO: a bit of error handling Suite parse = IguanaSuiteParser.parse(args.suitePath); - Suite.Result run = parse.run(); + parse.run(); System.exit(0); } diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/Metric.java b/src/main/java/org/aksw/iguana/cc/metrics/Metric.java similarity index 91% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/Metric.java rename to src/main/java/org/aksw/iguana/cc/metrics/Metric.java index 2e6a79403..e821c1623 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/Metric.java +++ b/src/main/java/org/aksw/iguana/cc/metrics/Metric.java @@ -1,4 +1,4 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics; +package org.aksw.iguana.cc.metrics; public abstract class Metric { private final String name; diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/MetricManager.java b/src/main/java/org/aksw/iguana/cc/metrics/MetricManager.java similarity index 84% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/MetricManager.java rename to src/main/java/org/aksw/iguana/cc/metrics/MetricManager.java index 5320774fa..bf0aca1fb 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/MetricManager.java +++ b/src/main/java/org/aksw/iguana/cc/metrics/MetricManager.java @@ -1,4 +1,4 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics; +package org.aksw.iguana.cc.metrics; import java.util.List; diff --git a/src/main/java/org/aksw/iguana/cc/metrics/ModelWritingMetric.java b/src/main/java/org/aksw/iguana/cc/metrics/ModelWritingMetric.java new file mode 100644 index 000000000..f696cf4ff --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/metrics/ModelWritingMetric.java @@ -0,0 +1,20 @@ +package org.aksw.iguana.cc.metrics; + +import org.aksw.iguana.cc.worker.HttpWorker; +import org.aksw.iguana.commons.rdf.IRES; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; + +import javax.annotation.Nonnull; +import java.util.List; +import java.util.Map; + +public interface ModelWritingMetric { + default @Nonnull Model createMetricModel(List workers, List[][] data, IRES.Factory iresFactory) { + return ModelFactory.createDefaultModel(); + } + + default @Nonnull Model createMetricModel(List workers, Map> data, IRES.Factory iresFactory) { + return ModelFactory.createDefaultModel(); + } +} diff --git a/src/main/java/org/aksw/iguana/cc/metrics/QueryMetric.java b/src/main/java/org/aksw/iguana/cc/metrics/QueryMetric.java new file mode 100644 index 000000000..9b771a570 --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/metrics/QueryMetric.java @@ -0,0 +1,9 @@ +package org.aksw.iguana.cc.metrics; + +import org.aksw.iguana.cc.worker.HttpWorker; + +import java.util.List; + +public interface QueryMetric { + Number calculateQueryMetric(List data); +} diff --git a/src/main/java/org/aksw/iguana/cc/metrics/TaskMetric.java b/src/main/java/org/aksw/iguana/cc/metrics/TaskMetric.java new file mode 100644 index 000000000..8b4360306 --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/metrics/TaskMetric.java @@ -0,0 +1,9 @@ +package org.aksw.iguana.cc.metrics; + +import org.aksw.iguana.cc.worker.HttpWorker; + +import java.util.List; + +public interface TaskMetric { + Number calculateTaskMetric(List workers, List[][] data); +} diff --git a/src/main/java/org/aksw/iguana/cc/metrics/WorkerMetric.java b/src/main/java/org/aksw/iguana/cc/metrics/WorkerMetric.java new file mode 100644 index 000000000..1fe5b763f --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/metrics/WorkerMetric.java @@ -0,0 +1,9 @@ +package org.aksw.iguana.cc.metrics; + +import org.aksw.iguana.cc.worker.HttpWorker; + +import java.util.List; + +public interface WorkerMetric { + Number calculateWorkerMetric(HttpWorker.Config worker, List[] data); +} diff --git a/src/main/java/org/aksw/iguana/cc/metrics/impl/AggregatedExecutionStatistics.java b/src/main/java/org/aksw/iguana/cc/metrics/impl/AggregatedExecutionStatistics.java new file mode 100644 index 000000000..c607b48c5 --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/metrics/impl/AggregatedExecutionStatistics.java @@ -0,0 +1,92 @@ +package org.aksw.iguana.cc.metrics.impl; + +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.ModelWritingMetric; +import org.aksw.iguana.cc.worker.HttpWorker; +import org.aksw.iguana.commons.rdf.IONT; +import org.aksw.iguana.commons.rdf.IPROP; +import org.aksw.iguana.commons.rdf.IRES; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.rdf.model.Resource; +import org.apache.jena.rdf.model.ResourceFactory; +import org.apache.jena.vocabulary.RDF; + +import javax.annotation.Nonnull; +import java.math.BigInteger; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.aksw.iguana.commons.time.TimeUtils.toXSDDurationInSeconds; + +public class AggregatedExecutionStatistics extends Metric implements ModelWritingMetric { + + public AggregatedExecutionStatistics() { + super("Aggregated Execution Statistics", "AES", "Sums up the statistics of each query execution for each query a worker and task has. The result size only contains the value of the last execution."); + } + + @Override + @Nonnull + public Model createMetricModel(List workers, List[][] data, IRES.Factory iresFactory) { + Model m = ModelFactory.createDefaultModel(); + for (var worker : workers) { + for (int i = 0; i < worker.config().queries().getQueryCount(); i++) { + Resource queryRes = iresFactory.getWorkerQueryResource(worker, i); + m.add(createAggregatedModel(data[(int) worker.getWorkerID()][i], queryRes)); + } + } + return m; + } + + @Override + @Nonnull + public Model createMetricModel(List workers, Map> data, IRES.Factory iresFactory) { + Model m = ModelFactory.createDefaultModel(); + for (String queryID : data.keySet()) { + Resource queryRes = iresFactory.getTaskQueryResource(queryID); + m.add(createAggregatedModel(data.get(queryID), queryRes)); + } + return m; + } + + private static Model createAggregatedModel(List data, Resource queryRes) { + Model m = ModelFactory.createDefaultModel(); + BigInteger succeeded = BigInteger.ZERO; + BigInteger failed = BigInteger.ZERO; + Optional resultSize = Optional.empty(); + BigInteger wrongCodes = BigInteger.ZERO; + BigInteger timeOuts = BigInteger.ZERO; + BigInteger unknownExceptions = BigInteger.ZERO; + Duration totalTime = Duration.ZERO; + + for (HttpWorker.ExecutionStats exec : data) { + switch (exec.endState()) { + case SUCCESS -> succeeded = succeeded.add(BigInteger.ONE); + case TIMEOUT -> timeOuts = timeOuts.add(BigInteger.ONE); + case HTTP_ERROR -> wrongCodes = wrongCodes.add(BigInteger.ONE); + case MISCELLANEOUS_EXCEPTION -> unknownExceptions = unknownExceptions.add(BigInteger.ONE); + } + + if (!exec.successful()) + failed = failed.add(BigInteger.ONE); + + totalTime = totalTime.plus(exec.duration()); + if (exec.contentLength().isPresent()) + resultSize = Optional.of(BigInteger.valueOf(exec.contentLength().getAsLong())); + } + + m.add(queryRes, IPROP.succeeded, ResourceFactory.createTypedLiteral(succeeded)); + m.add(queryRes, IPROP.failed, ResourceFactory.createTypedLiteral(failed)); + if (resultSize.isPresent()) + m.add(queryRes, IPROP.resultSize, ResourceFactory.createTypedLiteral(resultSize.get())); + m.add(queryRes, IPROP.timeOuts, ResourceFactory.createTypedLiteral(timeOuts)); + m.add(queryRes, IPROP.wrongCodes, ResourceFactory.createTypedLiteral(wrongCodes)); + m.add(queryRes, IPROP.unknownException, ResourceFactory.createTypedLiteral(unknownExceptions)); + m.add(queryRes, IPROP.totalTime, ResourceFactory.createTypedLiteral(toXSDDurationInSeconds(totalTime))); + m.add(queryRes, RDF.type, IONT.executedQuery); + + return m; + } +} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/AvgQPS.java b/src/main/java/org/aksw/iguana/cc/metrics/impl/AvgQPS.java similarity index 55% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/AvgQPS.java rename to src/main/java/org/aksw/iguana/cc/metrics/impl/AvgQPS.java index 5d56e7f9e..ceb325787 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/AvgQPS.java +++ b/src/main/java/org/aksw/iguana/cc/metrics/impl/AvgQPS.java @@ -1,18 +1,14 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics.impl; +package org.aksw.iguana.cc.metrics.impl; -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.StresstestMetadata; -import org.aksw.iguana.cc.worker.WorkerMetadata; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.TaskMetric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.WorkerMetric; -import org.aksw.iguana.commons.annotation.Shorthand; +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.TaskMetric; +import org.aksw.iguana.cc.metrics.WorkerMetric; +import org.aksw.iguana.cc.worker.HttpWorker; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.List; -@Shorthand("AvgQPS") public class AvgQPS extends Metric implements TaskMetric, WorkerMetric { public AvgQPS() { @@ -20,10 +16,10 @@ public AvgQPS() { } @Override - public Number calculateTaskMetric(StresstestMetadata task, List[][] data) { + public Number calculateTaskMetric(List workers, List[][] data) { BigDecimal sum = BigDecimal.ZERO; - for (WorkerMetadata worker : task.workers()) { - sum = sum.add((BigDecimal) this.calculateWorkerMetric(worker, data[worker.workerID()])); + for (var worker : workers) { + sum = sum.add((BigDecimal) this.calculateWorkerMetric(worker.config(), data[(int) worker.getWorkerID()])); } try { @@ -34,10 +30,10 @@ public Number calculateTaskMetric(StresstestMetadata task, List[] data) { + public Number calculateWorkerMetric(HttpWorker.Config worker, List[] data) { BigDecimal sum = BigDecimal.ZERO; QPS qpsmetric = new QPS(); - for (List datum : data) { + for (List datum : data) { sum = sum.add((BigDecimal) qpsmetric.calculateQueryMetric(datum)); } diff --git a/src/main/java/org/aksw/iguana/cc/metrics/impl/EachExecutionStatistic.java b/src/main/java/org/aksw/iguana/cc/metrics/impl/EachExecutionStatistic.java new file mode 100644 index 000000000..835372176 --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/metrics/impl/EachExecutionStatistic.java @@ -0,0 +1,49 @@ +package org.aksw.iguana.cc.metrics.impl; + +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.ModelWritingMetric; +import org.aksw.iguana.cc.worker.HttpWorker; +import org.aksw.iguana.commons.rdf.IPROP; +import org.aksw.iguana.commons.rdf.IRES; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.rdf.model.Resource; +import org.apache.jena.rdf.model.ResourceFactory; + +import javax.annotation.Nonnull; +import java.math.BigInteger; +import java.util.List; + +public class EachExecutionStatistic extends Metric implements ModelWritingMetric { + + public EachExecutionStatistic() { + super("Each Query Execution Statistic", "EachQuery", "This metric saves the statistics of each query execution."); + } + + @Override + @Nonnull + public Model createMetricModel(List workers, List[][] data, IRES.Factory iresFactory) { + Model m = ModelFactory.createDefaultModel(); + for (var worker : workers) { + for (int i = 0; i < worker.config().queries().getQueryCount(); i++) { + Resource workerQueryResource = iresFactory.getWorkerQueryResource(worker, i); + Resource queryRes = IRES.getResource(worker.config().queries().getQueryId(i)); + BigInteger run = BigInteger.ONE; + for (HttpWorker.ExecutionStats exec : data[(int) worker.getWorkerID()][i]) { + Resource runRes = iresFactory.getWorkerQueryRunResource(worker, i, run); + m.add(workerQueryResource, IPROP.queryExecution, runRes); + m.add(runRes, IPROP.time, ResourceFactory.createTypedLiteral(exec.duration())); + m.add(runRes, IPROP.success, ResourceFactory.createTypedLiteral(exec.successful())); + m.add(runRes, IPROP.run, ResourceFactory.createTypedLiteral(run)); + m.add(runRes, IPROP.code, ResourceFactory.createTypedLiteral(exec.endState().value)); + // TODO: maybe add http status code + if (exec.contentLength().isPresent()) + m.add(runRes, IPROP.resultSize, ResourceFactory.createTypedLiteral(exec.contentLength().getAsLong())); + m.add(runRes, IPROP.queryID, queryRes); + run = run.add(BigInteger.ONE); + } + } + } + return m; + } +} diff --git a/src/main/java/org/aksw/iguana/cc/metrics/impl/NoQ.java b/src/main/java/org/aksw/iguana/cc/metrics/impl/NoQ.java new file mode 100644 index 000000000..b29e061ac --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/metrics/impl/NoQ.java @@ -0,0 +1,38 @@ +package org.aksw.iguana.cc.metrics.impl; + +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.TaskMetric; +import org.aksw.iguana.cc.metrics.WorkerMetric; +import org.aksw.iguana.cc.worker.HttpWorker; + +import java.math.BigInteger; +import java.util.List; + +public class NoQ extends Metric implements TaskMetric, WorkerMetric { + + public NoQ() { + super("Number of Queries", "NoQ", "This metric calculates the number of successfully executed queries."); + } + + @Override + public Number calculateTaskMetric(List workers, List[][] data) { + BigInteger sum = BigInteger.ZERO; + for (var worker : workers) { + sum = sum.add((BigInteger) this.calculateWorkerMetric(worker.config(), data[(int) worker.getWorkerID()])); + } + return sum; + } + + @Override + public Number calculateWorkerMetric(HttpWorker.Config worker, List[] data) { + BigInteger sum = BigInteger.ZERO; + for (List datum : data) { + for (HttpWorker.ExecutionStats exec : datum) { + if (exec.successful()) { + sum = sum.add(BigInteger.ONE); + } + } + } + return sum; + } +} diff --git a/src/main/java/org/aksw/iguana/cc/metrics/impl/NoQPH.java b/src/main/java/org/aksw/iguana/cc/metrics/impl/NoQPH.java new file mode 100644 index 000000000..57b532e86 --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/metrics/impl/NoQPH.java @@ -0,0 +1,48 @@ +package org.aksw.iguana.cc.metrics.impl; + +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.TaskMetric; +import org.aksw.iguana.cc.metrics.WorkerMetric; +import org.aksw.iguana.cc.worker.HttpWorker; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.RoundingMode; +import java.time.Duration; +import java.util.List; + +public class NoQPH extends Metric implements TaskMetric, WorkerMetric { + + public NoQPH() { + super("Number of Queries per Hour", "NoQPH", "This metric calculates the number of successfully executed queries per hour."); + } + @Override + public Number calculateTaskMetric(List workers, List[][] data) { + BigDecimal sum = BigDecimal.ZERO; + for (var worker : workers) { + sum = sum.add((BigDecimal) this.calculateWorkerMetric(worker.config(), data[(int) worker.getWorkerID()])); + } + return sum; + } + + @Override + public Number calculateWorkerMetric(HttpWorker.Config worker, List[] data) { + BigDecimal successes = BigDecimal.ZERO; + Duration totalTime = Duration.ZERO; + for (List datum : data) { + for (HttpWorker.ExecutionStats exec : datum) { + if (exec.successful()) { + successes = successes.add(BigDecimal.ONE); + totalTime = totalTime.plus(exec.duration()); + } + } + } + BigDecimal tt = (new BigDecimal(BigInteger.valueOf(totalTime.toNanos()), 9)).divide(BigDecimal.valueOf(3600), 20, RoundingMode.HALF_UP); + + try { + return successes.divide(tt, 10, RoundingMode.HALF_UP); + } catch (ArithmeticException e) { + return BigDecimal.ZERO; + } + } +} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/PAvgQPS.java b/src/main/java/org/aksw/iguana/cc/metrics/impl/PAvgQPS.java similarity index 59% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/PAvgQPS.java rename to src/main/java/org/aksw/iguana/cc/metrics/impl/PAvgQPS.java index f71d42ae3..3c3901f0a 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/PAvgQPS.java +++ b/src/main/java/org/aksw/iguana/cc/metrics/impl/PAvgQPS.java @@ -1,18 +1,14 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics.impl; +package org.aksw.iguana.cc.metrics.impl; -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.StresstestMetadata; -import org.aksw.iguana.cc.worker.WorkerMetadata; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.TaskMetric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.WorkerMetric; -import org.aksw.iguana.commons.annotation.Shorthand; +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.TaskMetric; +import org.aksw.iguana.cc.metrics.WorkerMetric; +import org.aksw.iguana.cc.worker.HttpWorker; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.List; -@Shorthand("PAvgQPS") public class PAvgQPS extends Metric implements TaskMetric, WorkerMetric { private final int penalty; @@ -23,10 +19,10 @@ public PAvgQPS(Integer penalty) { } @Override - public Number calculateTaskMetric(StresstestMetadata task, List[][] data) { + public Number calculateTaskMetric(List workers, List[][] data) { BigDecimal sum = BigDecimal.ZERO; - for (WorkerMetadata worker : task.workers()) { - sum = sum.add((BigDecimal) this.calculateWorkerMetric(worker, data[worker.workerID()])); + for (var worker : workers) { + sum = sum.add((BigDecimal) this.calculateWorkerMetric(worker.config(), data[(int) worker.getWorkerID()])); } try { @@ -37,10 +33,10 @@ public Number calculateTaskMetric(StresstestMetadata task, List[] data) { + public Number calculateWorkerMetric(HttpWorker.Config worker, List[] data) { BigDecimal sum = BigDecimal.ZERO; PQPS pqpsmetric = new PQPS(penalty); - for (List datum : data) { + for (List datum : data) { sum = sum.add((BigDecimal) pqpsmetric.calculateQueryMetric(datum)); } if (data.length == 0) { diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/PQPS.java b/src/main/java/org/aksw/iguana/cc/metrics/impl/PQPS.java similarity index 66% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/PQPS.java rename to src/main/java/org/aksw/iguana/cc/metrics/impl/PQPS.java index bbefdc12b..e2993ef95 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/PQPS.java +++ b/src/main/java/org/aksw/iguana/cc/metrics/impl/PQPS.java @@ -1,10 +1,9 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics.impl; +package org.aksw.iguana.cc.metrics.impl; -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.QueryMetric; +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.QueryMetric; +import org.aksw.iguana.cc.worker.HttpWorker; import org.aksw.iguana.commons.annotation.Shorthand; -import org.aksw.iguana.commons.constants.COMMON; import java.math.BigDecimal; import java.math.BigInteger; @@ -23,13 +22,13 @@ public PQPS(Integer penalty) { } @Override - public Number calculateQueryMetric(List data) { + public Number calculateQueryMetric(List data) { BigDecimal successes = BigDecimal.ZERO; Duration totalTime = Duration.ZERO; - for (QueryExecutionStats exec : data) { + for (HttpWorker.ExecutionStats exec : data) { successes = successes.add(BigDecimal.ONE); - if (exec.responseCode() == COMMON.QUERY_SUCCESS) { - totalTime = totalTime.plusNanos((long) exec.executionTime() * 1000000); + if (exec.successful()) { + totalTime = totalTime.plus(exec.duration()); } else { totalTime = totalTime.plusMillis(penalty); } diff --git a/src/main/java/org/aksw/iguana/cc/metrics/impl/QMPH.java b/src/main/java/org/aksw/iguana/cc/metrics/impl/QMPH.java new file mode 100644 index 000000000..95740b553 --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/metrics/impl/QMPH.java @@ -0,0 +1,50 @@ +package org.aksw.iguana.cc.metrics.impl; + +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.TaskMetric; +import org.aksw.iguana.cc.metrics.WorkerMetric; +import org.aksw.iguana.cc.worker.HttpWorker; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.RoundingMode; +import java.time.Duration; +import java.util.List; + +public class QMPH extends Metric implements TaskMetric, WorkerMetric { + + public QMPH() { + super("Query Mixes per Hour", "QMPH", "This metric calculates the amount of query mixes (a given set of queries) that are executed per hour."); + } + + @Override + public Number calculateTaskMetric(List workers, List[][] data) { + BigDecimal sum = BigDecimal.ZERO; + for (var worker : workers) { + sum = sum.add((BigDecimal) this.calculateWorkerMetric(worker.config(), data[(int) worker.getWorkerID()])); + } + return sum; + } + + @Override + public Number calculateWorkerMetric(HttpWorker.Config worker, List[] data) { + BigDecimal successes = BigDecimal.ZERO; + BigDecimal noq = BigDecimal.valueOf(worker.queries().getQueryCount()); + Duration totalTime = Duration.ZERO; + for (List datum : data) { + for (HttpWorker.ExecutionStats exec : datum) { + if (exec.successful()) { + successes = successes.add(BigDecimal.ONE); + totalTime = totalTime.plus(exec.duration()); + } + } + } + BigDecimal tt = (new BigDecimal(BigInteger.valueOf(totalTime.toNanos()), 9)).divide(BigDecimal.valueOf(3600), 20, RoundingMode.HALF_UP); + + try { + return successes.divide(tt, 10, RoundingMode.HALF_UP).divide(noq, 10, RoundingMode.HALF_UP); + } catch (ArithmeticException e) { + return BigDecimal.ZERO; + } + } +} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/QPS.java b/src/main/java/org/aksw/iguana/cc/metrics/impl/QPS.java similarity index 61% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/QPS.java rename to src/main/java/org/aksw/iguana/cc/metrics/impl/QPS.java index 888c7bb49..1a26dbe2d 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/QPS.java +++ b/src/main/java/org/aksw/iguana/cc/metrics/impl/QPS.java @@ -1,10 +1,9 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics.impl; +package org.aksw.iguana.cc.metrics.impl; -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.QueryMetric; +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.QueryMetric; +import org.aksw.iguana.cc.worker.HttpWorker; import org.aksw.iguana.commons.annotation.Shorthand; -import org.aksw.iguana.commons.constants.COMMON; import java.math.BigDecimal; import java.math.BigInteger; @@ -20,13 +19,13 @@ public QPS() { } @Override - public Number calculateQueryMetric(List data) { + public Number calculateQueryMetric(List data) { BigDecimal successes = BigDecimal.ZERO; Duration totalTime = Duration.ZERO; - for (QueryExecutionStats exec : data) { - if (exec.responseCode() == COMMON.QUERY_SUCCESS) { + for (HttpWorker.ExecutionStats exec : data) { + if (exec.successful()) { successes = successes.add(BigDecimal.ONE); - totalTime = totalTime.plusNanos((long) exec.executionTime() * 1000000); + totalTime = totalTime.plus(exec.duration()); } } BigDecimal tt = (new BigDecimal(BigInteger.valueOf(totalTime.toNanos()), 9)); diff --git a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java index 062f426bf..fb94fe843 100644 --- a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java +++ b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java @@ -154,10 +154,16 @@ public int getQueryCount() { return this.queryList.size(); } - private String getQueryId(int i) { + public String getQueryId(int i) { return this.queryList.hashCode() + ":" + i; } + /** + * Returns every query id in the format: queryListHash:index
+ * The index of a query inside the returned array is the same as the index inside the string. + * + * @return String[] of query ids + */ public String[] getAllQueryIds() { String[] out = new String[queryList.size()]; for (int i = 0; i < queryList.size(); i++) { diff --git a/src/main/java/org/aksw/iguana/cc/query/selector/QuerySelector.java b/src/main/java/org/aksw/iguana/cc/query/selector/QuerySelector.java index 4fa7d878f..2c7381932 100644 --- a/src/main/java/org/aksw/iguana/cc/query/selector/QuerySelector.java +++ b/src/main/java/org/aksw/iguana/cc/query/selector/QuerySelector.java @@ -12,6 +12,8 @@ */ public abstract class QuerySelector { + protected ThreadLocal threadLocalIndex = ThreadLocal.withInitial(() -> 0); // TODO: moving the queryselector to the workers is probably better + protected final int size; public QuerySelector(int size) { diff --git a/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java b/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java index 5fe5c4294..65ab88b61 100644 --- a/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java +++ b/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java @@ -12,21 +12,19 @@ * * @author frensing */ -public class LinearQuerySelector extends QuerySelector { - - - protected int nextIndex; +public class LinearQuerySelector extends QuerySelector { // TODO: check if worker should have a query selector or if it should be in the query handler public LinearQuerySelector(int size) { super(size); - nextIndex = 0; } @Override public int getNextIndex() { - if (this.nextIndex >= this.size) { - this.nextIndex = 0; + int index = threadLocalIndex.get(); + if (index >= this.size) { + this.threadLocalIndex.set(0); } - return this.nextIndex++; + this.threadLocalIndex.set(index + 1); + return index; } } diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/Storage.java b/src/main/java/org/aksw/iguana/cc/storage/Storage.java similarity index 86% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/Storage.java rename to src/main/java/org/aksw/iguana/cc/storage/Storage.java index d1b045651..7afecde54 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/Storage.java +++ b/src/main/java/org/aksw/iguana/cc/storage/Storage.java @@ -1,4 +1,4 @@ -package org.aksw.iguana.cc.tasks.stresstest.storage; +package org.aksw.iguana.cc.storage; import org.apache.jena.rdf.model.Model; diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/StorageManager.java b/src/main/java/org/aksw/iguana/cc/storage/StorageManager.java similarity index 95% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/StorageManager.java rename to src/main/java/org/aksw/iguana/cc/storage/StorageManager.java index 5de7fd4e0..8972059de 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/StorageManager.java +++ b/src/main/java/org/aksw/iguana/cc/storage/StorageManager.java @@ -1,4 +1,4 @@ -package org.aksw.iguana.cc.tasks.stresstest.storage; +package org.aksw.iguana.cc.storage; import org.apache.jena.rdf.model.Model; diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/impl/CSVStorage.java b/src/main/java/org/aksw/iguana/cc/storage/impl/CSVStorage.java similarity index 95% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/impl/CSVStorage.java rename to src/main/java/org/aksw/iguana/cc/storage/impl/CSVStorage.java index f382c8e45..eed449b8b 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/impl/CSVStorage.java +++ b/src/main/java/org/aksw/iguana/cc/storage/impl/CSVStorage.java @@ -1,14 +1,14 @@ -package org.aksw.iguana.cc.tasks.stresstest.storage.impl; +package org.aksw.iguana.cc.storage.impl; +import com.fasterxml.jackson.annotation.JsonProperty; import com.opencsv.CSVReader; import com.opencsv.CSVWriter; import com.opencsv.CSVWriterBuilder; import com.opencsv.exceptions.CsvValidationException; -import org.aksw.iguana.cc.config.IguanaConfig; -import org.aksw.iguana.cc.tasks.stresstest.metrics.*; -import org.aksw.iguana.cc.tasks.stresstest.metrics.impl.AggregatedExecutionStatistics; -import org.aksw.iguana.cc.tasks.stresstest.storage.Storage; -import org.aksw.iguana.commons.annotation.Shorthand; +import org.aksw.iguana.cc.config.elements.StorageConfig; +import org.aksw.iguana.cc.metrics.*; +import org.aksw.iguana.cc.metrics.impl.AggregatedExecutionStatistics; +import org.aksw.iguana.cc.storage.Storage; import org.aksw.iguana.commons.rdf.IONT; import org.aksw.iguana.commons.rdf.IPROP; import org.apache.jena.arq.querybuilder.SelectBuilder; @@ -31,9 +31,12 @@ import java.util.NoSuchElementException; import java.util.function.Predicate; -@Shorthand("CSVStorage") public class CSVStorage implements Storage { + public record Config( + @JsonProperty String path + ) implements StorageConfig {} + private static final Logger LOGGER = LoggerFactory.getLogger(CSVStorage.class); private final Path folder; @@ -45,6 +48,10 @@ public class CSVStorage implements Storage { private String connectionVersion; private String dataset; + public CSVStorage(Config config) { + this(config.path()); + } + public CSVStorage(String folderPath) { Path parentFolder; try { @@ -56,7 +63,8 @@ public CSVStorage(String folderPath) { return; } - this.folder = parentFolder.resolve(IguanaConfig.getSuiteID()); + // TODO: add the id suite back + this.folder = parentFolder.resolve("suite"); this.taskFile = this.folder.resolve("tasks-overview.csv"); if (Files.notExists(parentFolder)) { diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/impl/RDFFileStorage.java b/src/main/java/org/aksw/iguana/cc/storage/impl/RDFFileStorage.java similarity index 82% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/impl/RDFFileStorage.java rename to src/main/java/org/aksw/iguana/cc/storage/impl/RDFFileStorage.java index 6f30d3bcf..d8ee8484b 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/impl/RDFFileStorage.java +++ b/src/main/java/org/aksw/iguana/cc/storage/impl/RDFFileStorage.java @@ -1,10 +1,8 @@ -package org.aksw.iguana.cc.tasks.stresstest.storage.impl; +package org.aksw.iguana.cc.storage.impl; import com.github.jsonldjava.shaded.com.google.common.base.Supplier; import org.aksw.iguana.cc.config.elements.StorageConfig; -import org.aksw.iguana.rp.storage.TripleBasedStorage; -import org.aksw.iguana.cc.tasks.stresstest.storage.TripleBasedStorage; -import org.aksw.iguana.commons.annotation.Shorthand; +import org.aksw.iguana.cc.storage.Storage; import org.apache.jena.rdf.model.Model; import org.apache.jena.riot.Lang; import org.apache.jena.riot.RDFDataMgr; @@ -19,14 +17,12 @@ import java.nio.file.Paths; import java.util.Calendar; -public class RDFFileStorage extends TripleBasedStorage { - public record Config(String path) implements StorageConfig { - } +public class RDFFileStorage implements Storage { + public record Config(String path) implements StorageConfig {} private static final Logger LOGGER = LoggerFactory.getLogger(RDFFileStorage.class.getName()); - protected static Supplier defaultFileNameSupplier = () -> - { + protected static Supplier defaultFileNameSupplier = () -> { var now = Calendar.getInstance(); return String.format("%d-%02d-%02d_%02d-%02d.%03d", now.get(Calendar.YEAR), @@ -70,10 +66,8 @@ public RDFFileStorage(String fileName) { @Override public void storeResult(Model data){ - super.storeResult(data); - try (OutputStream os = new FileOutputStream(file.toString(), true)) { - RDFDataMgr.write(os, metricResults, this.lang); - metricResults.removeAll(); + try (OutputStream os = new FileOutputStream(path.toString(), true)) { + RDFDataMgr.write(os, data, this.lang); } catch (IOException e) { LOGGER.error("Could not commit to RDFFileStorage using lang: " + lang, e); } diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/impl/TriplestoreStorage.java b/src/main/java/org/aksw/iguana/cc/storage/impl/TriplestoreStorage.java similarity index 69% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/impl/TriplestoreStorage.java rename to src/main/java/org/aksw/iguana/cc/storage/impl/TriplestoreStorage.java index 5131a68ce..994c24af2 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/impl/TriplestoreStorage.java +++ b/src/main/java/org/aksw/iguana/cc/storage/impl/TriplestoreStorage.java @@ -1,8 +1,8 @@ -package org.aksw.iguana.cc.tasks.stresstest.storage.impl; +package org.aksw.iguana.cc.storage.impl; import com.fasterxml.jackson.annotation.JsonProperty; import org.aksw.iguana.cc.config.elements.StorageConfig; -import org.aksw.iguana.rp.storage.TripleBasedStorage; +import org.aksw.iguana.cc.storage.Storage; import org.apache.http.auth.AuthScope; import org.apache.http.auth.Credentials; import org.apache.http.auth.UsernamePasswordCredentials; @@ -27,61 +27,47 @@ * @author f.conrads * */ -public class TriplestoreStorage extends TripleBasedStorage { +public class TriplestoreStorage implements Storage { - public record Config(@JsonProperty(required = true) String endpoint, - String user, - String password, - String baseUri) implements StorageConfig { - } + public record Config( + @JsonProperty(required = true) String endpoint, + String user, + String password, + String baseUri + ) implements StorageConfig {} private UpdateRequest blockRequest = UpdateFactory.create(); private final String endpoint; private final String user; private final String password; + private final String baseUri; public TriplestoreStorage(Config config) { endpoint = config.endpoint(); user = config.user(); password = config.password(); - if (baseUri != null && !baseUri.isEmpty()) { - baseUri = config.baseUri(); - } + baseUri = config.baseUri(); } public TriplestoreStorage(String endpoint, String user, String pwd, String baseUri) { this.endpoint = endpoint; - this.user=user; - this.password =pwd; - if(baseUri!=null && !baseUri.isEmpty()) { - this.baseUri=baseUri; - } - } - - public TriplestoreStorage(String endpoint, String baseUri) { - this.endpoint = endpoint; - if(baseUri!=null && !baseUri.isEmpty()){ - this.baseUri=baseUri; - } - user = null; - password = null; + this.user = user; + this.password = pwd; + this.baseUri = baseUri; } public TriplestoreStorage(String endpoint) { this.endpoint = endpoint; - user = null; - password = null; + this.user = null; + this.password = null; + this.baseUri = null; } @Override public void storeResult(Model data) { - super.storeResult(data); - if (metricResults.size() == 0) - return; - StringWriter results = new StringWriter(); - RDFDataMgr.write(results, metricResults, Lang.NT); + RDFDataMgr.write(results, data, Lang.NT); String update = "INSERT DATA {" + results.toString() + "}"; //Create Update Request from block blockRequest.add(update); @@ -93,11 +79,9 @@ public void storeResult(Model data) { blockRequest = new UpdateRequest(); } - - - private HttpClient createHttpClient(){ + private HttpClient createHttpClient() { CredentialsProvider credsProvider = new BasicCredentialsProvider(); - if(user !=null && password !=null){ + if(user != null && password != null){ Credentials credentials = new UsernamePasswordCredentials(user, password); credsProvider.setCredentials(AuthScope.ANY, credentials); } diff --git a/src/main/java/org/aksw/iguana/cc/suite/Suite.java b/src/main/java/org/aksw/iguana/cc/suite/Suite.java index af1a9698d..ccbd4aab5 100644 --- a/src/main/java/org/aksw/iguana/cc/suite/Suite.java +++ b/src/main/java/org/aksw/iguana/cc/suite/Suite.java @@ -24,37 +24,30 @@ public record Config( List storages) { } - public record Result(List stresstest) { - - } private final long suiteId; private final Config config; private final ResponseBodyProcessorInstances responseBodyProcessorInstances; - private final List stresstests = new ArrayList<>(); + private final List tasks = new ArrayList<>(); Suite(long suiteId, Config config) { - this.suiteId = suiteId; this.config = config; - long stresstestId = 0; + long taskID = 0; responseBodyProcessorInstances = new ResponseBodyProcessorInstances(); - for (Task.Config task : config.tasks()) { if (task instanceof Stresstest.Config) { - stresstests.add(new Stresstest(stresstestId, (Stresstest.Config) task, responseBodyProcessorInstances)); + tasks.add(new Stresstest(taskID++, (Stresstest.Config) task, responseBodyProcessorInstances, this.config.storages)); // TODO: look for a better way to add the storages } } } - public Suite.Result run() { - List stresstestResults = new ArrayList<>(); - for (Stresstest stresstest : stresstests) { - stresstestResults.add(stresstest.run()); + public void run() { + for (Task task : tasks) { + task.run(); } - return new Result(stresstestResults); } } diff --git a/src/main/java/org/aksw/iguana/cc/tasks/Task.java b/src/main/java/org/aksw/iguana/cc/tasks/Task.java index 157906590..dbc064106 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/Task.java +++ b/src/main/java/org/aksw/iguana/cc/tasks/Task.java @@ -11,6 +11,7 @@ public interface Task { @JsonSubTypes({ @JsonSubTypes.Type(value = Stresstest.Config.class, name = "stresstest"), }) - interface Config { - } + interface Config {} + + void run(); } diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/Stresstest.java b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java similarity index 52% rename from src/main/java/org/aksw/iguana/cc/tasks/stresstest/Stresstest.java rename to src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java index fcbe89c0e..021d553eb 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/Stresstest.java +++ b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java @@ -1,6 +1,15 @@ package org.aksw.iguana.cc.tasks.impl; import com.fasterxml.jackson.annotation.JsonProperty; +import org.aksw.iguana.cc.config.elements.StorageConfig; +import org.aksw.iguana.cc.metrics.MetricManager; +import org.aksw.iguana.cc.metrics.impl.AggregatedExecutionStatistics; +import org.aksw.iguana.cc.metrics.impl.AvgQPS; +import org.aksw.iguana.cc.metrics.impl.NoQPH; +import org.aksw.iguana.cc.metrics.impl.QPS; +import org.aksw.iguana.cc.storage.Storage; +import org.aksw.iguana.cc.storage.impl.CSVStorage; +import org.aksw.iguana.cc.storage.impl.RDFFileStorage; import org.aksw.iguana.cc.tasks.Task; import org.aksw.iguana.cc.worker.HttpWorker; import org.aksw.iguana.cc.worker.ResponseBodyProcessorInstances; @@ -19,8 +28,8 @@ public class Stresstest implements Task { public record Config( - List warmupWorkers, - @JsonProperty(required = true) List workers + List warmupWorkers, + @JsonProperty(required = true) List workers ) implements Task.Config {} public record PhaseExecutionConfig( @@ -29,10 +38,10 @@ public record PhaseExecutionConfig( ) {} public record Result( - long stresstestId, - List warmup, - List main - ) implements Task.Config {} + List workerResults, + Calendar startTime, + Calendar endTime + ) {} private final long stresstestId; @@ -42,12 +51,23 @@ public record Result( private static final Logger LOGGER = LoggerFactory.getLogger(Stresstest.class); private final List warmupWorkers = new ArrayList<>(); - protected List workers = new LinkedList<>(); + private List workers = new ArrayList<>(); + public List storages = new ArrayList<>(); - public Stresstest(long stresstestId, Config config, ResponseBodyProcessorInstances responseBodyProcessorInstances) { + + public Stresstest(long stresstestId, Config config, ResponseBodyProcessorInstances responseBodyProcessorInstances, List storages) { this.stresstestId = stresstestId; this.config = config; + for (var storageConfig : storages) { + if (storageConfig instanceof CSVStorage.Config) { + this.storages.add(new CSVStorage((CSVStorage.Config) storageConfig)); + } + else if (storageConfig instanceof RDFFileStorage.Config) { + this.storages.add(new RDFFileStorage((RDFFileStorage.Config) storageConfig)); + } + } + long workerId = 0; if (config.warmupWorkers() != null) { for (HttpWorker.Config workerConfig : config.warmupWorkers()) { @@ -66,12 +86,37 @@ public Stresstest(long stresstestId, Config config, ResponseBodyProcessorInstanc } } - public Result run() { + public void run() { try { var warmupResults = executeWorkers(warmupWorkers); var results = executeWorkers(workers); - return new Result(stresstestId, warmupResults, results); + LOGGER.info("Stresstest {} finished", stresstestId); + + Set queryIDs = new HashSet<>(); + for (HttpWorker.Config wConfig : this.config.workers) { + if (wConfig instanceof SPARQLProtocolWorker.Config) { + queryIDs.addAll(List.of((wConfig).queries().getAllQueryIds())); + } + } + + // TODO: language processor + + // TODO: maybe add this to the configd + MetricManager.setMetrics(List.of(new QPS(), new AvgQPS(), new NoQPH(), new AggregatedExecutionStatistics())); + + // TODO: suiteID + StresstestResultProcessor srp = new StresstestResultProcessor( + 0L, + this.stresstestId, + this.workers, + new ArrayList<>(queryIDs), + this.storages + ); + + srp.process(results.workerResults); + srp.calculateAndSaveMetrics(results.startTime, results.endTime, null); + } catch (InterruptedException e) { throw new RuntimeException(e); @@ -80,12 +125,14 @@ public Result run() { } } - private List executeWorkers(List workers) throws InterruptedException, ExecutionException { + private Result executeWorkers(List workers) throws InterruptedException, ExecutionException { List results = new ArrayList<>(workers.size()); + Calendar startTime = Calendar.getInstance(); var futures = workers.stream().map(HttpWorker::start).toList(); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join(); + Calendar endTime = Calendar.getInstance(); for (CompletableFuture future : futures) results.add(future.get()); - return results; + return new Result(results, startTime, endTime); } } diff --git a/src/main/java/org/aksw/iguana/cc/tasks/impl/StresstestResultProcessor.java b/src/main/java/org/aksw/iguana/cc/tasks/impl/StresstestResultProcessor.java new file mode 100644 index 000000000..cee851e3a --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/tasks/impl/StresstestResultProcessor.java @@ -0,0 +1,253 @@ +package org.aksw.iguana.cc.tasks.impl; + +import org.aksw.iguana.cc.metrics.*; +import org.aksw.iguana.cc.storage.Storage; +import org.aksw.iguana.cc.worker.HttpWorker; +import org.aksw.iguana.commons.rdf.IGUANA_BASE; +import org.aksw.iguana.commons.rdf.IONT; +import org.aksw.iguana.commons.rdf.IPROP; +import org.aksw.iguana.commons.rdf.IRES; +import org.aksw.iguana.commons.time.TimeUtils; +import org.apache.jena.rdf.model.*; +import org.apache.jena.vocabulary.RDF; +import org.apache.jena.vocabulary.RDFS; + +import java.util.*; + +public class StresstestResultProcessor { + + private final List metrics; + + private final List workers; + private final List queryIDs; + private final List storages; + + /** + * This array contains each query execution, grouped by each worker and each query. + */ + private List[][] workerQueryExecutions; + + /** + * This map contains each query execution, grouped by each query of the task. + */ + private Map> taskQueryExecutions; + + + private final IRES.Factory iresFactory; + + + public StresstestResultProcessor(long suiteID, + long taskID, + List worker, + List queryIDs, + List storages) { + this.workers = worker; + this.queryIDs = queryIDs; + this.storages = storages; + + this.metrics = MetricManager.getMetrics(); // TODO: change this + + this.workerQueryExecutions = new List[workers.size()][]; + for (int i = 0; i < workers.size(); i++) { + this.workerQueryExecutions[i] = new List[workers.get(i).config().queries().getQueryCount()]; + for (int j = 0; j < workers.get(i).config().queries().getQueryCount(); j++) { + this.workerQueryExecutions[i][j] = new ArrayList<>(); + } + } + + this.taskQueryExecutions = new HashMap<>(); + for (String queryID : queryIDs) { + this.taskQueryExecutions.put(queryID, new ArrayList<>()); + } + + this.iresFactory = new IRES.Factory(suiteID, taskID); + } + + /** + * This method stores the given query executions statistics from a worker to their appropriate data location. + * + * @param data the query execution statistics that should be stored + */ + public void process(Collection data) { + for (HttpWorker.Result result : data) { + for (var stat : result.executionStats()) { + workerQueryExecutions[(int) result.workerID()][stat.queryID()].add(stat); + String queryID = workers.get((int) result.workerID()).config().queries().getQueryId(stat.queryID()); + taskQueryExecutions.get(queryID).add(stat); + } + } + } + + /** + * This method calculates the metrics and creates the RDF model of the result, which will be sent to the storages. + * It uses the given data that was passed with the 'processQueryExecutions' method. + * + * @param start the start date of the task + * @param end the end date of the task + */ + public void calculateAndSaveMetrics(Calendar start, Calendar end, Model triplestats) { + Model m = ModelFactory.createDefaultModel(); + Resource suiteRes = iresFactory.getSuiteResource(); + Resource taskRes = iresFactory.getTaskResource(); + + m.add(suiteRes, RDF.type, IONT.suite); + m.add(suiteRes, IPROP.task, taskRes); + m.add(taskRes, RDF.type, IONT.task); + m.add(taskRes, RDF.type, IONT.stresstest); + m.add(taskRes, IPROP.noOfWorkers, ResourceFactory.createTypedLiteral(workers.size())); + + for (HttpWorker worker : workers) { + HttpWorker.Config config = worker.config(); + + Resource workerRes = iresFactory.getWorkerResource(worker); + Resource connectionRes = IRES.getResource(config.connection().name()); + Resource datasetRes = IRES.getResource(config.connection().dataset().name()); // TODO: check if each connection only has one dataset + + m.add(taskRes, IPROP.workerResult, workerRes); + m.add(workerRes, RDF.type, IONT.worker); + m.add(workerRes, IPROP.workerID, ResourceFactory.createTypedLiteral(worker.getWorkerID())); + m.add(workerRes, IPROP.workerType, ResourceFactory.createTypedLiteral(worker.getClass().getSimpleName())); + m.add(workerRes, IPROP.noOfQueries, ResourceFactory.createTypedLiteral(config.queries().getQueryCount())); + m.add(workerRes, IPROP.timeOut, TimeUtils.createTypedDurationLiteral(config.timeout())); + if (config.completionTarget() instanceof HttpWorker.QueryMixes) + m.add(taskRes, IPROP.noOfQueryMixes, ResourceFactory.createTypedLiteral(((HttpWorker.QueryMixes) config.completionTarget()).number())); + if (config.completionTarget() instanceof HttpWorker.TimeLimit) + m.add(taskRes, IPROP.timeLimit, TimeUtils.createTypedDurationLiteral(((HttpWorker.TimeLimit) config.completionTarget()).duration())); + m.add(workerRes, IPROP.connection, connectionRes); + + m.add(connectionRes, RDF.type, IONT.connection); + m.add(connectionRes, RDFS.label, ResourceFactory.createTypedLiteral(config.connection().name())); + if (config.connection().version() != null) { + m.add(connectionRes, IPROP.version, ResourceFactory.createTypedLiteral(config.connection().version())); + } + m.add(connectionRes, IPROP.dataset, datasetRes); + + m.add(datasetRes, RDFS.label, ResourceFactory.createTypedLiteral(config.connection().dataset().name())); // TODO: is this always correct? only one dataset per connection? + m.add(datasetRes, RDF.type, IONT.dataset); + } + + // TODO: language processor thingy + + // Connect task and workers to the Query nodes, that store the triple stats. + for (var worker : workers) { + var config = worker.config(); + var workerQueryIDs = config.queries().getAllQueryIds(); + for (int i = 0; i < config.queries().getQueryCount(); i++) { + Resource workerQueryRes = iresFactory.getWorkerQueryResource(worker, i); + Resource queryRes = IRES.getResource(workerQueryIDs[i]); + m.add(workerQueryRes, IPROP.queryID, queryRes); // TODO: check this, (seems to be right) + } + + var taskQueryIDs = this.queryIDs.toArray(String[]::new); // make elements accessible by index + for (int i = 0; i < taskQueryIDs.length; i++) { + Resource taskQueryRes = iresFactory.getTaskQueryResource(taskQueryIDs[i]); + Resource queryRes = IRES.getResource(taskQueryIDs[i]); + m.add(taskQueryRes, IPROP.queryID, queryRes); // TODO: check this as well + } + } + + for (Metric metric : metrics) { + m.add(this.createMetricModel(metric)); + } + + // Task to queries + for (String queryID : queryIDs) { + m.add(taskRes, IPROP.query, iresFactory.getTaskQueryResource(queryID)); + } + + // Worker to queries + for (var worker : workers) { + for (int i = 0; i < worker.config().queries().getAllQueryIds().length; i++) { + Resource workerRes = iresFactory.getWorkerResource(worker); + m.add(workerRes, IPROP.query, iresFactory.getWorkerQueryResource(worker, i)); + } + } + + m.add(taskRes, IPROP.startDate, ResourceFactory.createTypedLiteral(start)); + m.add(taskRes, IPROP.endDate, ResourceFactory.createTypedLiteral(end)); + + m.setNsPrefixes(IGUANA_BASE.PREFIX_MAP); + + for (var storage : storages) { + storage.storeResult(m); + } + } + + /** + * For a given metric this method calculates the metric with the stored data and creates the appropriate + * RDF related to that metric. + * + * @param metric the metric that should be calculated + * @return the result model of the metric + */ + private Model createMetricModel(Metric metric) { + Model m = ModelFactory.createDefaultModel(); + Property metricProp = IPROP.createMetricProperty(metric); + Resource metricRes = IRES.getMetricResource(metric); + Resource taskRes = iresFactory.getTaskResource(); + + if (metric instanceof ModelWritingMetric) { + m.add(((ModelWritingMetric) metric).createMetricModel(this.workers, + this.workerQueryExecutions, + this.iresFactory)); + m.add(((ModelWritingMetric) metric).createMetricModel(this.workers, + this.taskQueryExecutions, + this.iresFactory)); + } + + if (metric instanceof TaskMetric) { + Number metricValue = ((TaskMetric) metric).calculateTaskMetric(this.workers, workerQueryExecutions); + if (metricValue != null) { + Literal lit = ResourceFactory.createTypedLiteral(metricValue); + m.add(taskRes, metricProp, lit); + } + m.add(taskRes, IPROP.metric, metricRes); + } + + if (metric instanceof WorkerMetric) { + for (var worker : workers) { + Resource workerRes = iresFactory.getWorkerResource(worker); + Number metricValue = ((WorkerMetric) metric).calculateWorkerMetric( + worker.config(), + workerQueryExecutions[(int) worker.getWorkerID()]); + if (metricValue != null) { + Literal lit = ResourceFactory.createTypedLiteral(metricValue); + m.add(workerRes, metricProp, lit); + } + m.add(workerRes, IPROP.metric, metricRes); + } + } + + if (metric instanceof QueryMetric) { + // queries grouped by worker + for (var worker : workers) { + for (int i = 0; i < worker.config().queries().getQueryCount(); i++) { + Number metricValue = ((QueryMetric) metric).calculateQueryMetric(workerQueryExecutions[(int) worker.getWorkerID()][i]); + if (metricValue != null) { + Literal lit = ResourceFactory.createTypedLiteral(metricValue); + Resource queryRes = iresFactory.getWorkerQueryResource(worker, i); + m.add(queryRes, metricProp, lit); + } + } + } + + // queries grouped by task + for (String queryID : queryIDs) { + Number metricValue = ((QueryMetric) metric).calculateQueryMetric(taskQueryExecutions.get(queryID)); + if (metricValue != null) { + Literal lit = ResourceFactory.createTypedLiteral(metricValue); + Resource queryRes = iresFactory.getTaskQueryResource(queryID); + m.add(queryRes, metricProp, lit); + } + } + } + + m.add(metricRes, RDFS.label, metric.getName()); + m.add(metricRes, RDFS.label, metric.getAbbreviation()); + m.add(metricRes, RDFS.comment, metric.getDescription()); + m.add(metricRes, RDF.type, IONT.getMetricClass(metric)); + m.add(metricRes, RDF.type, IONT.metric); + + return m; + } +} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/StresstestMetadata.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/StresstestMetadata.java deleted file mode 100644 index 66233de82..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/StresstestMetadata.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest; - -import org.aksw.iguana.cc.worker.WorkerMetadata; -import org.apache.jena.rdf.model.Model; - -import java.util.Optional; -import java.util.Set; - -public record StresstestMetadata( - String suiteID, - String expID, - String taskID, - String datasetID, - String conID, - Optional conVersion, - String taskname, - String classname, - Optional timelimit, - Optional noOfQueryMixes, - WorkerMetadata[] workers, - Set queryIDs, - Optional simpleTriple, - Optional tripleStats -) {} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/StresstestResultProcessor.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/StresstestResultProcessor.java deleted file mode 100644 index c0f2dc790..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/StresstestResultProcessor.java +++ /dev/null @@ -1,230 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest; - -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.worker.WorkerMetadata; -import org.aksw.iguana.cc.tasks.stresstest.metrics.*; -import org.aksw.iguana.cc.tasks.stresstest.storage.StorageManager; -import org.aksw.iguana.commons.rdf.IGUANA_BASE; -import org.aksw.iguana.commons.rdf.IONT; -import org.aksw.iguana.commons.rdf.IPROP; -import org.aksw.iguana.commons.rdf.IRES; -import org.apache.jena.rdf.model.*; -import org.apache.jena.vocabulary.RDF; -import org.apache.jena.vocabulary.RDFS; - -import java.util.*; - -public class StresstestResultProcessor { - - private final StresstestMetadata metadata; - private final List metrics; - - /** - * This array contains each query execution, grouped by each worker and each query. - */ - private List[][] workerQueryExecutions; - - /** - * This map contains each query execution, grouped by each query of the task. - */ - private Map> taskQueryExecutions; - - private final Resource taskRes; - - public StresstestResultProcessor(StresstestMetadata metadata) { - this.metadata = metadata; - this.taskRes = IRES.getResource(metadata.taskID()); - this.metrics = MetricManager.getMetrics(); - - WorkerMetadata[] workers = metadata.workers(); - this.workerQueryExecutions = new List[workers.length][]; - for (int i = 0; i < workers.length; i++) { - this.workerQueryExecutions[i] = new List[workers[i].numberOfQueries()]; - for (int j = 0; j < workers[i].numberOfQueries(); j++) { - this.workerQueryExecutions[i][j] = new LinkedList<>(); - } - } - - taskQueryExecutions = new HashMap<>(); - for (String queryID : metadata.queryIDs()) { - taskQueryExecutions.put(queryID, new ArrayList<>()); - } - } - - /** - * This method stores the given query executions statistics from a worker to their appropriate data location. - * - * @param worker the worker that has executed the queries - * @param data a collection of the query execution statistics - */ - public void processQueryExecutions(WorkerMetadata worker, Collection data) { - for(QueryExecutionStats stat : data) { - // The queryIDs returned by the queryHandler are Strings, in the form of ':'. - int queryID = Integer.parseInt(stat.queryID().substring(stat.queryID().indexOf(":") + 1)); - workerQueryExecutions[worker.workerID()][queryID].add(stat); - - taskQueryExecutions.get(stat.queryID()).add(stat); - } - } - - /** - * This method calculates the metrics and creates the RDF model of the result, which will be sent to the storages. - * It uses the given data that was passed with the 'processQueryExecutions' method. - * - * @param start the start date of the task - * @param end the end date of the task - */ - public void calculateAndSaveMetrics(Calendar start, Calendar end) { - Model m = ModelFactory.createDefaultModel(); - Resource suiteRes = IRES.getResource(metadata.suiteID()); - Resource experimentRes = IRES.getResource(metadata.expID()); - Resource datasetRes = IRES.getResource(metadata.datasetID()); - Resource connectionRes = IRES.getResource(metadata.conID()); - - m.add(suiteRes, IPROP.experiment, experimentRes); - m.add(suiteRes, RDF.type, IONT.suite); - m.add(experimentRes, IPROP.dataset, datasetRes); - m.add(experimentRes, RDF.type, IONT.experiment); - m.add(experimentRes, IPROP.task, taskRes); - m.add(datasetRes, RDFS.label, ResourceFactory.createTypedLiteral(metadata.datasetID())); - m.add(datasetRes, RDF.type, IONT.dataset); - m.add(taskRes, IPROP.connection, connectionRes); - if (metadata.noOfQueryMixes().isPresent()) - m.add(taskRes, IPROP.noOfQueryMixes, ResourceFactory.createTypedLiteral(metadata.noOfQueryMixes().get())); - m.add(taskRes, IPROP.noOfWorkers, ResourceFactory.createTypedLiteral(metadata.workers().length)); - if (metadata.timelimit().isPresent()) - m.add(taskRes, IPROP.timeLimit, ResourceFactory.createTypedLiteral(metadata.timelimit().get())); - m.add(taskRes, RDF.type, IONT.task); - - m.add(taskRes, RDF.type, IONT.getClass(metadata.classname())); - if (metadata.conVersion().isPresent()) - m.add(connectionRes, IPROP.version, ResourceFactory.createTypedLiteral(metadata.conVersion().get())); - m.add(connectionRes, RDFS.label, ResourceFactory.createTypedLiteral(metadata.conID())); - m.add(connectionRes, RDF.type, IONT.connection); - - for (WorkerMetadata worker : metadata.workers()) { - Resource workerRes = IRES.getWorkerResource(metadata.taskID(), worker.workerID()); - m.add(taskRes, IPROP.workerResult, workerRes); - m.add(workerRes, IPROP.workerID, ResourceFactory.createTypedLiteral(worker.workerID())); - m.add(workerRes, IPROP.workerType, ResourceFactory.createTypedLiteral(worker.workerType())); - m.add(workerRes, IPROP.noOfQueries, ResourceFactory.createTypedLiteral(worker.queryIDs().length)); - m.add(workerRes, IPROP.timeOut, ResourceFactory.createTypedLiteral(worker.timeout())); - m.add(workerRes, RDF.type, IONT.worker); - } - - if (metadata.tripleStats().isPresent()) { - m.add(metadata.tripleStats().get()); - // Connect task and workers to the Query nodes, that store the triple stats. - for (WorkerMetadata worker : metadata.workers()) { - for (String queryID : worker.queryIDs()) { - int intID = Integer.parseInt(queryID.substring(queryID.indexOf(":") + 1)); - Resource workerQueryRes = IRES.getWorkerQueryResource(metadata.taskID(), worker.workerID(), queryID); - Resource queryRes = IRES.getResource(worker.queryHash() + "/" + intID); - m.add(workerQueryRes, IPROP.queryID, queryRes); - } - - for (String queryID : metadata.queryIDs()) { - int intID = Integer.parseInt(queryID.substring(queryID.indexOf(":") + 1)); - Resource taskQueryRes = IRES.getTaskQueryResource(metadata.taskID(), queryID); - Resource queryRes = IRES.getResource(worker.queryHash() + "/" + intID); - m.add(taskQueryRes, IPROP.queryID, queryRes); - } - } - } - - for (Metric metric : metrics) { - m.add(this.createMetricModel(metric)); - } - - // Task to queries - for (String queryID : metadata.queryIDs()) { - m.add(taskRes, IPROP.query, IRES.getTaskQueryResource(metadata.taskID(), queryID)); - } - - // Worker to queries - for (WorkerMetadata worker : metadata.workers()) { - for (String queryID : worker.queryIDs()) { - Resource workerRes = IRES.getWorkerResource(metadata.taskID(), worker.workerID()); - m.add(workerRes, IPROP.query, IRES.getWorkerQueryResource(metadata.taskID(), worker.workerID(), queryID)); - } - } - - m.add(taskRes, IPROP.startDate, ResourceFactory.createTypedLiteral(start)); - m.add(taskRes, IPROP.endDate, ResourceFactory.createTypedLiteral(end)); - - m.setNsPrefixes(IGUANA_BASE.PREFIX_MAP); - - StorageManager.getInstance().storeResult(m); - } - - /** - * For a given metric this method calculates the metric with the stored data and creates the appropriate - * RDF related to that metric. - * - * @param metric the metric that should be calculated - * @return the result model of the metric - */ - private Model createMetricModel(Metric metric) { - Model m = ModelFactory.createDefaultModel(); - Property metricProp = IPROP.createMetricProperty(metric); - Resource metricRes = IRES.getMetricResource(metric); - - if (metric instanceof ModelWritingMetric) { - m.add(((ModelWritingMetric) metric).createMetricModel(metadata, workerQueryExecutions)); - m.add(((ModelWritingMetric) metric).createMetricModel(metadata, taskQueryExecutions)); - } - - if (metric instanceof TaskMetric) { - Number metricValue = ((TaskMetric) metric).calculateTaskMetric(metadata, workerQueryExecutions); - if (metricValue != null) { - Literal lit = ResourceFactory.createTypedLiteral(metricValue); - m.add(taskRes, metricProp, lit); - } - m.add(taskRes, IPROP.metric, metricRes); - } - - if (metric instanceof WorkerMetric) { - for (WorkerMetadata worker : metadata.workers()) { - Resource workerRes = IRES.getWorkerResource(metadata.taskID(), worker.workerID()); - Number metricValue = ((WorkerMetric) metric).calculateWorkerMetric(worker, workerQueryExecutions[worker.workerID()]); - if (metricValue != null) { - Literal lit = ResourceFactory.createTypedLiteral(metricValue); - m.add(workerRes, metricProp, lit); - } - m.add(workerRes, IPROP.metric, metricRes); - } - } - - if (metric instanceof QueryMetric) { - // queries grouped by worker - for (WorkerMetadata worker : metadata.workers()) { - for (int i = 0; i < worker.numberOfQueries(); i++) { - Number metricValue = ((QueryMetric) metric).calculateQueryMetric(workerQueryExecutions[worker.workerID()][i]); - if (metricValue != null) { - Literal lit = ResourceFactory.createTypedLiteral(metricValue); - Resource queryRes = IRES.getWorkerQueryResource(metadata.taskID(), worker.workerID(), worker.queryIDs()[i]); - m.add(queryRes, metricProp, lit); - } - } - } - - // queries grouped by task - for (String queryID : taskQueryExecutions.keySet()) { - Number metricValue = ((QueryMetric) metric).calculateQueryMetric(taskQueryExecutions.get(queryID)); - if (metricValue != null) { - Literal lit = ResourceFactory.createTypedLiteral(metricValue); - Resource queryRes = IRES.getTaskQueryResource(metadata.taskID(), queryID); - m.add(queryRes, metricProp, lit); - } - } - } - - m.add(metricRes, RDFS.label, metric.getName()); - m.add(metricRes, RDFS.label, metric.getAbbreviation()); - m.add(metricRes, RDFS.comment, metric.getDescription()); - m.add(metricRes, RDF.type, IONT.getMetricClass(metric)); - m.add(metricRes, RDF.type, IONT.metric); - - return m; - } -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/ModelWritingMetric.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/ModelWritingMetric.java deleted file mode 100644 index bbce4aa49..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/ModelWritingMetric.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics; - -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.StresstestMetadata; -import org.apache.jena.rdf.model.Model; -import org.apache.jena.rdf.model.ModelFactory; - -import javax.annotation.Nonnull; -import java.util.List; -import java.util.Map; - -public interface ModelWritingMetric { - default @Nonnull Model createMetricModel(StresstestMetadata task, List[][] data) { - return ModelFactory.createDefaultModel(); - } - - default @Nonnull Model createMetricModel(StresstestMetadata task, Map> data) { - return ModelFactory.createDefaultModel(); - } -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/QueryMetric.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/QueryMetric.java deleted file mode 100644 index f4a793f1a..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/QueryMetric.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics; - -import org.aksw.iguana.cc.model.QueryExecutionStats; - -import java.util.List; - -public interface QueryMetric { - Number calculateQueryMetric(List data); -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/TaskMetric.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/TaskMetric.java deleted file mode 100644 index 6995f24d1..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/TaskMetric.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics; - -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.StresstestMetadata; - -import java.util.List; - -public interface TaskMetric { - Number calculateTaskMetric(StresstestMetadata task, List[][] data); -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/WorkerMetric.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/WorkerMetric.java deleted file mode 100644 index bc81071e6..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/WorkerMetric.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics; - -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.worker.WorkerMetadata; - -import java.util.List; - -public interface WorkerMetric { - Number calculateWorkerMetric(WorkerMetadata worker, List[] data); -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/AggregatedExecutionStatistics.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/AggregatedExecutionStatistics.java deleted file mode 100644 index a2e332cba..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/AggregatedExecutionStatistics.java +++ /dev/null @@ -1,101 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics.impl; - -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.StresstestMetadata; -import org.aksw.iguana.cc.worker.WorkerMetadata; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.ModelWritingMetric; -import org.aksw.iguana.commons.annotation.Shorthand; -import org.aksw.iguana.commons.constants.COMMON; -import org.aksw.iguana.commons.rdf.IONT; -import org.aksw.iguana.commons.rdf.IPROP; -import org.aksw.iguana.commons.rdf.IRES; -import org.apache.jena.rdf.model.Model; -import org.apache.jena.rdf.model.ModelFactory; -import org.apache.jena.rdf.model.Resource; -import org.apache.jena.rdf.model.ResourceFactory; -import org.apache.jena.vocabulary.RDF; - -import javax.annotation.Nonnull; -import java.math.BigInteger; -import java.time.Duration; -import java.util.List; -import java.util.Map; - -import static org.aksw.iguana.commons.time.TimeUtils.toXSDDurationInSeconds; - -@Shorthand("AES") -public class AggregatedExecutionStatistics extends Metric implements ModelWritingMetric { - - public AggregatedExecutionStatistics() { - super("Aggregated Execution Statistics", "AES", "Sums up the statistics of each query execution for each query a worker and task has. The result size only contains the value of the last execution."); - } - - @Override - @Nonnull - public Model createMetricModel(StresstestMetadata task, List[][] data) { - Model m = ModelFactory.createDefaultModel(); - for (WorkerMetadata worker : task.workers()) { - for (int i = 0; i < worker.numberOfQueries(); i++) { - Resource queryRes = IRES.getWorkerQueryResource(task.taskID(), worker.workerID(), worker.queryIDs()[i]); - m.add(createAggregatedModel(data[worker.workerID()][i], queryRes)); - } - } - return m; - } - - @Override - @Nonnull - public Model createMetricModel(StresstestMetadata task, Map> data) { - Model m = ModelFactory.createDefaultModel(); - for (String queryID : data.keySet()) { - Resource queryRes = IRES.getTaskQueryResource(task.taskID(), queryID); - m.add(createAggregatedModel(data.get(queryID), queryRes)); - } - return m; - } - - private static Model createAggregatedModel(List data, Resource queryRes) { - Model m = ModelFactory.createDefaultModel(); - BigInteger succeeded = BigInteger.ZERO; - BigInteger failed = BigInteger.ZERO; - BigInteger resultSize = BigInteger.ZERO; - BigInteger wrongCodes = BigInteger.ZERO; - BigInteger timeOuts = BigInteger.ZERO; - BigInteger unknownExceptions = BigInteger.ZERO; - Duration totalTime = Duration.ZERO; - - for (QueryExecutionStats exec : data) { - // TODO: make response code integer - switch ((int) exec.responseCode()) { - case (int) COMMON.QUERY_SUCCESS -> succeeded = succeeded.add(BigInteger.ONE); - case (int) COMMON.QUERY_SOCKET_TIMEOUT -> { - timeOuts = timeOuts.add(BigInteger.ONE); - failed = failed.add(BigInteger.ONE); - } - case (int) COMMON.QUERY_HTTP_FAILURE -> { - wrongCodes = wrongCodes.add(BigInteger.ONE); - failed = failed.add(BigInteger.ONE); - } - case (int) COMMON.QUERY_UNKNOWN_EXCEPTION -> { - unknownExceptions = unknownExceptions.add(BigInteger.ONE); - failed = failed.add(BigInteger.ONE); - } - } - - totalTime = totalTime.plusNanos((long) (exec.executionTime() * 1000000)); - resultSize = BigInteger.valueOf(exec.resultSize()); - } - - m.add(queryRes, IPROP.succeeded, ResourceFactory.createTypedLiteral(succeeded)); - m.add(queryRes, IPROP.failed, ResourceFactory.createTypedLiteral(failed)); - m.add(queryRes, IPROP.resultSize, ResourceFactory.createTypedLiteral(resultSize)); - m.add(queryRes, IPROP.timeOuts, ResourceFactory.createTypedLiteral(timeOuts)); - m.add(queryRes, IPROP.wrongCodes, ResourceFactory.createTypedLiteral(wrongCodes)); - m.add(queryRes, IPROP.unknownException, ResourceFactory.createTypedLiteral(unknownExceptions)); - m.add(queryRes, IPROP.totalTime, ResourceFactory.createTypedLiteral(toXSDDurationInSeconds(totalTime))); - m.add(queryRes, RDF.type, IONT.executedQuery); - - return m; - } -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/EachExecutionStatistic.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/EachExecutionStatistic.java deleted file mode 100644 index f3771943a..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/EachExecutionStatistic.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics.impl; - -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.StresstestMetadata; -import org.aksw.iguana.cc.worker.WorkerMetadata; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.ModelWritingMetric; -import org.aksw.iguana.commons.annotation.Shorthand; -import org.aksw.iguana.commons.constants.COMMON; -import org.aksw.iguana.commons.rdf.IPROP; -import org.aksw.iguana.commons.rdf.IRES; -import org.apache.jena.rdf.model.Model; -import org.apache.jena.rdf.model.ModelFactory; -import org.apache.jena.rdf.model.Resource; -import org.apache.jena.rdf.model.ResourceFactory; - -import javax.annotation.Nonnull; -import java.math.BigInteger; -import java.util.List; - -@Shorthand("EachQuery") -public class EachExecutionStatistic extends Metric implements ModelWritingMetric { - - public EachExecutionStatistic() { - super("Each Query Execution Statistic", "EachQuery", "This metric saves the statistics of each query execution."); - } - - @Override - @Nonnull - public Model createMetricModel(StresstestMetadata task, List[][] data) { - Model m = ModelFactory.createDefaultModel(); - for (WorkerMetadata worker : task.workers()) { - for (int i = 0; i < worker.numberOfQueries(); i++) { - Resource queryRes = IRES.getWorkerQueryResource(task.taskID(), worker.workerID(), worker.queryIDs()[i]); - Resource query = IRES.getResource(worker.queryHash() + "/" + worker.queryIDs()[i]); - BigInteger run = BigInteger.ONE; - for (QueryExecutionStats exec : data[worker.workerID()][i]) { - Resource runRes = IRES.getWorkerQueryRunResource(task.taskID(), worker.workerID(), worker.queryIDs()[i], run); - m.add(queryRes, IPROP.queryExecution, runRes); - m.add(runRes, IPROP.time, ResourceFactory.createTypedLiteral(exec.executionTime())); - m.add(runRes, IPROP.success, ResourceFactory.createTypedLiteral(exec.responseCode() == COMMON.QUERY_SUCCESS)); - m.add(runRes, IPROP.run, ResourceFactory.createTypedLiteral(run)); - m.add(runRes, IPROP.code, ResourceFactory.createTypedLiteral(exec.responseCode())); - m.add(runRes, IPROP.resultSize, ResourceFactory.createTypedLiteral(exec.resultSize())); - m.add(runRes, IPROP.queryID, query); - run = run.add(BigInteger.ONE); - } - } - } - return m; - } -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/NoQ.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/NoQ.java deleted file mode 100644 index e1af28be1..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/NoQ.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics.impl; - -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.StresstestMetadata; -import org.aksw.iguana.cc.worker.WorkerMetadata; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.TaskMetric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.WorkerMetric; -import org.aksw.iguana.commons.annotation.Shorthand; -import org.aksw.iguana.commons.constants.COMMON; - -import java.math.BigInteger; -import java.util.List; - -@Shorthand("NoQ") -public class NoQ extends Metric implements TaskMetric, WorkerMetric { - - public NoQ() { - super("Number of Queries", "NoQ", "This metric calculates the number of successfully executed queries."); - } - - @Override - public Number calculateTaskMetric(StresstestMetadata task, List[][] data) { - BigInteger sum = BigInteger.ZERO; - for (WorkerMetadata worker : task.workers()) { - sum = sum.add((BigInteger) this.calculateWorkerMetric(worker, data[worker.workerID()])); - } - return sum; - } - - @Override - public Number calculateWorkerMetric(WorkerMetadata worker, List[] data) { - BigInteger sum = BigInteger.ZERO; - for (List datum : data) { - for (QueryExecutionStats exec : datum) { - if (exec.responseCode() == COMMON.QUERY_SUCCESS) { - sum = sum.add(BigInteger.ONE); - } - } - } - return sum; - } -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/NoQPH.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/NoQPH.java deleted file mode 100644 index 4015d3df9..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/NoQPH.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics.impl; - -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.StresstestMetadata; -import org.aksw.iguana.cc.worker.WorkerMetadata; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.TaskMetric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.WorkerMetric; -import org.aksw.iguana.commons.annotation.Shorthand; -import org.aksw.iguana.commons.constants.COMMON; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.math.RoundingMode; -import java.time.Duration; -import java.util.List; - -@Shorthand("NoQPH") -public class NoQPH extends Metric implements TaskMetric, WorkerMetric { - - public NoQPH() { - super("Number of Queries per Hour", "NoQPH", "This metric calculates the number of successfully executed queries per hour."); - } - @Override - public Number calculateTaskMetric(StresstestMetadata task, List[][] data) { - BigDecimal sum = BigDecimal.ZERO; - for (WorkerMetadata worker : task.workers()) { - sum = sum.add((BigDecimal) this.calculateWorkerMetric(worker, data[worker.workerID()])); - } - return sum; - } - - @Override - public Number calculateWorkerMetric(WorkerMetadata worker, List[] data) { - BigDecimal successes = BigDecimal.ZERO; - Duration totalTime = Duration.ZERO; - for (List datum : data) { - for (QueryExecutionStats exec : datum) { - if (exec.responseCode() == COMMON.QUERY_SUCCESS) { - successes = successes.add(BigDecimal.ONE); - totalTime = totalTime.plusNanos((long) exec.executionTime() * 1000000); - } - } - } - BigDecimal tt = (new BigDecimal(BigInteger.valueOf(totalTime.toNanos()), 9)).divide(BigDecimal.valueOf(3600), 20, RoundingMode.HALF_UP); - - try { - return successes.divide(tt, 10, RoundingMode.HALF_UP); - } catch (ArithmeticException e) { - return BigDecimal.ZERO; - } - } -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/QMPH.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/QMPH.java deleted file mode 100644 index c3a3e01cf..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/metrics/impl/QMPH.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.aksw.iguana.cc.tasks.stresstest.metrics.impl; - -import org.aksw.iguana.cc.model.QueryExecutionStats; -import org.aksw.iguana.cc.tasks.stresstest.StresstestMetadata; -import org.aksw.iguana.cc.worker.WorkerMetadata; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.TaskMetric; -import org.aksw.iguana.cc.tasks.stresstest.metrics.WorkerMetric; -import org.aksw.iguana.commons.annotation.Shorthand; -import org.aksw.iguana.commons.constants.COMMON; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.math.RoundingMode; -import java.time.Duration; -import java.util.List; - -@Shorthand("QMPH") -public class QMPH extends Metric implements TaskMetric, WorkerMetric { - - public QMPH() { - super("Query Mixes per Hour", "QMPH", "This metric calculates the amount of query mixes (a given set of queries) that are executed per hour."); - } - @Override - public Number calculateTaskMetric(StresstestMetadata task, List[][] data) { - BigDecimal sum = BigDecimal.ZERO; - for (WorkerMetadata worker : task.workers()) { - sum = sum.add((BigDecimal) this.calculateWorkerMetric(worker, data[worker.workerID()])); - } - return sum; - } - - @Override - public Number calculateWorkerMetric(WorkerMetadata worker, List[] data) { - BigDecimal successes = BigDecimal.ZERO; - BigDecimal noq = BigDecimal.valueOf(worker.numberOfQueries()); - Duration totalTime = Duration.ZERO; - for (List datum : data) { - for (QueryExecutionStats exec : datum) { - if (exec.responseCode() == COMMON.QUERY_SUCCESS) { - successes = successes.add(BigDecimal.ONE); - totalTime = totalTime.plusNanos((long) exec.executionTime() * 1000000); - } - } - } - BigDecimal tt = (new BigDecimal(BigInteger.valueOf(totalTime.toNanos()), 9)).divide(BigDecimal.valueOf(3600), 20, RoundingMode.HALF_UP); - - try { - return successes.divide(tt, 10, RoundingMode.HALF_UP).divide(noq, 10, RoundingMode.HALF_UP); - } catch (ArithmeticException e) { - return BigDecimal.ZERO; - } - } -} diff --git a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/TripleBasedStorage.java b/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/TripleBasedStorage.java deleted file mode 100644 index 599113307..000000000 --- a/src/main/java/org/aksw/iguana/cc/tasks/stresstest/storage/TripleBasedStorage.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * - */ -package org.aksw.iguana.cc.tasks.stresstest.storage; - -import org.aksw.iguana.commons.constants.COMMON; -import org.apache.jena.rdf.model.Model; -import org.apache.jena.rdf.model.ModelFactory; - -/** - * This Storage will save all the metric results as triples - * - * @author f.conrads - * - */ -public abstract class TripleBasedStorage implements Storage { - - protected String baseUri = COMMON.BASE_URI; - protected Model metricResults = ModelFactory.createDefaultModel(); - - @Override - public String toString() { - return this.getClass().getSimpleName(); - } - - public void storeResult(Model data){ - metricResults.add(data); - } -} diff --git a/src/main/java/org/aksw/iguana/cc/worker/HttpWorker.java b/src/main/java/org/aksw/iguana/cc/worker/HttpWorker.java index 7cfb7424e..2044b1ba4 100644 --- a/src/main/java/org/aksw/iguana/cc/worker/HttpWorker.java +++ b/src/main/java/org/aksw/iguana/cc/worker/HttpWorker.java @@ -11,6 +11,7 @@ import java.util.Base64; import java.util.List; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; /** @@ -47,21 +48,74 @@ public interface Config { boolean parseResults(); } - public record ExecutionStats( // TODO: queryID, there should also probably be a clearer way to tell, if the query was successful or not + public record ExecutionStats( + int queryID, Instant startTime, - Optional duration, - int httpStatusCode, - long contentLength, - Long responseBodyHash, - Exception error - ) {} + Duration duration, // should always exist + Optional httpStatusCode, + OptionalLong contentLength, + OptionalLong responseBodyHash, + Optional error + ) { + public enum END_STATE { + SUCCESS(1), // values are the same as previous implementation for backwards compatibility + TIMEOUT(-1), + HTTP_ERROR(-2), + MISCELLANEOUS_EXCEPTION(0); + + public final int value; + END_STATE(int value) { + this.value = value; + } + } + + public END_STATE endState() { + if (successful()) { + return END_STATE.SUCCESS; + } else if (timeout()) { + return END_STATE.TIMEOUT; + } else if (httpError()) { + return END_STATE.HTTP_ERROR; + } else { + return END_STATE.MISCELLANEOUS_EXCEPTION; + } + } + + public boolean completed() { + return httpStatusCode().isPresent(); + } + + public boolean successful() { + if (completed() && error().isEmpty()) { + return httpStatusCode().get() / 100 == 2; + } else { + return false; + } + } + + public boolean timeout() { + if (!successful() && error().isPresent()) { + return error().get() instanceof java.net.SocketTimeoutException; + } else { + return false; + } + } + + public boolean httpError() { + return httpStatusCode().isPresent() && httpStatusCode().orElse(200) / 100 != 2; + } + + public boolean miscellaneousException() { + return error().isPresent() && !timeout() && !httpError() && !successful(); + } + } public record Result(long workerID, List executionStats) {} @JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION) @JsonSubTypes({ @JsonSubTypes.Type(value = TimeLimit.class), - @JsonSubTypes.Type(value = QueryMixes.class), + @JsonSubTypes.Type(value = QueryMixes.class) }) sealed public interface CompletionTarget permits TimeLimit, QueryMixes {} @@ -69,12 +123,12 @@ public record TimeLimit(@JsonProperty(required = true) Duration duration) implem public record QueryMixes(@JsonProperty(required = true) int number) implements CompletionTarget {} - final protected long workerId; + final protected long workerID; final protected Config config; final protected ResponseBodyProcessor responseBodyProcessor; - public HttpWorker(long workerId, ResponseBodyProcessor responseBodyProcessor, Config config) { - this.workerId = workerId; + public HttpWorker(long workerID, ResponseBodyProcessor responseBodyProcessor, Config config) { + this.workerID = workerID; this.responseBodyProcessor = responseBodyProcessor; this.config = config; } @@ -85,4 +139,11 @@ public static String basicAuth(String username, String password) { public abstract CompletableFuture start(); + public Config config() { + return this.config; + } + + public long getWorkerID() { + return this.workerID; + } } diff --git a/src/main/java/org/aksw/iguana/cc/worker/WorkerMetadata.java b/src/main/java/org/aksw/iguana/cc/worker/WorkerMetadata.java deleted file mode 100644 index 678306aba..000000000 --- a/src/main/java/org/aksw/iguana/cc/worker/WorkerMetadata.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.aksw.iguana.cc.worker; - -public record WorkerMetadata( - int workerID, - String workerType, - double timeout, - int numberOfQueries, - int queryHash, - String[] queryIDs -) {} diff --git a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java index c6dcd811b..7576b6d56 100644 --- a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java +++ b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java @@ -143,6 +143,7 @@ public Config(Integer number, } record HttpExecutionResult( + int queryID, Optional> response, Instant requestStart, Duration duration, @@ -232,11 +233,9 @@ public CompletableFuture start() { private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure) throws IOException, URISyntaxException { HttpExecutionResult result = executeHttpRequest(timeout); - - var statusCode = -1; - if (result.response().isPresent()) { - statusCode = result.response().get().statusCode(); - } + Optional statuscode = Optional.empty(); + if (result.response().isPresent()) + statuscode = Optional.of(result.response().get().statusCode()); if (result.successful()) { // 2xx // process result @@ -252,12 +251,13 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure) } return new ExecutionStats( + result.queryID(), result.requestStart(), - (result.successful()) ? Optional.of(result.duration) : Optional.empty(), - statusCode, - result.actualContentLength().orElse(0L), - result.hash.orElse(0L), - result.exception().orElse(null) + result.duration(), + statuscode, + result.actualContentLength(), + result.hash, + result.exception() ); } @@ -275,6 +275,7 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept BiFunction, Exception, HttpExecutionResult> createFailedResult = (response, e) -> { final Duration requestDuration = Duration.between(requestStart, Instant.now()); return new HttpExecutionResult( + queryHandle.index(), Optional.ofNullable(response), requestStart, requestDuration, @@ -305,6 +306,7 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept } return new HttpExecutionResult( + queryHandle.index(), Optional.of(httpResponse), requestStart, Duration.between(requestStart, Instant.now()), diff --git a/src/main/java/org/aksw/iguana/commons/rdf/IONT.java b/src/main/java/org/aksw/iguana/commons/rdf/IONT.java index b9027f9c0..f3b36d337 100644 --- a/src/main/java/org/aksw/iguana/commons/rdf/IONT.java +++ b/src/main/java/org/aksw/iguana/commons/rdf/IONT.java @@ -1,6 +1,6 @@ package org.aksw.iguana.commons.rdf; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; +import org.aksw.iguana.cc.metrics.Metric; import org.apache.jena.rdf.model.Resource; import org.apache.jena.rdf.model.ResourceFactory; diff --git a/src/main/java/org/aksw/iguana/commons/rdf/IPROP.java b/src/main/java/org/aksw/iguana/commons/rdf/IPROP.java index 81eeddd20..369a7710b 100644 --- a/src/main/java/org/aksw/iguana/commons/rdf/IPROP.java +++ b/src/main/java/org/aksw/iguana/commons/rdf/IPROP.java @@ -1,6 +1,6 @@ package org.aksw.iguana.commons.rdf; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; +import org.aksw.iguana.cc.metrics.Metric; import org.apache.jena.rdf.model.Property; import org.apache.jena.rdf.model.ResourceFactory; diff --git a/src/main/java/org/aksw/iguana/commons/rdf/IRES.java b/src/main/java/org/aksw/iguana/commons/rdf/IRES.java index 49a01ea3d..0603a775f 100644 --- a/src/main/java/org/aksw/iguana/commons/rdf/IRES.java +++ b/src/main/java/org/aksw/iguana/commons/rdf/IRES.java @@ -1,6 +1,7 @@ package org.aksw.iguana.commons.rdf; -import org.aksw.iguana.cc.tasks.stresstest.metrics.Metric; +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.worker.HttpWorker; import org.apache.jena.rdf.model.Resource; import org.apache.jena.rdf.model.ResourceFactory; @@ -10,9 +11,6 @@ public class IRES { public static final String NS = IGUANA_BASE.NS + "resource" + "/"; public static final String PREFIX = "ires"; - private IRES() { - } - /** * The RDF-friendly version of the IRES namespace * with trailing / character. @@ -44,4 +42,42 @@ public static Resource getMetricResource(Metric metric) { public static Resource getWorkerQueryRunResource(String taskID, int workerID, String queryID, BigInteger run) { return ResourceFactory.createResource(NS + taskID + "/" + workerID + "/" + queryID + "/" + run); } + + public static class Factory { + + private final long suiteID; + private final long taskID; + + private final String taskURI; + + public Factory(long suiteID, long taskID) { + this.suiteID = suiteID; + this.taskID = taskID; + this.taskURI = NS + suiteID + "/" + taskID; + } + + public Resource getSuiteResource() { + return ResourceFactory.createResource(NS + suiteID); + } + + public Resource getTaskResource() { + return ResourceFactory.createResource(this.taskURI); + } + + public Resource getWorkerResource(HttpWorker worker) { + return ResourceFactory.createResource(this.taskURI + "/" + worker.getWorkerID()); + } + + public Resource getTaskQueryResource(String queryID) { + return ResourceFactory.createResource(this.taskURI + "/" + queryID); + } + + public Resource getWorkerQueryResource(HttpWorker worker, int index) { + return ResourceFactory.createResource(this.taskURI + "/" + worker.getWorkerID() + "/" + worker.config().queries().getQueryId(index)); + } + + public Resource getWorkerQueryRunResource(HttpWorker worker, int index, BigInteger run) { + return ResourceFactory.createResource(this.taskURI + "/" + worker.getWorkerID() + "/" + worker.config().queries().getQueryId(index) + "/" + run); + } + } } diff --git a/src/main/java/org/aksw/iguana/commons/time/TimeUtils.java b/src/main/java/org/aksw/iguana/commons/time/TimeUtils.java index 46c2e13fa..42954f868 100644 --- a/src/main/java/org/aksw/iguana/commons/time/TimeUtils.java +++ b/src/main/java/org/aksw/iguana/commons/time/TimeUtils.java @@ -2,6 +2,8 @@ import org.apache.jena.datatypes.xsd.XSDDuration; import org.apache.jena.datatypes.xsd.impl.XSDDurationType; +import org.apache.jena.rdf.model.Literal; +import org.apache.jena.rdf.model.ResourceFactory; import java.math.BigDecimal; import java.math.BigInteger; @@ -44,4 +46,8 @@ public static double durationInMilliseconds(Instant start, Instant end) { public static XSDDuration toXSDDurationInSeconds(Duration duration) { return (XSDDuration) new XSDDurationType().parse("PT" + new BigDecimal(BigInteger.valueOf(duration.toNanos()), 9).toPlainString() + "S"); } + + public static Literal createTypedDurationLiteral(Duration duration) { + return ResourceFactory.createTypedLiteral(new XSDDurationType().parse(duration.toString())); + } } diff --git a/src/test/java/org/aksw/iguana/cc/config/elements/StorageConfigTest.java b/src/test/java/org/aksw/iguana/cc/config/elements/StorageConfigTest.java index 163f22f4f..890b6b2d5 100644 --- a/src/test/java/org/aksw/iguana/cc/config/elements/StorageConfigTest.java +++ b/src/test/java/org/aksw/iguana/cc/config/elements/StorageConfigTest.java @@ -1,7 +1,7 @@ package org.aksw.iguana.cc.config.elements; import com.fasterxml.jackson.databind.ObjectMapper; -import org.aksw.iguana.rp.storage.impl.RDFFileStorage; +import org.aksw.iguana.cc.storage.impl.RDFFileStorage; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; diff --git a/src/test/java/org/aksw/iguana/cc/tasks/MockupStorage.java b/src/test/java/org/aksw/iguana/cc/tasks/MockupStorage.java index 145a72570..e43226caa 100644 --- a/src/test/java/org/aksw/iguana/cc/tasks/MockupStorage.java +++ b/src/test/java/org/aksw/iguana/cc/tasks/MockupStorage.java @@ -1,6 +1,6 @@ package org.aksw.iguana.cc.tasks; -import org.aksw.iguana.cc.tasks.stresstest.storage.Storage; +import org.aksw.iguana.cc.storage.Storage; import org.apache.jena.rdf.model.Model; import org.apache.jena.rdf.model.ModelFactory; diff --git a/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/CSVStorageTest.java b/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/CSVStorageTest.java index 054d6e347..af2ec554b 100644 --- a/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/CSVStorageTest.java +++ b/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/CSVStorageTest.java @@ -2,13 +2,13 @@ import com.opencsv.CSVReader; import com.opencsv.exceptions.CsvException; -import org.aksw.iguana.cc.config.IguanaConfig; -import org.aksw.iguana.cc.tasks.stresstest.metrics.*; -import org.aksw.iguana.cc.tasks.stresstest.metrics.impl.AggregatedExecutionStatistics; -import org.aksw.iguana.cc.tasks.stresstest.metrics.impl.NoQ; -import org.aksw.iguana.cc.tasks.stresstest.metrics.impl.NoQPH; -import org.aksw.iguana.cc.tasks.stresstest.metrics.impl.QPS; -import org.aksw.iguana.cc.tasks.stresstest.storage.impl.CSVStorage; +import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.metrics.MetricManager; +import org.aksw.iguana.cc.metrics.impl.AggregatedExecutionStatistics; +import org.aksw.iguana.cc.metrics.impl.NoQ; +import org.aksw.iguana.cc.metrics.impl.NoQPH; +import org.aksw.iguana.cc.metrics.impl.QPS; +import org.aksw.iguana.cc.storage.impl.CSVStorage; import org.aksw.iguana.commons.rdf.IONT; import org.aksw.iguana.commons.rdf.IPROP; import org.aksw.iguana.commons.rdf.IRES; @@ -44,7 +44,9 @@ public class CSVStorageTest { @BeforeEach public void setup() throws IOException { this.folder = Files.createTempDirectory("iguana-CSVStorage-test"); - this.suiteFolder = folder.resolve(IguanaConfig.getSuiteID()); + + // TODO: suiteid + this.suiteFolder = folder.resolve("suite"); } public static Arguments createTestData1() { diff --git a/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/NTFileStorageTest.java b/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/NTFileStorageTest.java deleted file mode 100644 index 305d00992..000000000 --- a/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/NTFileStorageTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * - */ -package org.aksw.iguana.cc.tasks.storage.impl; - -import org.aksw.iguana.cc.tasks.stresstest.storage.impl.NTFileStorage; -import org.aksw.iguana.cc.tasks.stresstest.storage.Storage; -import org.apache.jena.rdf.model.*; -import org.apache.jena.vocabulary.RDFS; -import org.junit.Test; - -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * - * This will test the NTFileStorage in short. - * - * @author f.conrads - * - */ -public class NTFileStorageTest { - - - @Test - public void dataTest() throws IOException{ - Storage store = new NTFileStorage("results_test2.nt"); - - new File("results_test2.nt").delete(); - - Model m = ModelFactory.createDefaultModel(); - m.read(new FileReader("src/test/resources/nt/results_test1.nt"), null, "N-TRIPLE"); - - store.storeResult(m); - assertEqual("results_test2.nt","src/test/resources/nt/results_test1.nt", true); - new File("results_test2.nt").delete(); - - } - - /** - * Checks if two ntriple files are equal by loading them into a model and check if they have the same size - * and by removing the actual model from the expected, if the new size after removal equals 0 they are the same - * - * @param actualFile - * @param expectedFile - * @throws IOException - */ - public void assertEqual(String actualFile, String expectedFile, boolean ignoreDate) throws IOException{ - Model expected = ModelFactory.createDefaultModel(); - expected.read(new FileReader(expectedFile), null, "N-TRIPLE"); - Model actual = ModelFactory.createDefaultModel(); - actual.read(new FileReader(actualFile), null, "N-TRIPLE"); - assertEquals(expected.size(), actual.size()); - expected.remove(actual); - if(!ignoreDate){ - //Remove startDate as they are different, just check if actual contains a start date - Property startDate =ResourceFactory.createProperty(RDFS.getURI()+"startDate"); - assertTrue(actual.contains(null, startDate, (RDFNode)null)); - List stmts = expected.listStatements(null, startDate, (RDFNode)null).toList(); - assertEquals(1, stmts.size()); - expected.remove(stmts); - } - - assertEquals(0, expected.size()); - } -} diff --git a/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/RDFFileStorageTest.java b/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/RDFFileStorageTest.java index 757d0e6a0..48527c582 100644 --- a/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/RDFFileStorageTest.java +++ b/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/RDFFileStorageTest.java @@ -3,8 +3,8 @@ */ package org.aksw.iguana.cc.tasks.storage.impl; -import org.aksw.iguana.cc.tasks.stresstest.storage.impl.RDFFileStorage; -import org.aksw.iguana.cc.tasks.stresstest.storage.Storage; +import org.aksw.iguana.cc.storage.impl.RDFFileStorage; +import org.aksw.iguana.cc.storage.Storage; import org.apache.jena.rdf.model.*; import org.apache.jena.riot.RDFLanguages; import org.apache.jena.vocabulary.RDFS; diff --git a/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/TriplestoreStorageTest.java b/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/TriplestoreStorageTest.java index 26ba9a0e4..1a4493319 100644 --- a/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/TriplestoreStorageTest.java +++ b/src/test/java/org/aksw/iguana/cc/tasks/storage/impl/TriplestoreStorageTest.java @@ -1,6 +1,6 @@ package org.aksw.iguana.cc.tasks.storage.impl; -import org.aksw.iguana.cc.tasks.stresstest.storage.impl.TriplestoreStorage; +import org.aksw.iguana.cc.storage.impl.TriplestoreStorage; import org.aksw.iguana.commons.constants.COMMON; import org.aksw.iguana.cc.tasks.ServerMock; import org.apache.jena.rdf.model.Model; diff --git a/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java b/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java index f18bb49a3..24a6efc61 100644 --- a/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java +++ b/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java @@ -24,7 +24,6 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayDeque; -import java.util.Optional; import java.util.stream.Stream; import static com.github.tomakehurst.wiremock.client.WireMock.*; @@ -117,12 +116,11 @@ public void testRequestFactory(SPARQLProtocolWorker worker) { final HttpWorker.Result result = worker.start().join(); assertEquals(result.executionStats().size(), QUERY_MIXES, "Worker should have executed only 1 query"); - assertNull(result.executionStats().get(0).error(), "Worker threw an exception, during execution"); - assertEquals(200, result.executionStats().get(0).httpStatusCode(), "Worker returned wrong status code"); - assertTrue(result.executionStats().get(0).duration().isPresent(), "Worker didn't return a duration"); - assertNotEquals(Duration.ZERO, result.executionStats().get(0).duration().get(), "Worker returned zero duration"); - assertNotEquals(0, result.executionStats().get(0).responseBodyHash(), "Worker didn't return a response body hash"); - assertEquals("Non-Empty-Body".getBytes(StandardCharsets.UTF_8).length, result.executionStats().get(0).contentLength(), "Worker returned wrong content length"); + assertNull(result.executionStats().get(0).error().orElse(null), "Worker threw an exception, during execution"); + assertEquals(200, result.executionStats().get(0).httpStatusCode().get(), "Worker returned wrong status code"); + assertNotEquals(Duration.ZERO, result.executionStats().get(0).duration(), "Worker returned zero duration"); + assertNotEquals(0, result.executionStats().get(0).responseBodyHash().getAsLong(), "Worker didn't return a response body hash"); + assertEquals("Non-Empty-Body".getBytes(StandardCharsets.UTF_8).length, result.executionStats().get(0).contentLength().getAsLong(), "Worker returned wrong content length"); } @DisplayName("Test Malformed Response Processing") @@ -134,8 +132,7 @@ public void testMalformedResponseProcessing(Fault fault) throws IOException, URI .willReturn(aResponse().withFault(fault))); final HttpWorker.Result result = worker.start().join(); assertEquals(1, result.executionStats().size()); - assertNotNull(result.executionStats().get(0).error()); - assertEquals(Optional.empty(), result.executionStats().get(0).duration()); + assertNotNull(result.executionStats().get(0).error().orElse(null)); } @Test