From 99fa1eea7b055e8280fe70196e4994874b9e1c7a Mon Sep 17 00:00:00 2001 From: Sebastian Peter <14994800+sebastian-peter@users.noreply.github.com> Date: Tue, 18 Jan 2022 18:30:53 +0100 Subject: [PATCH 01/14] Introducing primary data from sql --- build.gradle | 2 +- .../resources/config/config-template.conf | 7 +- .../edu/ie3/simona/config/SimonaConfig.scala | 31 ++-- .../primary/PrimaryServiceWorker.scala | 161 +++++++++++------- .../weather/WeatherSourceWrapper.scala | 2 +- .../edu/ie3/simona/util/ConfigUtil.scala | 6 +- .../primary/PrimaryServiceProxySpec.scala | 6 +- 7 files changed, 126 insertions(+), 89 deletions(-) diff --git a/build.gradle b/build.gradle index 1257ed2bf3..b9a125cefa 100644 --- a/build.gradle +++ b/build.gradle @@ -67,7 +67,7 @@ dependencies { /* Exclude our own nested dependencies */ exclude group: 'com.github.ie3-institute' } - implementation('com.github.ie3-institute:PowerSystemDataModel:2.1.0') { + implementation('com.github.ie3-institute:PowerSystemDataModel:3.0-SNAPSHOT') { exclude group: 'org.apache.logging.log4j' exclude group: 'org.slf4j' /* Exclude our own nested dependencies */ diff --git a/src/main/resources/config/config-template.conf b/src/main/resources/config/config-template.conf index 8e6952e8f0..4848267d2c 100644 --- a/src/main/resources/config/config-template.conf +++ b/src/main/resources/config/config-template.conf @@ -101,9 +101,8 @@ simona.input.primary = { jdbcUrl: string userName: string password: string - weatherTableName: string + tableName: string schemaName: string | "public" - timeColumnName: string timePattern: string | "yyyy-MM-dd'T'HH:mm:ss[.S[S][S]]'Z'" # default pattern from PSDM:TimeBasedSimpleValueFactory } #@optional @@ -150,9 +149,9 @@ simona.input.weather.datasource = { jdbcUrl: string userName: string password: string - weatherTableName: string + tableName: string schemaName: string | "public" - timeColumnName: string + timePattern: string | "yyyy-MM-dd'T'HH:mm:ss[.S[S][S]]'Z'" # default pattern from PSDM:TimeBasedSimpleValueFactory } #@optional couchbaseParams = { diff --git a/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala b/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala index 6283055bbc..f6eb5cee78 100644 --- a/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala +++ b/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala @@ -1,5 +1,5 @@ /* - * © 2021. TU Dortmund University, + * © 2022. TU Dortmund University, * Institute of Energy Systems, Energy Efficiency and Energy Economics, * Research group Distribution grid planning and operation */ @@ -908,10 +908,9 @@ object SimonaConfig { jdbcUrl: java.lang.String, password: java.lang.String, schemaName: java.lang.String, - timeColumnName: java.lang.String, + tableName: java.lang.String, timePattern: java.lang.String, - userName: java.lang.String, - weatherTableName: java.lang.String + userName: java.lang.String ) object SqlParams { def apply( @@ -925,14 +924,11 @@ object SimonaConfig { schemaName = if (c.hasPathOrNull("schemaName")) c.getString("schemaName") else "public", - timeColumnName = - $_reqStr(parentPath, c, "timeColumnName", $tsCfgValidator), + tableName = $_reqStr(parentPath, c, "tableName", $tsCfgValidator), timePattern = if (c.hasPathOrNull("timePattern")) c.getString("timePattern") else "yyyy-MM-dd'T'HH:mm:ss[.S[S][S]]'Z'", - userName = $_reqStr(parentPath, c, "userName", $tsCfgValidator), - weatherTableName = - $_reqStr(parentPath, c, "weatherTableName", $tsCfgValidator) + userName = $_reqStr(parentPath, c, "userName", $tsCfgValidator) ) } private def $_reqStr( @@ -1277,9 +1273,9 @@ object SimonaConfig { jdbcUrl: java.lang.String, password: java.lang.String, schemaName: java.lang.String, - timeColumnName: java.lang.String, - userName: java.lang.String, - weatherTableName: java.lang.String + tableName: java.lang.String, + timePattern: java.lang.String, + userName: java.lang.String ) object SqlParams { def apply( @@ -1293,11 +1289,12 @@ object SimonaConfig { schemaName = if (c.hasPathOrNull("schemaName")) c.getString("schemaName") else "public", - timeColumnName = - $_reqStr(parentPath, c, "timeColumnName", $tsCfgValidator), - userName = $_reqStr(parentPath, c, "userName", $tsCfgValidator), - weatherTableName = - $_reqStr(parentPath, c, "weatherTableName", $tsCfgValidator) + tableName = + $_reqStr(parentPath, c, "tableName", $tsCfgValidator), + timePattern = + if (c.hasPathOrNull("timePattern")) c.getString("timePattern") + else "yyyy-MM-dd'T'HH:mm:ss[.S[S][S]]'Z'", + userName = $_reqStr(parentPath, c, "userName", $tsCfgValidator) ) } private def $_reqStr( diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala index 8553ef5533..ad59294518 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala @@ -7,14 +7,17 @@ package edu.ie3.simona.service.primary import akka.actor.{ActorRef, Props} +import edu.ie3.datamodel.io.connectors.SqlConnector import edu.ie3.datamodel.io.csv.timeseries.ColumnScheme import edu.ie3.datamodel.io.factory.timeseries.TimeBasedSimpleValueFactory import edu.ie3.datamodel.io.naming.FileNamingStrategy import edu.ie3.datamodel.io.source.TimeSeriesSource import edu.ie3.datamodel.io.source.csv.CsvTimeSeriesSource +import edu.ie3.datamodel.io.source.sql.SqlTimeSeriesSource import edu.ie3.datamodel.models.value.Value import edu.ie3.simona.agent.participant.data.Data.PrimaryData import edu.ie3.simona.agent.participant.data.Data.PrimaryData.RichValue +import edu.ie3.simona.config.SimonaConfig.Simona.Input.Primary.SqlParams import edu.ie3.simona.exceptions.InitializationException import edu.ie3.simona.exceptions.WeatherServiceException.InvalidRegistrationRequestException import edu.ie3.simona.ontology.messages.SchedulerMessage @@ -24,11 +27,11 @@ import edu.ie3.simona.service.ServiceStateData.{ InitializeServiceStateData, ServiceActivationBaseStateData } -import edu.ie3.simona.service.{ServiceStateData, SimonaService} import edu.ie3.simona.service.primary.PrimaryServiceWorker.{ PrimaryServiceInitializedStateData, ProvidePrimaryDataMessage } +import edu.ie3.simona.service.{ServiceStateData, SimonaService} import edu.ie3.simona.util.TickUtil.{RichZonedDateTime, TickLong} import edu.ie3.util.scala.collection.immutable.SortedDistinctSeq @@ -61,68 +64,94 @@ final case class PrimaryServiceWorker[V <: Value]( PrimaryServiceInitializedStateData[V], Option[Seq[SchedulerMessage.ScheduleTriggerMessage]] ) - ] = initServiceData match { - case PrimaryServiceWorker.CsvInitPrimaryServiceStateData( - timeSeriesUuid, - simulationStart, - csvSep, - directoryPath, - filePath, - fileNamingStrategy, - timePattern - ) => - /* Got the right data. Attempt to set up a source and acquire information */ - implicit val startDateTime: ZonedDateTime = simulationStart + ] = { + val trySource = initServiceData match { + case PrimaryServiceWorker.CsvInitPrimaryServiceStateData( + timeSeriesUuid, + simulationStart, + csvSep, + directoryPath, + filePath, + fileNamingStrategy, + timePattern + ) => + Try { + /* Set up source and acquire information */ + val factory = new TimeBasedSimpleValueFactory(valueClass, timePattern) + val source = new CsvTimeSeriesSource( + csvSep, + directoryPath, + fileNamingStrategy, + timeSeriesUuid, + filePath, + valueClass, + factory + ) + (source, simulationStart) + } + case PrimaryServiceWorker.SqlInitPrimaryServiceStateData( + sqlParams: SqlParams, + timeSeriesUuid: UUID, + simulationStart: ZonedDateTime + ) => + Try { + val valueFactory = + new TimeBasedSimpleValueFactory(valueClass, sqlParams.timePattern) - Try { - /* Set up source and acquire information */ - val factory = new TimeBasedSimpleValueFactory(valueClass, timePattern) - val source = new CsvTimeSeriesSource( - csvSep, - directoryPath, - fileNamingStrategy, - timeSeriesUuid, - filePath, - valueClass, - factory - ) - /* This seems not to be very efficient, but it is as efficient as possible. The getter method points to a - * final attribute within the source implementation. */ - val (maybeNextTick, furtherActivationTicks) = SortedDistinctSeq( - source.getTimeSeries.getEntries.asScala - .filter { timeBasedValue => - val dateTime = timeBasedValue.getTime - dateTime.isEqual(simulationStart) || dateTime.isAfter( - simulationStart - ) - } - .map(timeBasedValue => timeBasedValue.getTime.toTick) - .toSeq - .sorted - ).pop + val sqlConnector = new SqlConnector( + sqlParams.jdbcUrl, + sqlParams.userName, + sqlParams.password + ) - /* Set up the state data and determine the next activation tick. */ - val initializedStateData = - PrimaryServiceInitializedStateData( - maybeNextTick, - furtherActivationTicks, - simulationStart, - source + val source = new SqlTimeSeriesSource( + sqlConnector, + sqlParams.schemaName, + sqlParams.tableName, + timeSeriesUuid, + valueClass, + valueFactory ) - val triggerMessage = - ServiceActivationBaseStateData.tickToScheduleTriggerMessages( - maybeNextTick, - self + + (source, simulationStart) + } + case unsupported => + /* Got the wrong init data */ + Failure( + new InitializationException( + s"Provided init data '${unsupported.getClass.getSimpleName}' for primary service are invalid!" ) - (initializedStateData, triggerMessage) - } - case unsupported => - /* Got the wrong init data */ - Failure( - new InitializationException( - s"Provided init data '${unsupported.getClass.getSimpleName}' for primary service are invalid!" ) - ) + } + trySource.map { case (source, simulationStart) => + val (maybeNextTick, furtherActivationTicks) = SortedDistinctSeq( + source.getTimeSeries.getEntries.asScala + .filter { timeBasedValue => + val dateTime = timeBasedValue.getTime + dateTime.isEqual(simulationStart) || dateTime.isAfter( + simulationStart + ) + } + .map(timeBasedValue => timeBasedValue.getTime.toTick) + .toSeq + .sorted + ).pop + + /* Set up the state data and determine the next activation tick. */ + val initializedStateData = + PrimaryServiceInitializedStateData( + maybeNextTick, + furtherActivationTicks, + simulationStart, + source + ) + val triggerMessage = + ServiceActivationBaseStateData.tickToScheduleTriggerMessages( + maybeNextTick, + self + ) + (initializedStateData, triggerMessage) + } } /** Handle a request to register for information from this service @@ -348,6 +377,22 @@ case object PrimaryServiceWorker { timePattern: String ) extends InitPrimaryServiceStateData + /** Specific implementation of [[InitPrimaryServiceStateData]], if the source + * to use utilizes csv files. + * + * TODO + * + * @param timeSeriesUuid + * Unique identifier of the time series to read + * @param simulationStart + * Wall clock time of the beginning of simulation time + */ + final case class SqlInitPrimaryServiceStateData( + sqlParams: SqlParams, + override val timeSeriesUuid: UUID, + override val simulationStart: ZonedDateTime + ) extends InitPrimaryServiceStateData + /** Class carrying the state of a fully initialized [[PrimaryServiceWorker]] * * @param maybeNextActivationTick diff --git a/src/main/scala/edu/ie3/simona/service/weather/WeatherSourceWrapper.scala b/src/main/scala/edu/ie3/simona/service/weather/WeatherSourceWrapper.scala index baf41e6c9f..a1224055f0 100644 --- a/src/main/scala/edu/ie3/simona/service/weather/WeatherSourceWrapper.scala +++ b/src/main/scala/edu/ie3/simona/service/weather/WeatherSourceWrapper.scala @@ -328,7 +328,7 @@ private[weather] object WeatherSourceWrapper extends LazyLogging { sqlConnector, idCoordinateSource, sqlParams.schemaName, - sqlParams.weatherTableName, + sqlParams.tableName, buildFactory(timestampPattern, scheme) ) logger.info( diff --git a/src/main/scala/edu/ie3/simona/util/ConfigUtil.scala b/src/main/scala/edu/ie3/simona/util/ConfigUtil.scala index b4703c2873..d4ee521750 100644 --- a/src/main/scala/edu/ie3/simona/util/ConfigUtil.scala +++ b/src/main/scala/edu/ie3/simona/util/ConfigUtil.scala @@ -321,11 +321,7 @@ object ConfigUtil { logger.info( "Password for SQL weather source is empty. This is allowed, but not common. Please check if this an intended setting." ) - if (sql.timeColumnName.isEmpty) - throw new InvalidConfigParameterException( - "Time column for SQL weather source cannot be empty" - ) - if (sql.weatherTableName.isEmpty) + if (sql.tableName.isEmpty) throw new InvalidConfigParameterException( "Weather table name for SQL weather source cannot be empty" ) diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala index b72aea17f3..a80ccd0608 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala @@ -127,7 +127,7 @@ class PrimaryServiceProxySpec mappingSource ) - val scheduler = TestProbe("scheduler") + private val scheduler = TestProbe("scheduler") "Testing a primary service config" should { "lead to complaining about too much source definitions" in { @@ -204,7 +204,7 @@ class PrimaryServiceProxySpec None, None, None, - Some(SqlParams("", "", "", "", "", "", "")) + Some(SqlParams("", "", "", "", "", "")) ) val exception = intercept[InvalidConfigParameterException]( @@ -276,7 +276,7 @@ class PrimaryServiceProxySpec None, None, None, - Some(SqlParams("", "", "", "", "", "", "")) + Some(SqlParams("", "", "", "", "", "")) ) proxy invokePrivate prepareStateData( From ff30158497178d43377a9281eb46bd7d08022624 Mon Sep 17 00:00:00 2001 From: Sebastian Peter <14994800+sebastian-peter@users.noreply.github.com> Date: Tue, 18 Jan 2022 18:59:08 +0100 Subject: [PATCH 02/14] Fixing test --- .../ie3/simona/service/primary/PrimaryServiceProxySpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala index a80ccd0608..769115381e 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala @@ -210,7 +210,7 @@ class PrimaryServiceProxySpec val exception = intercept[InvalidConfigParameterException]( PrimaryServiceProxy.checkConfig(maliciousConfig) ) - exception.getMessage shouldBe "Invalid configuration 'SqlParams(,,,,,,)' for a time series source.\nAvailable types:\n\tcsv" + exception.getMessage shouldBe "Invalid configuration 'SqlParams(,,,,,)' for a time series source.\nAvailable types:\n\tcsv" } "fails on invalid time pattern" in { @@ -287,7 +287,7 @@ class PrimaryServiceProxySpec fail("Building state data with missing config should fail") case Failure(exception) => exception.getClass shouldBe classOf[IllegalArgumentException] - exception.getMessage shouldBe "Unsupported config for mapping source: 'SqlParams(,,,,,,)'" + exception.getMessage shouldBe "Unsupported config for mapping source: 'SqlParams(,,,,,)'" } } From 7c6f6f49fbe5a6aeb84f8784be4fb1e8b15ca3f7 Mon Sep 17 00:00:00 2001 From: Sebastian Peter <14994800+sebastian-peter@users.noreply.github.com> Date: Tue, 18 Jan 2022 19:35:20 +0100 Subject: [PATCH 03/14] Making PrimaryService tests work on Windows --- .../primary/PrimaryServiceProxySpec.scala | 14 ++++++++++---- .../primary/PrimaryServiceWorkerSpec.scala | 19 ++++++++++--------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala index 769115381e..1f6f4a5f17 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala @@ -54,10 +54,10 @@ import edu.ie3.util.TimeUtil import org.scalatest.PartialFunctionValues import org.scalatest.prop.TableDrivenPropertyChecks +import java.nio.file.Paths import java.time.ZonedDateTime import java.util.concurrent.TimeUnit import java.util.{Objects, UUID} -import scala.reflect.io.File import scala.util.{Failure, Success, Try} import scala.concurrent.ExecutionContext.Implicits.global @@ -74,9 +74,15 @@ class PrimaryServiceProxySpec ) with TableDrivenPropertyChecks with PartialFunctionValues { - val baseDirectoryPath: String = this.getClass - .getResource(File.separator + "it-data" + File.separator + "primaryService") - .getPath + val baseDirectoryPath: String = Paths + .get( + this.getClass + .getResource( + "/it-data/primaryService" + ) + .toURI + ) + .toString val csvSep = ";" val fileNamingStrategy = new FileNamingStrategy() val validPrimaryConfig: PrimaryConfig = diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala index 640497cf3f..b23eeff6c9 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala @@ -42,9 +42,9 @@ import edu.ie3.util.quantities.PowerSystemUnits import edu.ie3.util.scala.collection.immutable.SortedDistinctSeq import tech.units.indriya.quantity.Quantities +import java.nio.file.Paths import java.time.ZonedDateTime import java.util.UUID -import scala.reflect.io.File import scala.util.{Failure, Success} class PrimaryServiceWorkerSpec @@ -57,14 +57,15 @@ class PrimaryServiceWorkerSpec """.stripMargin) ) ) { - val baseDirectoryPath: String = "^file:".r.replaceFirstIn( - this.getClass - .getResource( - File.separator + "it-data" + File.separator + "primaryService" - ) - .toString, - "" - ) + val baseDirectoryPath: String = Paths + .get( + this.getClass + .getResource( + "/it-data/primaryService" + ) + .toURI + ) + .toString private val simulationStart = TimeUtil.withDefaults.toZonedDateTime("2020-01-01 00:00:00") From 4ec37aa1529d65036ade9a2a3c07f5a194c279d9 Mon Sep 17 00:00:00 2001 From: Sebastian Peter <14994800+sebastian-peter@users.noreply.github.com> Date: Tue, 18 Jan 2022 21:54:57 +0100 Subject: [PATCH 04/14] Added comments --- .../edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala | 1 + .../ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala index 1f6f4a5f17..7f17045fe3 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala @@ -74,6 +74,7 @@ class PrimaryServiceProxySpec ) with TableDrivenPropertyChecks with PartialFunctionValues { + // this works both on Windows and Unix systems val baseDirectoryPath: String = Paths .get( this.getClass diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala index b23eeff6c9..05992f39b2 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala @@ -57,6 +57,7 @@ class PrimaryServiceWorkerSpec """.stripMargin) ) ) { + // this works both on Windows and Unix systems val baseDirectoryPath: String = Paths .get( this.getClass From 1212567f636a5e5ef9be65267d93b4dc358eca8e Mon Sep 17 00:00:00 2001 From: Sebastian Peter <14994800+sebastian-peter@users.noreply.github.com> Date: Thu, 20 Jan 2022 16:13:08 +0100 Subject: [PATCH 05/14] Implementing test for SQL primary data with postgresql testcontainer --- build.gradle | 6 + .../primary/PrimaryServiceWorker.scala | 9 +- ...p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql | 17 ++ ...h_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql | 19 ++ .../primary/PrimaryServiceWorkerSqlIT.scala | 215 ++++++++++++++++++ 5 files changed, 261 insertions(+), 5 deletions(-) create mode 100644 src/test/resources/edu/ie3/simona/service/primary/timeseries/its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql create mode 100644 src/test/resources/edu/ie3/simona/service/primary/timeseries/its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql create mode 100644 src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala diff --git a/build.gradle b/build.gradle index b9a125cefa..ef2c7e566f 100644 --- a/build.gradle +++ b/build.gradle @@ -29,6 +29,8 @@ ext { akkaVersion = '2.6.18' tscfgVersion = '0.9.996' + testContainerVersion = '0.39.12' + scriptsLocation = 'gradle' + File.separator + 'scripts' + File.separator // location of script plugins } @@ -100,6 +102,10 @@ dependencies { testImplementation group: 'org.pegdown', name: 'pegdown', version: '1.6.0' testImplementation "com.typesafe.akka:akka-testkit_${scalaVersion}:${akkaVersion}" // akka testkit + // testcontainers + testImplementation "com.dimafeng:testcontainers-scala-scalatest_${scalaVersion}:${testContainerVersion}" + testImplementation "com.dimafeng:testcontainers-scala-postgresql_${scalaVersion}:${testContainerVersion}" + /* --- Scala libs --- */ /* CORE Scala */ implementation "org.scala-lang:scala-library:${scalaBinaryVersion}" diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala index ad59294518..ccad2ef790 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala @@ -65,7 +65,7 @@ final case class PrimaryServiceWorker[V <: Value]( Option[Seq[SchedulerMessage.ScheduleTriggerMessage]] ) ] = { - val trySource = initServiceData match { + (initServiceData match { case PrimaryServiceWorker.CsvInitPrimaryServiceStateData( timeSeriesUuid, simulationStart, @@ -95,7 +95,7 @@ final case class PrimaryServiceWorker[V <: Value]( simulationStart: ZonedDateTime ) => Try { - val valueFactory = + val factory = new TimeBasedSimpleValueFactory(valueClass, sqlParams.timePattern) val sqlConnector = new SqlConnector( @@ -110,7 +110,7 @@ final case class PrimaryServiceWorker[V <: Value]( sqlParams.tableName, timeSeriesUuid, valueClass, - valueFactory + factory ) (source, simulationStart) @@ -122,8 +122,7 @@ final case class PrimaryServiceWorker[V <: Value]( s"Provided init data '${unsupported.getClass.getSimpleName}' for primary service are invalid!" ) ) - } - trySource.map { case (source, simulationStart) => + }).map { case (source, simulationStart) => val (maybeNextTick, furtherActivationTicks) = SortedDistinctSeq( source.getTimeSeries.getEntries.asScala .filter { timeBasedValue => diff --git a/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql b/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql new file mode 100644 index 0000000000..1f956e06a7 --- /dev/null +++ b/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql @@ -0,0 +1,17 @@ +CREATE TABLE public."its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5" +( + time timestamp with time zone, + p double precision, + uuid uuid, + CONSTRAINT its_p_pkey PRIMARY KEY (uuid) +) + WITH ( + OIDS = FALSE + ) + TABLESPACE pg_default; + +INSERT INTO + public."its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5" (uuid, time, p) +VALUES +('0245d599-9a5c-4c32-9613-5b755fac8ca0', '2020-01-01 00:00:00+0', 1000.0), +('a5e27652-9024-4a93-9d2a-590fbc3ab5a1', '2020-01-01 00:15:00+0', 1250.0); diff --git a/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql b/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql new file mode 100644 index 0000000000..230393eb5b --- /dev/null +++ b/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql @@ -0,0 +1,19 @@ +CREATE TABLE public."its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047" +( + time timestamp with time zone, + p double precision, + q double precision, + heat_demand double precision, + uuid uuid, + CONSTRAINT its_pqh_pkey PRIMARY KEY (uuid) +) + WITH ( + OIDS = FALSE + ) + TABLESPACE pg_default; + +INSERT INTO + public."its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047" (uuid, time, p, q, heat_demand) +VALUES +('661ac594-47f0-4442-8d82-bbeede5661f7', '2020-01-01 00:00:00+0', 1000.0, 329.0, 8.0), +('5adcd6c5-a903-433f-b7b5-5fe669a3ed30', '2020-01-01 00:15:00+0', 1250.0, 411.0, 12.0); diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala new file mode 100644 index 0000000000..88eddf6f5a --- /dev/null +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala @@ -0,0 +1,215 @@ +/* + * © 2022. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.service.primary + +import akka.actor.{ActorRef, ActorSystem} +import akka.testkit.{TestActorRef, TestProbe} +import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer} +import com.typesafe.config.ConfigFactory +import edu.ie3.datamodel.models.value.{HeatAndSValue, PValue, Value} +import edu.ie3.simona.agent.participant.data.Data.PrimaryData.{ + ActivePower, + ApparentPowerAndHeat +} +import edu.ie3.simona.config.SimonaConfig.Simona.Input.Primary.SqlParams +import edu.ie3.simona.ontology.messages.SchedulerMessage.{ + CompletionMessage, + ScheduleTriggerMessage, + TriggerWithIdMessage +} +import edu.ie3.simona.ontology.messages.services.ServiceMessage.RegistrationResponseMessage.RegistrationSuccessfulMessage +import edu.ie3.simona.ontology.messages.services.ServiceMessage.WorkerRegistrationMessage +import edu.ie3.simona.ontology.trigger.Trigger.{ + ActivityStartTrigger, + InitializeServiceTrigger +} +import edu.ie3.simona.service.primary.PrimaryServiceWorker.{ + ProvidePrimaryDataMessage, + SqlInitPrimaryServiceStateData +} +import edu.ie3.simona.test.common.AgentSpec +import edu.ie3.util.TimeUtil +import org.scalatest.BeforeAndAfterAll +import org.scalatest.prop.TableDrivenPropertyChecks +import org.testcontainers.utility.MountableFile + +import java.nio.file.Paths +import java.util.UUID +import scala.language.postfixOps +import scala.reflect.ClassTag + +class PrimaryServiceWorkerSqlIT + extends AgentSpec( + ActorSystem( + "PrimaryServiceWorkerSqlIT", + ConfigFactory + .parseString(""" + |akka.loglevel="OFF" + """.stripMargin) + ) + ) + with ForAllTestContainer + with BeforeAndAfterAll + with TableDrivenPropertyChecks { + + override val container: PostgreSQLContainer = PostgreSQLContainer( + "postgres:11.14" + ) + + private val simulationStart = + TimeUtil.withDefaults.toZonedDateTime("2020-01-01 00:00:00") + + private val schemaName = "public" + + private val uuidP = UUID.fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5") + private val uuidPhq = UUID.fromString("46be1e57-e4ed-4ef7-95f1-b2b321cb2047") + + private val tableNameP = s"its_p_$uuidP" + private val tableNamePhq = s"its_pqh_$uuidPhq" + + override protected def beforeAll(): Unit = { + val url = getClass.getResource("timeseries/") + url shouldNot be(null) + val path = Paths.get(url.toURI) + + // Copy sql import scripts into docker + val sqlImportFile = MountableFile.forHostPath(path) + container.copyFileToContainer(sqlImportFile, "/home/") + + Iterable(s"$tableNameP.sql", s"$tableNamePhq.sql") + .foreach { file => + val res = container.execInContainer("psql", "-Utest", "-f/home/" + file) + res.getStderr shouldBe empty + } + } + + override protected def afterAll(): Unit = { + container.stop() + container.close() + } + + private def getServiceActor[T <: Value]( + scheduler: ActorRef + )(implicit tag: ClassTag[T]): PrimaryServiceWorker[T] = { + new PrimaryServiceWorker[T]( + scheduler, + tag.runtimeClass.asInstanceOf[Class[T]], + simulationStart + ) + } + + "A primary service actor with SQL source" should { + "initialize and send out data when activated" in { + + val cases = Table( + ( + "getService", + "uuid", + "tableName", + "firstTick", + "dataValueClass", + "maybeNextTick" + ), + ( + getServiceActor[HeatAndSValue](_), + uuidPhq, + tableNamePhq, + 0L, + classOf[ApparentPowerAndHeat], + Some(900L) + ), + ( + getServiceActor[PValue](_), + uuidP, + tableNameP, + 0L, + classOf[ActivePower], + Some(900L) + ) + ) + + forAll(cases) { + ( + getService, + uuid, + tableName, + firstTick, + dataValueClass, + maybeNextTick + ) => + val scheduler = TestProbe("scheduler") + + val serviceRef = + TestActorRef( + getService(scheduler.ref) + ) + + val initData = SqlInitPrimaryServiceStateData( + SqlParams( + jdbcUrl = container.jdbcUrl, + userName = container.username, + password = container.password, + schemaName = schemaName, + tableName = tableName, + timePattern = "yyyy-MM-dd HH:mm:ss" + ), + uuid, + simulationStart + ) + + val triggerId1 = 1L + + scheduler.send( + serviceRef, + TriggerWithIdMessage( + InitializeServiceTrigger(initData), + triggerId1, + serviceRef + ) + ) + + scheduler.expectMsg( + CompletionMessage( + triggerId1, + Some( + List( + ScheduleTriggerMessage( + ActivityStartTrigger(firstTick), + serviceRef + ) + ) + ) + ) + ) + + val participant = TestProbe() + + participant.send( + serviceRef, + WorkerRegistrationMessage(participant.ref) + ) + participant.expectMsg(RegistrationSuccessfulMessage(Some(firstTick))) + + val triggerId2 = 2L + + scheduler.send( + serviceRef, + TriggerWithIdMessage( + ActivityStartTrigger(firstTick), + triggerId2, + serviceRef + ) + ) + + val dataMsg = participant.expectMsgType[ProvidePrimaryDataMessage] + dataMsg.tick shouldBe firstTick + dataMsg.data.getClass shouldBe dataValueClass + dataMsg.nextDataTick shouldBe maybeNextTick + } + } + } +} From c2b2cfb4c831a424cf8fb5e2a5a2d1d4a82583d1 Mon Sep 17 00:00:00 2001 From: Sebastian Peter <14994800+sebastian-peter@users.noreply.github.com> Date: Thu, 20 Jan 2022 16:50:38 +0100 Subject: [PATCH 06/14] Small improvements --- .../ie3/simona/service/primary/PrimaryServiceWorker.scala | 6 +++--- .../simona/service/primary/PrimaryServiceWorkerSqlIT.scala | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala index ccad2ef790..68ba913443 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala @@ -377,10 +377,10 @@ case object PrimaryServiceWorker { ) extends InitPrimaryServiceStateData /** Specific implementation of [[InitPrimaryServiceStateData]], if the source - * to use utilizes csv files. - * - * TODO + * to use utilizes an SQL database. * + * @param sqlParams + * Parameters regarding SQL connection and table selection * @param timeSeriesUuid * Unique identifier of the time series to read * @param simulationStart diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala index 88eddf6f5a..6d81f05d3e 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala @@ -205,6 +205,8 @@ class PrimaryServiceWorkerSqlIT ) ) + scheduler.expectMsgType[CompletionMessage] + val dataMsg = participant.expectMsgType[ProvidePrimaryDataMessage] dataMsg.tick shouldBe firstTick dataMsg.data.getClass shouldBe dataValueClass From 8d41e16dfa060ea622bffbb45fc61d8faa06e6f9 Mon Sep 17 00:00:00 2001 From: Sebastian Peter <14994800+sebastian-peter@users.noreply.github.com> Date: Thu, 20 Jan 2022 16:50:50 +0100 Subject: [PATCH 07/14] Adding to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c47d5ac600..3ca43d56b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,5 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Changed - Improving code readability in EvcsAgent by moving FreeLotsRequest to separate methods +- Implement SQL source for primary data [#34](https://github.com/ie3-institute/simona/issues/34) [Unreleased]: https://github.com/ie3-institute/simona From 882baa68298501a0cae2d0d9e9d0e06fc076432c Mon Sep 17 00:00:00 2001 From: Sebastian Peter <14994800+sebastian-peter@users.noreply.github.com> Date: Mon, 24 Jan 2022 14:24:19 +0100 Subject: [PATCH 08/14] Addressing Thomas' comments --- .../edu/ie3/simona/service/primary/PrimaryServiceWorker.scala | 1 + .../ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala index 68ba913443..3ccb45cac3 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala @@ -124,6 +124,7 @@ final case class PrimaryServiceWorker[V <: Value]( ) }).map { case (source, simulationStart) => val (maybeNextTick, furtherActivationTicks) = SortedDistinctSeq( + // Note: The whole data set is used here, which might be inefficient depending on the source implementation. source.getTimeSeries.getEntries.asScala .filter { timeBasedValue => val dateTime = timeBasedValue.getTime diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala index 6d81f05d3e..77536c1442 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala @@ -92,6 +92,8 @@ class PrimaryServiceWorkerSqlIT container.close() } + // asInstanceOf throws ClassCastException if cast fails, thus this is safe here + @SuppressWarnings(Array("AsInstanceOf")) private def getServiceActor[T <: Value]( scheduler: ActorRef )(implicit tag: ClassTag[T]): PrimaryServiceWorker[T] = { From bf8535722df79170b47986c6d2e949b95eca14e2 Mon Sep 17 00:00:00 2001 From: Sebastian Peter <14994800+sebastian-peter@users.noreply.github.com> Date: Thu, 27 Jan 2022 10:28:53 +0100 Subject: [PATCH 09/14] Improved changelog --- CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ca43d56b5..e6f4029eae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + +### Added +- Implement SQL source for primary data [#34](https://github.com/ie3-institute/simona/issues/34) + ### Changed - Improving code readability in EvcsAgent by moving FreeLotsRequest to separate methods -- Implement SQL source for primary data [#34](https://github.com/ie3-institute/simona/issues/34) [Unreleased]: https://github.com/ie3-institute/simona From 8dcbc8c39deb9be514f0f51d6bc375cc2f92b7a6 Mon Sep 17 00:00:00 2001 From: Sebastian Peter Date: Wed, 16 Mar 2022 19:19:23 +0100 Subject: [PATCH 10/14] Adapting to changes in PSDM and addressing some comments --- gradle/scripts/tscfg.gradle | 2 +- .../resources/config/config-template.conf | 1 - .../edu/ie3/simona/config/SimonaConfig.scala | 2 - .../service/primary/PrimaryServiceProxy.scala | 136 +++++++-------- .../primary/PrimaryServiceWorker.scala | 34 ++-- ...p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql | 17 -- ...h_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql | 19 --- .../primary/timeseries/time_series_p.sql | 21 +++ .../primary/timeseries/time_series_pqh.sql | 21 +++ .../primary/PrimaryServiceProxySpec.scala | 161 +++++++----------- .../primary/PrimaryServiceWorkerSpec.scala | 23 ++- .../primary/PrimaryServiceWorkerSqlIT.scala | 79 +++------ .../common/input/TimeSeriesTestData.scala | 39 +++++ .../test/helper/TestContainerHelper.scala | 36 ++++ 14 files changed, 305 insertions(+), 286 deletions(-) delete mode 100644 src/test/resources/edu/ie3/simona/service/primary/timeseries/its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql delete mode 100644 src/test/resources/edu/ie3/simona/service/primary/timeseries/its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql create mode 100644 src/test/resources/edu/ie3/simona/service/primary/timeseries/time_series_p.sql create mode 100644 src/test/resources/edu/ie3/simona/service/primary/timeseries/time_series_pqh.sql create mode 100644 src/test/scala/edu/ie3/simona/test/common/input/TimeSeriesTestData.scala create mode 100644 src/test/scala/edu/ie3/simona/test/helper/TestContainerHelper.scala diff --git a/gradle/scripts/tscfg.gradle b/gradle/scripts/tscfg.gradle index 7f430ff1f7..ce1fe4dbe9 100644 --- a/gradle/scripts/tscfg.gradle +++ b/gradle/scripts/tscfg.gradle @@ -15,7 +15,7 @@ task genConfigClass { args = [ "build/tscfg-${tscfgVersion}.jar", "--spec", - "src/main/resources/config/simona-config-template.conf", + "src/main/resources/config/config-template.conf", "--scala", "--durations", "--pn", diff --git a/src/main/resources/config/config-template.conf b/src/main/resources/config/config-template.conf index 4848267d2c..9c36b25cf5 100644 --- a/src/main/resources/config/config-template.conf +++ b/src/main/resources/config/config-template.conf @@ -101,7 +101,6 @@ simona.input.primary = { jdbcUrl: string userName: string password: string - tableName: string schemaName: string | "public" timePattern: string | "yyyy-MM-dd'T'HH:mm:ss[.S[S][S]]'Z'" # default pattern from PSDM:TimeBasedSimpleValueFactory } diff --git a/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala b/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala index f6eb5cee78..93e9b4931b 100644 --- a/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala +++ b/src/main/scala/edu/ie3/simona/config/SimonaConfig.scala @@ -908,7 +908,6 @@ object SimonaConfig { jdbcUrl: java.lang.String, password: java.lang.String, schemaName: java.lang.String, - tableName: java.lang.String, timePattern: java.lang.String, userName: java.lang.String ) @@ -924,7 +923,6 @@ object SimonaConfig { schemaName = if (c.hasPathOrNull("schemaName")) c.getString("schemaName") else "public", - tableName = $_reqStr(parentPath, c, "tableName", $tsCfgValidator), timePattern = if (c.hasPathOrNull("timePattern")) c.getString("timePattern") else "yyyy-MM-dd'T'HH:mm:ss[.S[S][S]]'Z'", diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala index 010c0525dd..3e6d213b07 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala @@ -9,9 +9,12 @@ package edu.ie3.simona.service.primary import akka.actor.{Actor, ActorRef, PoisonPill, Props} import edu.ie3.datamodel.io.csv.CsvIndividualTimeSeriesMetaInformation import edu.ie3.datamodel.io.naming.FileNamingStrategy -import edu.ie3.datamodel.io.naming.timeseries.ColumnScheme +import edu.ie3.datamodel.io.naming.timeseries.IndividualTimeSeriesMetaInformation import edu.ie3.datamodel.io.source.TimeSeriesMappingSource -import edu.ie3.datamodel.io.source.csv.CsvTimeSeriesMappingSource +import edu.ie3.datamodel.io.source.csv.{ + CsvTimeSeriesMappingSource, + CsvTimeSeriesTypeSource +} import edu.ie3.datamodel.models.value.Value import edu.ie3.simona.config.SimonaConfig import edu.ie3.simona.config.SimonaConfig.Simona.Input.Primary.CsvParams @@ -51,7 +54,6 @@ import java.time.ZonedDateTime import java.util.UUID import scala.Option.when import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters._ import scala.util.{Failure, Success, Try} /** This actor has information on which models can be replaced by precalculated @@ -137,19 +139,27 @@ case class PrimaryServiceProxy( ).filter(_.isDefined).flatten.headOption match { case Some(CsvParams(csvSep, folderPath, _)) => // TODO: Configurable file naming strategy + val fileNamingStrategy = new FileNamingStrategy() val mappingSource = new CsvTimeSeriesMappingSource( csvSep, folderPath, - new FileNamingStrategy() + fileNamingStrategy + ) + val typeSource = new CsvTimeSeriesTypeSource( + csvSep, + folderPath, + fileNamingStrategy ) val modelToTimeSeries = mappingSource.getMapping.asScala.toMap + val timeSeriesMetaInformation = + typeSource.getTimeSeriesMetaInformation.asScala.toMap + val timeSeriesToSourceRef = modelToTimeSeries.values .to(LazyList) .distinct .flatMap { timeSeriesUuid => - mappingSource - .timeSeriesMetaInformation(timeSeriesUuid) - .toScala match { + timeSeriesMetaInformation + .get(timeSeriesUuid) match { case Some(metaInformation) => val columnScheme = metaInformation.getColumnScheme /* Only register those entries, that meet the supported column schemes */ @@ -157,7 +167,7 @@ case class PrimaryServiceProxy( PrimaryServiceWorker.supportedColumnSchemes .contains(columnScheme) ) { - timeSeriesUuid -> SourceRef(columnScheme, None) + timeSeriesUuid -> SourceRef(metaInformation, None) } case None => log.warning( @@ -251,14 +261,12 @@ case class PrimaryServiceProxy( /* There is yet a worker apparent. Register the requesting actor. The worker will reply to the original * requesting actor. */ worker ! WorkerRegistrationMessage(requestingActor) - case Some(SourceRef(columnScheme, None)) => + case Some(SourceRef(metaInformation, None)) => /* There is NO worker apparent, yet. Spin one off. */ initializeWorker( - columnScheme, - timeSeriesUuid, + metaInformation, stateData.simulationStart, - stateData.primaryConfig, - stateData.mappingSource + stateData.primaryConfig ) match { case Success(workerRef) => /* Forward the registration request. The worker will reply about successful registration or not. */ @@ -289,33 +297,28 @@ case class PrimaryServiceProxy( /** Instantiate a new [[PrimaryServiceWorker]] and send initialization * information * - * @param columnScheme - * Scheme of the data to expect + * @param metaInformation + * Meta information (including column scheme) of the time series + * @param simulationStart + * The time of the simulation start * @param primaryConfig * Configuration for the primary config - * @param mappingSource - * Source for time series mapping, that might deliver additional - * information for the source initialization * @return * The [[ActorRef]] to the worker */ protected def initializeWorker( - columnScheme: ColumnScheme, - timeSeriesUuid: UUID, + metaInformation: IndividualTimeSeriesMetaInformation, simulationStart: ZonedDateTime, - primaryConfig: PrimaryConfig, - mappingSource: TimeSeriesMappingSource + primaryConfig: PrimaryConfig ): Try[ActorRef] = { val workerRef = classToWorkerRef( - columnScheme.getValueClass, - timeSeriesUuid.toString, - simulationStart + metaInformation.getColumnScheme.getValueClass, + metaInformation.getUuid.toString ) toInitData( - primaryConfig, - mappingSource, - timeSeriesUuid, - simulationStart + metaInformation, + simulationStart, + primaryConfig ) match { case Success(initData) => scheduler ! ScheduleTriggerMessage( @@ -341,8 +344,6 @@ case class PrimaryServiceProxy( * Class of the values to provide later on * @param timeSeriesUuid * uuid of the time series the actor processes - * @param simulationStart - * Wall clock time of first instant in simulation * @tparam V * Type of the class to provide * @return @@ -350,33 +351,29 @@ case class PrimaryServiceProxy( */ protected def classToWorkerRef[V <: Value]( valueClass: Class[V], - timeSeriesUuid: String, - simulationStart: ZonedDateTime + timeSeriesUuid: String ): ActorRef = { import edu.ie3.simona.actor.SimonaActorNaming._ context.system.simonaActorOf( - PrimaryServiceWorker.props(scheduler, valueClass, simulationStart), + PrimaryServiceWorker.props(scheduler, valueClass), timeSeriesUuid ) } /** Building proper init data for the worker * - * @param primaryConfig - * Configuration for primary sources - * @param mappingSource - * Source to get mapping information about time series - * @param timeSeriesUuid - * Unique identifier for the time series + * @param metaInformation + * Meta information (including column scheme) of the time series * @param simulationStart - * Wall clock time of the first instant in simulation + * The time of the simulation start + * @param primaryConfig + * Configuration for the primary config * @return */ private def toInitData( - primaryConfig: PrimaryConfig, - mappingSource: TimeSeriesMappingSource, - timeSeriesUuid: UUID, - simulationStart: ZonedDateTime + metaInformation: IndividualTimeSeriesMetaInformation, + simulationStart: ZonedDateTime, + primaryConfig: PrimaryConfig ): Try[InitPrimaryServiceStateData] = primaryConfig match { case PrimaryConfig( @@ -386,28 +383,27 @@ case class PrimaryServiceProxy( None ) => /* The mapping and actual data sources are from csv. At first, get the file name of the file to read. */ - Try(mappingSource.timeSeriesMetaInformation(timeSeriesUuid).get) - .flatMap { - /* Time series meta information could be successfully obtained */ - case csvMetaData: CsvIndividualTimeSeriesMetaInformation => - Success( - CsvInitPrimaryServiceStateData( - timeSeriesUuid, - simulationStart, - csvSep, - directoryPath, - csvMetaData.getFullFilePath, - new FileNamingStrategy(), - timePattern - ) + metaInformation match { + /* Time series meta information could be successfully obtained */ + case csvMetaData: CsvIndividualTimeSeriesMetaInformation => + Success( + CsvInitPrimaryServiceStateData( + csvMetaData.getUuid, + simulationStart, + csvSep, + directoryPath, + csvMetaData.getFullFilePath, + new FileNamingStrategy(), + timePattern ) - case invalidMetaData => - Failure( - new InitializationException( - s"Expected '${classOf[CsvIndividualTimeSeriesMetaInformation]}', but got '$invalidMetaData'." - ) + ) + case invalidMetaData => + Failure( + new InitializationException( + s"Expected '${classOf[CsvIndividualTimeSeriesMetaInformation]}', but got '$invalidMetaData'." ) - } + ) + } case unsupported => Failure( new InitializationException( @@ -489,14 +485,14 @@ object PrimaryServiceProxy { /** Giving reference to the target time series and source worker. * - * @param columnScheme - * Column scheme of the time series to get + * @param metaInformation + * Meta information (including column scheme) of the time series * @param worker - * Optional reference to a yet existing worker providing information on - * that time series + * Optional reference to an already existing worker providing information + * on that time series */ final case class SourceRef( - columnScheme: ColumnScheme, + metaInformation: IndividualTimeSeriesMetaInformation, worker: Option[ActorRef] ) diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala index 17ff2405bc..7d25630cac 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceWorker.scala @@ -9,8 +9,8 @@ package edu.ie3.simona.service.primary import akka.actor.{ActorRef, Props} import edu.ie3.datamodel.io.connectors.SqlConnector import edu.ie3.datamodel.io.factory.timeseries.TimeBasedSimpleValueFactory -import edu.ie3.datamodel.io.naming.FileNamingStrategy import edu.ie3.datamodel.io.naming.timeseries.ColumnScheme +import edu.ie3.datamodel.io.naming.{DatabaseNamingStrategy, FileNamingStrategy} import edu.ie3.datamodel.io.source.TimeSeriesSource import edu.ie3.datamodel.io.source.csv.CsvTimeSeriesSource import edu.ie3.datamodel.io.source.sql.SqlTimeSeriesSource @@ -43,8 +43,7 @@ import scala.util.{Failure, Success, Try} final case class PrimaryServiceWorker[V <: Value]( override protected val scheduler: ActorRef, - valueClass: Class[V], - private implicit val startDateTime: ZonedDateTime + valueClass: Class[V] ) extends SimonaService[PrimaryServiceInitializedStateData[V]](scheduler) { /** Initialize the actor with the given information. Try to figure out the @@ -89,10 +88,12 @@ final case class PrimaryServiceWorker[V <: Value]( ) (source, simulationStart) } + case PrimaryServiceWorker.SqlInitPrimaryServiceStateData( - sqlParams: SqlParams, timeSeriesUuid: UUID, - simulationStart: ZonedDateTime + simulationStart: ZonedDateTime, + sqlParams: SqlParams, + namingStrategy: DatabaseNamingStrategy ) => Try { val factory = @@ -107,7 +108,7 @@ final case class PrimaryServiceWorker[V <: Value]( val source = new SqlTimeSeriesSource( sqlConnector, sqlParams.schemaName, - sqlParams.tableName, + namingStrategy, timeSeriesUuid, valueClass, factory @@ -115,6 +116,7 @@ final case class PrimaryServiceWorker[V <: Value]( (source, simulationStart) } + case unsupported => /* Got the wrong init data */ Failure( @@ -123,6 +125,8 @@ final case class PrimaryServiceWorker[V <: Value]( ) ) }).map { case (source, simulationStart) => + implicit val startDateTime: ZonedDateTime = simulationStart + val (maybeNextTick, furtherActivationTicks) = SortedDistinctSeq( // Note: The whole data set is used here, which might be inefficient depending on the source implementation. source.getTimeSeries.getEntries.asScala @@ -318,7 +322,7 @@ final case class PrimaryServiceWorker[V <: Value]( } } -case object PrimaryServiceWorker { +object PrimaryServiceWorker { /** List of supported column schemes aka. column schemes, that belong to * primary data @@ -332,10 +336,9 @@ case object PrimaryServiceWorker { def props[V <: Value]( scheduler: ActorRef, - valueClass: Class[V], - simulationStart: ZonedDateTime + valueClass: Class[V] ): Props = - Props(new PrimaryServiceWorker(scheduler, valueClass, simulationStart)) + Props(new PrimaryServiceWorker(scheduler, valueClass)) /** Abstract class pattern for specific [[InitializeServiceStateData]]. * Different implementations are needed, because the [[PrimaryServiceProxy]] @@ -380,17 +383,20 @@ case object PrimaryServiceWorker { /** Specific implementation of [[InitPrimaryServiceStateData]], if the source * to use utilizes an SQL database. * - * @param sqlParams - * Parameters regarding SQL connection and table selection * @param timeSeriesUuid * Unique identifier of the time series to read * @param simulationStart * Wall clock time of the beginning of simulation time + * @param sqlParams + * Parameters regarding SQL connection and table selection + * @param databaseNamingStrategy + * Strategy of naming database entities, such as tables */ final case class SqlInitPrimaryServiceStateData( - sqlParams: SqlParams, override val timeSeriesUuid: UUID, - override val simulationStart: ZonedDateTime + override val simulationStart: ZonedDateTime, + sqlParams: SqlParams, + databaseNamingStrategy: DatabaseNamingStrategy ) extends InitPrimaryServiceStateData /** Class carrying the state of a fully initialized [[PrimaryServiceWorker]] diff --git a/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql b/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql deleted file mode 100644 index 1f956e06a7..0000000000 --- a/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5.sql +++ /dev/null @@ -1,17 +0,0 @@ -CREATE TABLE public."its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5" -( - time timestamp with time zone, - p double precision, - uuid uuid, - CONSTRAINT its_p_pkey PRIMARY KEY (uuid) -) - WITH ( - OIDS = FALSE - ) - TABLESPACE pg_default; - -INSERT INTO - public."its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5" (uuid, time, p) -VALUES -('0245d599-9a5c-4c32-9613-5b755fac8ca0', '2020-01-01 00:00:00+0', 1000.0), -('a5e27652-9024-4a93-9d2a-590fbc3ab5a1', '2020-01-01 00:15:00+0', 1250.0); diff --git a/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql b/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql deleted file mode 100644 index 230393eb5b..0000000000 --- a/src/test/resources/edu/ie3/simona/service/primary/timeseries/its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047.sql +++ /dev/null @@ -1,19 +0,0 @@ -CREATE TABLE public."its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047" -( - time timestamp with time zone, - p double precision, - q double precision, - heat_demand double precision, - uuid uuid, - CONSTRAINT its_pqh_pkey PRIMARY KEY (uuid) -) - WITH ( - OIDS = FALSE - ) - TABLESPACE pg_default; - -INSERT INTO - public."its_pqh_46be1e57-e4ed-4ef7-95f1-b2b321cb2047" (uuid, time, p, q, heat_demand) -VALUES -('661ac594-47f0-4442-8d82-bbeede5661f7', '2020-01-01 00:00:00+0', 1000.0, 329.0, 8.0), -('5adcd6c5-a903-433f-b7b5-5fe669a3ed30', '2020-01-01 00:15:00+0', 1250.0, 411.0, 12.0); diff --git a/src/test/resources/edu/ie3/simona/service/primary/timeseries/time_series_p.sql b/src/test/resources/edu/ie3/simona/service/primary/timeseries/time_series_p.sql new file mode 100644 index 0000000000..79beaf5e70 --- /dev/null +++ b/src/test/resources/edu/ie3/simona/service/primary/timeseries/time_series_p.sql @@ -0,0 +1,21 @@ +CREATE TABLE public.time_series_p +( + uuid uuid PRIMARY KEY, + time_series uuid NOT NULL, + time timestamp with time zone NOT NULL, + p double precision NOT NULL +) + WITHOUT OIDS + TABLESPACE pg_default; + +CREATE INDEX time_series_p_series_id ON time_series_p USING hash (time_series); + +CREATE UNIQUE INDEX time_series_p_series_time ON time_series_p USING btree (time_series, time); + +INSERT INTO + public.time_series_p (uuid, time_series, time, p) +VALUES +('0245d599-9a5c-4c32-9613-5b755fac8ca0', '9185b8c1-86ba-4a16-8dea-5ac898e8caa5', '2020-01-01 00:00:00+0', 1000.0), +('a5e27652-9024-4a93-9d2a-590fbc3ab5a1', '9185b8c1-86ba-4a16-8dea-5ac898e8caa5', '2020-01-01 00:15:00+0', 1250.0), +('b4a2b3e0-7215-431b-976e-d8b41c7bc71b', 'b669e4bf-a351-4067-860d-d5f224b62247', '2020-01-01 00:00:00+0', 50.0), +('1c8f072c-c833-47da-a3e9-5f4d305ab926', 'b669e4bf-a351-4067-860d-d5f224b62247', '2020-01-01 00:15:00+0', 100.0); diff --git a/src/test/resources/edu/ie3/simona/service/primary/timeseries/time_series_pqh.sql b/src/test/resources/edu/ie3/simona/service/primary/timeseries/time_series_pqh.sql new file mode 100644 index 0000000000..8bd3a48908 --- /dev/null +++ b/src/test/resources/edu/ie3/simona/service/primary/timeseries/time_series_pqh.sql @@ -0,0 +1,21 @@ +CREATE TABLE public.time_series_pqh +( + uuid uuid PRIMARY KEY, + time_series uuid NOT NULL, + time timestamp with time zone NOT NULL, + p double precision NOT NULL, + q double precision NOT NULL, + heat_demand double precision NOT NULL +) + WITHOUT OIDS + TABLESPACE pg_default; + +CREATE INDEX time_series_pqh_series_id ON time_series_pqh USING hash (time_series); + +CREATE UNIQUE INDEX time_series_pqh_series_time ON time_series_pqh USING btree (time_series, time); + +INSERT INTO + public.time_series_pqh (uuid, time_series, time, p, q, heat_demand) +VALUES +('661ac594-47f0-4442-8d82-bbeede5661f7', '46be1e57-e4ed-4ef7-95f1-b2b321cb2047', '2020-01-01 00:00:00+0', 1000.0, 329.0, 8.0), +('5adcd6c5-a903-433f-b7b5-5fe669a3ed30', '46be1e57-e4ed-4ef7-95f1-b2b321cb2047', '2020-01-01 00:15:00+0', 1250.0, 411.0, 12.0); diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala index 1a31e3bae2..8ab191fc3b 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceProxySpec.scala @@ -10,8 +10,9 @@ import akka.actor.{ActorRef, ActorSystem, PoisonPill} import akka.testkit.{TestActorRef, TestProbe} import akka.util.Timeout import com.typesafe.config.ConfigFactory +import edu.ie3.datamodel.io.csv.CsvIndividualTimeSeriesMetaInformation import edu.ie3.datamodel.io.naming.FileNamingStrategy -import edu.ie3.datamodel.io.naming.timeseries.ColumnScheme +import edu.ie3.datamodel.io.naming.timeseries.IndividualTimeSeriesMetaInformation import edu.ie3.datamodel.io.source.TimeSeriesMappingSource import edu.ie3.datamodel.io.source.csv.CsvTimeSeriesMappingSource import edu.ie3.datamodel.models.value.{SValue, Value} @@ -33,22 +34,23 @@ import edu.ie3.simona.ontology.messages.SchedulerMessage.{ ScheduleTriggerMessage, TriggerWithIdMessage } +import edu.ie3.simona.ontology.messages.services.ServiceMessage.RegistrationResponseMessage.RegistrationFailedMessage import edu.ie3.simona.ontology.messages.services.ServiceMessage.{ PrimaryServiceRegistrationMessage, WorkerRegistrationMessage } -import edu.ie3.simona.ontology.messages.services.ServiceMessage.RegistrationResponseMessage.RegistrationFailedMessage import edu.ie3.simona.ontology.trigger.Trigger.InitializeServiceTrigger -import edu.ie3.simona.service.primary.PrimaryServiceWorker.{ - CsvInitPrimaryServiceStateData, - InitPrimaryServiceStateData -} import edu.ie3.simona.service.primary.PrimaryServiceProxy.{ InitPrimaryServiceProxyStateData, PrimaryServiceStateData, SourceRef } +import edu.ie3.simona.service.primary.PrimaryServiceWorker.{ + CsvInitPrimaryServiceStateData, + InitPrimaryServiceStateData +} import edu.ie3.simona.test.common.AgentSpec +import edu.ie3.simona.test.common.input.TimeSeriesTestData import edu.ie3.util.TimeUtil import org.scalatest.PartialFunctionValues import org.scalatest.prop.TableDrivenPropertyChecks @@ -57,8 +59,8 @@ import java.nio.file.Paths import java.time.ZonedDateTime import java.util.concurrent.TimeUnit import java.util.{Objects, UUID} -import scala.util.{Failure, Success, Try} import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.{Failure, Success, Try} class PrimaryServiceProxySpec extends AgentSpec( @@ -72,7 +74,8 @@ class PrimaryServiceProxySpec ) ) with TableDrivenPropertyChecks - with PartialFunctionValues { + with PartialFunctionValues + with TimeSeriesTestData { // this works both on Windows and Unix systems val baseDirectoryPath: String = Paths .get( @@ -103,30 +106,19 @@ class PrimaryServiceProxySpec baseDirectoryPath, fileNamingStrategy ) - val workerId: String = - "PrimaryService_3fbfaa97-cff4-46d4-95ba-a95665e87c26" + val workerId: String = "PrimaryService_" + uuidPq val modelUuid: UUID = UUID.fromString("c7ebcc6c-55fc-479b-aa6b-6fa82ccac6b8") - val timeSeriesUuid: UUID = - UUID.fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26") val simulationStart: ZonedDateTime = TimeUtil.withDefaults.toZonedDateTime("2021-03-17 13:14:00") val proxyStateData: PrimaryServiceStateData = PrimaryServiceStateData( Map( - UUID.fromString("b86e95b0-e579-4a80-a534-37c7a470a409") -> UUID - .fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5"), - modelUuid -> UUID.fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26"), - UUID.fromString("90a96daa-012b-4fea-82dc-24ba7a7ab81c") -> UUID - .fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26") + UUID.fromString("b86e95b0-e579-4a80-a534-37c7a470a409") -> uuidP, + modelUuid -> uuidPq, + UUID.fromString("90a96daa-012b-4fea-82dc-24ba7a7ab81c") -> uuidPq ), Map( - UUID.fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5") -> SourceRef( - ColumnScheme.ACTIVE_POWER, - None - ), - UUID.fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26") -> SourceRef( - ColumnScheme.APPARENT_POWER, - None - ) + uuidP -> SourceRef(metaP, None), + uuidPq -> SourceRef(metaPq, None) ), simulationStart, validPrimaryConfig, @@ -210,13 +202,13 @@ class PrimaryServiceProxySpec None, None, None, - Some(SqlParams("", "", "", "", "", "")) + Some(SqlParams("", "", "", "", "")) ) val exception = intercept[InvalidConfigParameterException]( PrimaryServiceProxy.checkConfig(maliciousConfig) ) - exception.getMessage shouldBe "Invalid configuration 'SqlParams(,,,,,)' for a time series source.\nAvailable types:\n\tcsv" + exception.getMessage shouldBe "Invalid configuration 'SqlParams(,,,,)' for a time series source.\nAvailable types:\n\tcsv" } "fails on invalid time pattern" in { @@ -282,7 +274,7 @@ class PrimaryServiceProxySpec None, None, None, - Some(SqlParams("", "", "", "", "", "")) + Some(SqlParams("", "", "", "", "")) ) proxy invokePrivate prepareStateData( @@ -293,7 +285,7 @@ class PrimaryServiceProxySpec fail("Building state data with missing config should fail") case Failure(exception) => exception.getClass shouldBe classOf[IllegalArgumentException] - exception.getMessage shouldBe "Unsupported config for mapping source: 'SqlParams(,,,,,)'" + exception.getMessage shouldBe "Unsupported config for mapping source: 'SqlParams(,,,,)'" } } @@ -312,24 +304,13 @@ class PrimaryServiceProxySpec ) ) => modelToTimeSeries shouldBe Map( - UUID.fromString("b86e95b0-e579-4a80-a534-37c7a470a409") -> UUID - .fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5"), - UUID.fromString("c7ebcc6c-55fc-479b-aa6b-6fa82ccac6b8") -> UUID - .fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26"), - UUID.fromString("90a96daa-012b-4fea-82dc-24ba7a7ab81c") -> UUID - .fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26") + UUID.fromString("b86e95b0-e579-4a80-a534-37c7a470a409") -> uuidP, + UUID.fromString("c7ebcc6c-55fc-479b-aa6b-6fa82ccac6b8") -> uuidPq, + UUID.fromString("90a96daa-012b-4fea-82dc-24ba7a7ab81c") -> uuidPq ) timeSeriesToSourceRef shouldBe Map( - UUID - .fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5") -> SourceRef( - ColumnScheme.ACTIVE_POWER, - None - ), - UUID - .fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26") -> SourceRef( - ColumnScheme.APPARENT_POWER, - None - ) + uuidP -> SourceRef(metaP, None), + uuidPq -> SourceRef(metaPq, None) ) simulationStart shouldBe this.simulationStart primaryConfig shouldBe validPrimaryConfig @@ -372,8 +353,7 @@ class PrimaryServiceProxySpec val workerRef = proxy invokePrivate classToWorkerRef( testClass, - workerId, - simulationStart + workerId ) Objects.nonNull(workerRef) shouldBe true @@ -385,12 +365,15 @@ class PrimaryServiceProxySpec val toInitData = PrivateMethod[Try[InitPrimaryServiceStateData]]( Symbol("toInitData") ) + val metaInformation = new CsvIndividualTimeSeriesMetaInformation( + metaPq, + "its_pq_" + uuidPq + ) proxy invokePrivate toInitData( - validPrimaryConfig, - mappingSource, - timeSeriesUuid, - simulationStart + metaInformation, + simulationStart, + validPrimaryConfig ) match { case Success( CsvInitPrimaryServiceStateData( @@ -403,11 +386,11 @@ class PrimaryServiceProxySpec timePattern ) ) => - actualTimeSeriesUuid shouldBe timeSeriesUuid + actualTimeSeriesUuid shouldBe uuidPq actualSimulationStart shouldBe simulationStart actualCsvSep shouldBe csvSep directoryPath shouldBe baseDirectoryPath - filePath shouldBe "its_pq_3fbfaa97-cff4-46d4-95ba-a95665e87c26" + filePath shouldBe metaInformation.getFullFilePath classOf[FileNamingStrategy].isAssignableFrom( fileNamingStrategy.getClass ) shouldBe true @@ -429,13 +412,10 @@ class PrimaryServiceProxySpec None, None ) - proxy invokePrivate initializeWorker( - ColumnScheme.APPARENT_POWER, - timeSeriesUuid, + metaPq, simulationStart, - maliciousPrimaryConfig, - mappingSource + maliciousPrimaryConfig ) match { case Failure(exception) => /* Check the exception */ @@ -468,35 +448,32 @@ class PrimaryServiceProxySpec TestActorRef(new PrimaryServiceProxy(scheduler.ref, simulationStart) { override protected def classToWorkerRef[V <: Value]( valueClass: Class[V], - timeSeriesUuid: String, - simulationStart: ZonedDateTime + timeSeriesUuid: String ): ActorRef = testProbe.ref // needs to be overwritten as to make it available to the private method tester @SuppressWarnings(Array("NoOpOverride")) override protected def initializeWorker( - columnScheme: ColumnScheme, - timeSeriesUuid: UUID, + metaInformation: IndividualTimeSeriesMetaInformation, simulationStart: ZonedDateTime, - primaryConfig: PrimaryConfig, - mappingSource: TimeSeriesMappingSource + primaryConfig: PrimaryConfig ): Try[ActorRef] = super.initializeWorker( - columnScheme, - timeSeriesUuid, + metaInformation, simulationStart, - primaryConfig, - mappingSource + primaryConfig ) }) val fakeProxy: PrimaryServiceProxy = fakeProxyRef.underlyingActor + val metaInformation = new CsvIndividualTimeSeriesMetaInformation( + metaPq, + "its_pq_" + uuidPq + ) fakeProxy invokePrivate initializeWorker( - ColumnScheme.APPARENT_POWER, - timeSeriesUuid, + metaInformation, simulationStart, - validPrimaryConfig, - mappingSource + validPrimaryConfig ) match { case Success(workerRef) => /* Check, if expected init message has been sent */ @@ -515,11 +492,11 @@ class PrimaryServiceProxySpec ), actorToBeScheduled ) => - actualTimeSeriesUuid shouldBe timeSeriesUuid + actualTimeSeriesUuid shouldBe uuidPq actualSimulationStart shouldBe simulationStart actualCsvSep shouldBe csvSep directoryPath shouldBe baseDirectoryPath - filePath shouldBe "its_pq_3fbfaa97-cff4-46d4-95ba-a95665e87c26" + filePath shouldBe metaInformation.getFullFilePath classOf[FileNamingStrategy].isAssignableFrom( fileNamingStrategy.getClass ) shouldBe true @@ -556,7 +533,7 @@ class PrimaryServiceProxySpec "work otherwise" in { proxy invokePrivate updateStateData( proxyStateData, - timeSeriesUuid, + uuidPq, self ) match { case PrimaryServiceStateData( @@ -568,16 +545,8 @@ class PrimaryServiceProxySpec ) => modelToTimeSeries shouldBe proxyStateData.modelToTimeSeries timeSeriesToSourceRef shouldBe Map( - UUID - .fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5") -> SourceRef( - ColumnScheme.ACTIVE_POWER, - None - ), - UUID - .fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26") -> SourceRef( - ColumnScheme.APPARENT_POWER, - Some(self) - ) + uuidP -> SourceRef(metaP, None), + uuidPq -> SourceRef(metaPq, Some(self)) ) simulationStart shouldBe proxyStateData.simulationStart primaryConfig shouldBe proxyStateData.primaryConfig @@ -595,7 +564,7 @@ class PrimaryServiceProxySpec proxy invokePrivate handleCoveredModel( modelUuid, - timeSeriesUuid, + uuidPq, maliciousStateData, self ) @@ -605,13 +574,13 @@ class PrimaryServiceProxySpec "forward the registration request, if worker is already known" in { val adaptedStateData = proxyStateData.copy( timeSeriesToSourceRef = Map( - timeSeriesUuid -> SourceRef(ColumnScheme.APPARENT_POWER, Some(self)) + uuidPq -> SourceRef(metaPq, Some(self)) ) ) proxy invokePrivate handleCoveredModel( modelUuid, - timeSeriesUuid, + uuidPq, adaptedStateData, self ) @@ -630,7 +599,7 @@ class PrimaryServiceProxySpec proxy invokePrivate handleCoveredModel( modelUuid, - timeSeriesUuid, + uuidPq, maliciousStateData, self ) @@ -643,11 +612,9 @@ class PrimaryServiceProxySpec val fakeProxyRef = TestActorRef(new PrimaryServiceProxy(self, simulationStart) { override protected def initializeWorker( - columnScheme: ColumnScheme, - timeSeriesUuid: UUID, + metaInformation: IndividualTimeSeriesMetaInformation, simulationStart: ZonedDateTime, - primaryConfig: PrimaryConfig, - mappingSource: TimeSeriesMappingSource + primaryConfig: PrimaryConfig ): Try[ActorRef] = Success(probe.ref) // needs to be overwritten as to make it available to the private method tester @@ -669,7 +636,7 @@ class PrimaryServiceProxySpec fakeProxy invokePrivate handleCoveredModel( modelUuid, - timeSeriesUuid, + uuidPq, proxyStateData, self ) @@ -693,11 +660,9 @@ class PrimaryServiceProxySpec val fakeProxyRef = TestActorRef(new PrimaryServiceProxy(self, simulationStart) { override protected def initializeWorker( - columnScheme: ColumnScheme, - timeSeriesUuid: UUID, + metaInformation: IndividualTimeSeriesMetaInformation, simulationStart: ZonedDateTime, - primaryConfig: PrimaryConfig, - mappingSource: TimeSeriesMappingSource + primaryConfig: PrimaryConfig ): Try[ActorRef] = Success(probe.ref) }) diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala index 53556678fa..2a7e0d816c 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSpec.scala @@ -37,6 +37,7 @@ import edu.ie3.simona.service.primary.PrimaryServiceWorker.{ } import edu.ie3.simona.service.primary.PrimaryServiceWorkerSpec.WrongInitPrimaryServiceStateData import edu.ie3.simona.test.common.AgentSpec +import edu.ie3.simona.test.common.input.TimeSeriesTestData import edu.ie3.util.TimeUtil import edu.ie3.util.quantities.PowerSystemUnits import edu.ie3.util.scala.collection.immutable.SortedDistinctSeq @@ -56,7 +57,8 @@ class PrimaryServiceWorkerSpec |akka.loglevel="OFF" """.stripMargin) ) - ) { + ) + with TimeSeriesTestData { // this works both on Windows and Unix systems val baseDirectoryPath: String = Paths .get( @@ -68,15 +70,12 @@ class PrimaryServiceWorkerSpec ) .toString - private val simulationStart = - TimeUtil.withDefaults.toZonedDateTime("2020-01-01 00:00:00") - val validInitData: CsvInitPrimaryServiceStateData = CsvInitPrimaryServiceStateData( - timeSeriesUuid = UUID.fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5"), + timeSeriesUuid = uuidP, csvSep = ";", directoryPath = baseDirectoryPath, - filePath = "its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5", + filePath = "its_p_" + uuidP, fileNamingStrategy = new FileNamingStrategy(), simulationStart = TimeUtil.withDefaults.toZonedDateTime("2020-01-01 00:00:00"), @@ -88,8 +87,7 @@ class PrimaryServiceWorkerSpec TestActorRef( new PrimaryServiceWorker[PValue]( self, - classOf[PValue], - simulationStart + classOf[PValue] ) ) val service = serviceRef.underlyingActor @@ -106,13 +104,12 @@ class PrimaryServiceWorkerSpec "fail, if pointed to the wrong file" in { val maliciousInitData = CsvInitPrimaryServiceStateData( - timeSeriesUuid = - UUID.fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26"), + timeSeriesUuid = uuidPq, simulationStart = TimeUtil.withDefaults.toZonedDateTime("2020-01-01 00:00:00"), csvSep = ";", directoryPath = baseDirectoryPath, - filePath = "its_pq_3fbfaa97-cff4-46d4-95ba-a95665e87c26", + filePath = "its_pq_" + uuidPq, fileNamingStrategy = new FileNamingStrategy(), timePattern = TimeUtil.withDefaults.getDtfPattern ) @@ -200,8 +197,8 @@ class PrimaryServiceWorkerSpec ";", baseDirectoryPath, new FileNamingStrategy(), - UUID.fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5"), - "its_p_9185b8c1-86ba-4a16-8dea-5ac898e8caa5", + uuidP, + "its_p_" + uuidP, classOf[PValue], new TimeBasedSimpleValueFactory[PValue](classOf[PValue]) ), diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala index 77536c1442..d171975881 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala @@ -6,11 +6,12 @@ package edu.ie3.simona.service.primary -import akka.actor.{ActorRef, ActorSystem} +import akka.actor.ActorSystem import akka.testkit.{TestActorRef, TestProbe} import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer} import com.typesafe.config.ConfigFactory -import edu.ie3.datamodel.models.value.{HeatAndSValue, PValue, Value} +import edu.ie3.datamodel.io.naming.DatabaseNamingStrategy +import edu.ie3.datamodel.models.value.{HeatAndSValue, PValue} import edu.ie3.simona.agent.participant.data.Data.PrimaryData.{ ActivePower, ApparentPowerAndHeat @@ -32,15 +33,11 @@ import edu.ie3.simona.service.primary.PrimaryServiceWorker.{ SqlInitPrimaryServiceStateData } import edu.ie3.simona.test.common.AgentSpec +import edu.ie3.simona.test.common.input.TimeSeriesTestData +import edu.ie3.simona.test.helper.TestContainerHelper import edu.ie3.util.TimeUtil import org.scalatest.BeforeAndAfterAll import org.scalatest.prop.TableDrivenPropertyChecks -import org.testcontainers.utility.MountableFile - -import java.nio.file.Paths -import java.util.UUID -import scala.language.postfixOps -import scala.reflect.ClassTag class PrimaryServiceWorkerSqlIT extends AgentSpec( @@ -54,7 +51,9 @@ class PrimaryServiceWorkerSqlIT ) with ForAllTestContainer with BeforeAndAfterAll - with TableDrivenPropertyChecks { + with TableDrivenPropertyChecks + with TimeSeriesTestData + with TestContainerHelper { override val container: PostgreSQLContainer = PostgreSQLContainer( "postgres:11.14" @@ -65,22 +64,12 @@ class PrimaryServiceWorkerSqlIT private val schemaName = "public" - private val uuidP = UUID.fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5") - private val uuidPhq = UUID.fromString("46be1e57-e4ed-4ef7-95f1-b2b321cb2047") - - private val tableNameP = s"its_p_$uuidP" - private val tableNamePhq = s"its_pqh_$uuidPhq" - override protected def beforeAll(): Unit = { - val url = getClass.getResource("timeseries/") - url shouldNot be(null) - val path = Paths.get(url.toURI) - // Copy sql import scripts into docker - val sqlImportFile = MountableFile.forHostPath(path) + val sqlImportFile = getMountableFile("timeseries/") container.copyFileToContainer(sqlImportFile, "/home/") - Iterable(s"$tableNameP.sql", s"$tableNamePhq.sql") + Iterable("time_series_p.sql", "time_series_pqh.sql") .foreach { file => val res = container.execInContainer("psql", "-Utest", "-f/home/" + file) res.getStderr shouldBe empty @@ -92,42 +81,34 @@ class PrimaryServiceWorkerSqlIT container.close() } - // asInstanceOf throws ClassCastException if cast fails, thus this is safe here - @SuppressWarnings(Array("AsInstanceOf")) - private def getServiceActor[T <: Value]( - scheduler: ActorRef - )(implicit tag: ClassTag[T]): PrimaryServiceWorker[T] = { - new PrimaryServiceWorker[T]( - scheduler, - tag.runtimeClass.asInstanceOf[Class[T]], - simulationStart - ) - } - "A primary service actor with SQL source" should { "initialize and send out data when activated" in { + val scheduler = TestProbe("scheduler") val cases = Table( ( - "getService", + "service", "uuid", - "tableName", "firstTick", "dataValueClass", "maybeNextTick" ), ( - getServiceActor[HeatAndSValue](_), - uuidPhq, - tableNamePhq, + PrimaryServiceWorker.props( + scheduler.ref, + classOf[HeatAndSValue] + ), + uuidPqh, 0L, classOf[ApparentPowerAndHeat], Some(900L) ), ( - getServiceActor[PValue](_), + PrimaryServiceWorker.props( + scheduler.ref, + classOf[PValue] + ), uuidP, - tableNameP, 0L, classOf[ActivePower], Some(900L) @@ -136,31 +117,25 @@ class PrimaryServiceWorkerSqlIT forAll(cases) { ( - getService, + service, uuid, - tableName, firstTick, dataValueClass, maybeNextTick ) => - val scheduler = TestProbe("scheduler") - - val serviceRef = - TestActorRef( - getService(scheduler.ref) - ) + val serviceRef = TestActorRef(service) val initData = SqlInitPrimaryServiceStateData( + uuid, + simulationStart, SqlParams( jdbcUrl = container.jdbcUrl, userName = container.username, password = container.password, schemaName = schemaName, - tableName = tableName, timePattern = "yyyy-MM-dd HH:mm:ss" ), - uuid, - simulationStart + new DatabaseNamingStrategy() ) val triggerId1 = 1L @@ -213,6 +188,8 @@ class PrimaryServiceWorkerSqlIT dataMsg.tick shouldBe firstTick dataMsg.data.getClass shouldBe dataValueClass dataMsg.nextDataTick shouldBe maybeNextTick + + scheduler.expectNoMessage() } } } diff --git a/src/test/scala/edu/ie3/simona/test/common/input/TimeSeriesTestData.scala b/src/test/scala/edu/ie3/simona/test/common/input/TimeSeriesTestData.scala new file mode 100644 index 0000000000..64d674e3ad --- /dev/null +++ b/src/test/scala/edu/ie3/simona/test/common/input/TimeSeriesTestData.scala @@ -0,0 +1,39 @@ +/* + * © 2022. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.test.common.input + +import edu.ie3.datamodel.io.naming.timeseries.{ + ColumnScheme, + IndividualTimeSeriesMetaInformation +} + +import java.util.UUID + +trait TimeSeriesTestData { + protected val uuidP: UUID = + UUID.fromString("9185b8c1-86ba-4a16-8dea-5ac898e8caa5") + protected val uuidPq: UUID = + UUID.fromString("3fbfaa97-cff4-46d4-95ba-a95665e87c26") + protected val uuidPqh: UUID = + UUID.fromString("46be1e57-e4ed-4ef7-95f1-b2b321cb2047") + + protected val metaP: IndividualTimeSeriesMetaInformation = + new IndividualTimeSeriesMetaInformation( + uuidP, + ColumnScheme.ACTIVE_POWER + ) + protected val metaPq: IndividualTimeSeriesMetaInformation = + new IndividualTimeSeriesMetaInformation( + uuidPq, + ColumnScheme.APPARENT_POWER + ) + protected val metaPqh: IndividualTimeSeriesMetaInformation = + new IndividualTimeSeriesMetaInformation( + uuidPqh, + ColumnScheme.APPARENT_POWER_AND_HEAT_DEMAND + ) +} diff --git a/src/test/scala/edu/ie3/simona/test/helper/TestContainerHelper.scala b/src/test/scala/edu/ie3/simona/test/helper/TestContainerHelper.scala new file mode 100644 index 0000000000..8bb7ff7ad6 --- /dev/null +++ b/src/test/scala/edu/ie3/simona/test/helper/TestContainerHelper.scala @@ -0,0 +1,36 @@ +/* + * © 2022. TU Dortmund University, + * Institute of Energy Systems, Energy Efficiency and Energy Economics, + * Research group Distribution grid planning and operation + */ + +package edu.ie3.simona.test.helper + +import akka.testkit.TestException +import org.testcontainers.utility.MountableFile + +import java.nio.file.Paths + +trait TestContainerHelper { + + /** Retrieve resource with the class' resource loader. In contrast to + * [[org.testcontainers.utility.MountableFile#forClasspathResource(java.lang.String, java.lang.Integer)]], + * this also works with paths relative to the current class (i.e. without + * leading '/'). + * @param resource + * the resource directory or file path + * @return + * a MountableFile to use with test containers + */ + def getMountableFile(resource: String): MountableFile = { + def url = getClass.getResource(resource) + if (url == null) { + throw TestException( + "Resource '" + resource + "' was not found from " + getClass.toString + ) + } + def path = Paths.get(url.toURI) + + MountableFile.forHostPath(path) + } +} From 0e885d5ee2cfea3f21e26505adbf061e21f9f545 Mon Sep 17 00:00:00 2001 From: Sebastian Peter Date: Wed, 16 Mar 2022 19:25:45 +0100 Subject: [PATCH 11/14] Updating testcontainers version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 4f6c24d20a..a5045c941f 100644 --- a/build.gradle +++ b/build.gradle @@ -30,7 +30,7 @@ ext { tscfgVersion = '0.9.997' scapegoatVersion = '1.4.12' - testContainerVersion = '0.39.12' + testContainerVersion = '0.40.3' scriptsLocation = 'gradle' + File.separator + 'scripts' + File.separator // location of script plugins } From 880697fdec0feb0c41addd141be6089839d96f4f Mon Sep 17 00:00:00 2001 From: Sebastian Peter Date: Wed, 16 Mar 2022 19:44:29 +0100 Subject: [PATCH 12/14] Improving getMountableFile as stream of Option --- .../simona/test/helper/TestContainerHelper.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/test/scala/edu/ie3/simona/test/helper/TestContainerHelper.scala b/src/test/scala/edu/ie3/simona/test/helper/TestContainerHelper.scala index 8bb7ff7ad6..f58786aa6d 100644 --- a/src/test/scala/edu/ie3/simona/test/helper/TestContainerHelper.scala +++ b/src/test/scala/edu/ie3/simona/test/helper/TestContainerHelper.scala @@ -23,14 +23,13 @@ trait TestContainerHelper { * a MountableFile to use with test containers */ def getMountableFile(resource: String): MountableFile = { - def url = getClass.getResource(resource) - if (url == null) { - throw TestException( - "Resource '" + resource + "' was not found from " + getClass.toString + Option(getClass.getResource(resource)) + .map(url => Paths.get(url.toURI)) + .map(MountableFile.forHostPath) + .getOrElse( + throw TestException( + "Resource '" + resource + "' was not found from " + getClass.toString + ) ) - } - def path = Paths.get(url.toURI) - - MountableFile.forHostPath(path) } } From 962e87dd65154afc473c723cc2ca26fd204e3834 Mon Sep 17 00:00:00 2001 From: Sebastian Peter Date: Mon, 21 Mar 2022 17:26:30 +0100 Subject: [PATCH 13/14] Adapted/removed comments --- .../edu/ie3/simona/service/primary/PrimaryServiceProxy.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala index 3e6d213b07..f46af75f53 100644 --- a/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala +++ b/src/main/scala/edu/ie3/simona/service/primary/PrimaryServiceProxy.scala @@ -382,9 +382,8 @@ case class PrimaryServiceProxy( None, None ) => - /* The mapping and actual data sources are from csv. At first, get the file name of the file to read. */ + /* The actual data sources are from csv. Meta information have to match */ metaInformation match { - /* Time series meta information could be successfully obtained */ case csvMetaData: CsvIndividualTimeSeriesMetaInformation => Success( CsvInitPrimaryServiceStateData( From dbc797d29e9928edb4171268bef5a6b8be4aab68 Mon Sep 17 00:00:00 2001 From: Sebastian Peter Date: Mon, 21 Mar 2022 17:26:52 +0100 Subject: [PATCH 14/14] Testing first data values with IT --- .../primary/PrimaryServiceWorkerSqlIT.scala | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala index d171975881..1609809225 100644 --- a/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala +++ b/src/test/scala/edu/ie3/simona/service/primary/PrimaryServiceWorkerSqlIT.scala @@ -11,6 +11,7 @@ import akka.testkit.{TestActorRef, TestProbe} import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer} import com.typesafe.config.ConfigFactory import edu.ie3.datamodel.io.naming.DatabaseNamingStrategy +import edu.ie3.datamodel.models.StandardUnits import edu.ie3.datamodel.models.value.{HeatAndSValue, PValue} import edu.ie3.simona.agent.participant.data.Data.PrimaryData.{ ActivePower, @@ -38,6 +39,7 @@ import edu.ie3.simona.test.helper.TestContainerHelper import edu.ie3.util.TimeUtil import org.scalatest.BeforeAndAfterAll import org.scalatest.prop.TableDrivenPropertyChecks +import tech.units.indriya.quantity.Quantities class PrimaryServiceWorkerSqlIT extends AgentSpec( @@ -56,7 +58,7 @@ class PrimaryServiceWorkerSqlIT with TestContainerHelper { override val container: PostgreSQLContainer = PostgreSQLContainer( - "postgres:11.14" + "postgres:14.2" ) private val simulationStart = @@ -90,7 +92,7 @@ class PrimaryServiceWorkerSqlIT "service", "uuid", "firstTick", - "dataValueClass", + "firstData", "maybeNextTick" ), ( @@ -100,7 +102,11 @@ class PrimaryServiceWorkerSqlIT ), uuidPqh, 0L, - classOf[ApparentPowerAndHeat], + ApparentPowerAndHeat( + Quantities.getQuantity(1000.0d, StandardUnits.ACTIVE_POWER_IN), + Quantities.getQuantity(329.0d, StandardUnits.REACTIVE_POWER_IN), + Quantities.getQuantity(8000.0, StandardUnits.HEAT_DEMAND_PROFILE) + ), Some(900L) ), ( @@ -110,7 +116,9 @@ class PrimaryServiceWorkerSqlIT ), uuidP, 0L, - classOf[ActivePower], + ActivePower( + Quantities.getQuantity(1000.0d, StandardUnits.ACTIVE_POWER_IN) + ), Some(900L) ) ) @@ -120,7 +128,7 @@ class PrimaryServiceWorkerSqlIT service, uuid, firstTick, - dataValueClass, + firstData, maybeNextTick ) => val serviceRef = TestActorRef(service) @@ -186,7 +194,7 @@ class PrimaryServiceWorkerSqlIT val dataMsg = participant.expectMsgType[ProvidePrimaryDataMessage] dataMsg.tick shouldBe firstTick - dataMsg.data.getClass shouldBe dataValueClass + dataMsg.data shouldBe firstData dataMsg.nextDataTick shouldBe maybeNextTick scheduler.expectNoMessage()