Skip to content

Commit

Permalink
Validate table fields against metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
ludovicc committed Feb 15, 2019
1 parent 419c0ca commit ed5efd0
Show file tree
Hide file tree
Showing 23 changed files with 2,792 additions and 2,499 deletions.
48 changes: 47 additions & 1 deletion src/main/scala/ch/chuv/lren/woken/dao/FeaturesRepository.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package ch.chuv.lren.woken.dao

import cats.Id
import cats.data.{ NonEmptyList, Validated }
import cats.effect.concurrent.Ref
import cats.effect.{ Effect, Resource }
import cats.implicits._
Expand All @@ -28,10 +29,11 @@ import ch.chuv.lren.woken.core.model.database.{ FeaturesTableDescription, TableC
import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation
import ch.chuv.lren.woken.dao.FeaturesTableRepository.Headers
import ch.chuv.lren.woken.messages.datasets.{ DatasetId, TableId }
import ch.chuv.lren.woken.messages.query.{ UserFeedback, UserFeedbacks, UserWarning }
import spray.json._
import spray.json.DefaultJsonProtocol._
import ch.chuv.lren.woken.messages.query.filters._
import ch.chuv.lren.woken.messages.variables.SqlType
import ch.chuv.lren.woken.messages.variables.{ SqlType, VariableMetaData }
import doobie.util.log.LogHandler
import doobie.util.update.Update0
import sup.HealthCheck
Expand Down Expand Up @@ -133,6 +135,49 @@ trait FeaturesTableRepository[F[_]] extends Repository[F] {

def features(query: FeaturesQuery): F[(Headers, Stream[JsObject])]

/**
* Validate the fields in the actual table against their metadata
*
* @param variables Full list of variables for the table as defined in the metadata
*/
def validateFields(variables: List[VariableMetaData])(
implicit effect: Effect[F]
): F[Validated[NonEmptyList[(VariableMetaData, UserFeedback)], UserFeedbacks]] = {
val variableNames = variables.map(_.toId.code).toSet
val headerNames = columns.map(_.name).toSet
val unknownHeaders =
headerNames.diff(variableNames).filterNot(c => table.primaryKey.exists(_.name == c))
val unknownVariables = variableNames.diff(headerNames)
unknownVariables.toList.toNel
.fold(
unknownHeaders
.map { h =>
val msg = s"Column $h in table ${table.table.toString} is not described in the metadata"
UserWarning(msg): UserFeedback
}
.toList
.valid[NonEmptyList[(VariableMetaData, UserFeedback)]]
)(
unknowVarsNel =>
unknowVarsNel
.map(
v =>
variables
.find(_.code == v)
.getOrElse(throw new IllegalStateException("This variable should exist"))
)
.map(
varMeta =>
(varMeta,
UserWarning(
s"Variable ${varMeta.code} does not exist in table ${table.table.toString}"
))
)
.invalid
)
.pure[F]
}

/**
*
* @param filters Filters always applied on the queries
Expand All @@ -147,6 +192,7 @@ trait FeaturesTableRepository[F[_]] extends Repository[F] {
otherColumns: List[TableColumn],
prefills: List[PrefillExtendedFeaturesTable]
): Validation[Resource[F, FeaturesTableRepository[F]]]

}

object FeaturesTableRepository {
Expand Down
105 changes: 64 additions & 41 deletions src/main/scala/ch/chuv/lren/woken/service/DatabaseServices.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._
import ch.chuv.lren.woken.config.{ DatabaseConfiguration, WokenConfiguration, configurationFailed }
import ch.chuv.lren.woken.core.model.VariablesMeta
import ch.chuv.lren.woken.dao._
import ch.chuv.lren.woken.messages.datasets.Dataset
import com.typesafe.scalalogging.Logger
import doobie.hikari.HikariTransactor
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -59,46 +61,20 @@ case class DatabaseServices[F[_]: ConcurrentEffect: ContextShift: Timer](
Monoid
.combineAll(datasetService.datasets().values.filter(_.location.isEmpty).map { dataset =>
Monoid
.combineAll(dataset.tables.map {
table =>
{
featuresService
.featuresTable(table)
.fold[F[Unit]](
{ error: NonEmptyList[String] =>
val errMsg = error.mkString_(",")
logger.error(errMsg)
Effect[F].raiseError(new IllegalStateException(errMsg))
}, {
table: FeaturesTableService[F] =>
table
.count(dataset.id)
.flatMap[Unit] { count =>
if (count == 0) {
val error =
s"Table ${table.table} contains no value for dataset ${dataset.id.code}"
logger.error(error)
Effect[F].raiseError(throw new IllegalStateException(error))
} else ().pure[F]
}
.flatMap { _ =>
val tableId = table.table.table
variablesMetaService
.get(tableId)
.flatMap(
metaO =>
metaO.fold(
Effect[F].raiseError[Unit](
new IllegalStateException(
s"Cannot find metadata for table ${tableId.toString}"
)
)
)(_ => ().pure[F])
)
}
}
)
}
.combineAll(dataset.tables.map { table =>
{
featuresService
.featuresTable(table)
.fold[F[Unit]](
{ error: NonEmptyList[String] =>
val errMsg = error.mkString_(",")
logger.error(errMsg)
Effect[F].raiseError(new IllegalStateException(errMsg))
}, { table: FeaturesTableService[F] =>
validateTable(dataset, table)
}
)
}
})
.map(
_ => {
Expand All @@ -109,10 +85,57 @@ case class DatabaseServices[F[_]: ConcurrentEffect: ContextShift: Timer](
}
)
})
.map(s_ => logger.info("[OK] Datasets are valid"))
.map(_ => logger.info("[OK] Datasets are valid"))

}

private def validateTable(dataset: Dataset, table: FeaturesTableService[F]): F[Unit] =
for {
_ <- tableShouldContainRowsForDataset(dataset, table)
variables <- tableShouldHaveMetadataDefined(table)
_ <- tableFieldsShouldMatchMetadata(table, variables)
} yield ()

private def tableShouldContainRowsForDataset(dataset: Dataset,
table: FeaturesTableService[F]): F[Unit] =
table
.count(dataset.id)
.flatMap[Unit] { count =>
if (count == 0) {
val error =
s"Table ${table.table} contains no value for dataset ${dataset.id.code}"
logger.error(error)
Effect[F].raiseError(new IllegalStateException(error))
} else ().pure[F]
}

private def tableShouldHaveMetadataDefined(table: FeaturesTableService[F]): F[VariablesMeta] = {
val tableId = table.table.table
variablesMetaService
.get(tableId)
.flatMap(
metaO =>
metaO.fold(
Effect[F].raiseError[VariablesMeta](
new IllegalStateException(
s"Cannot find metadata for table ${tableId.toString}"
)
)
)(_.pure[F])
)
}

private def tableFieldsShouldMatchMetadata(table: FeaturesTableService[F],
variables: VariablesMeta): F[Unit] =
table
.validateFields(variables.allVariables())
.map { validation =>
validation.fold(
err => Effect[F].raiseError(new IllegalStateException(err.map(_._2.msg).mkString_(", "))),
warnings => Effect[F].delay(warnings.foreach(w => logger.warn(w.msg)))
)
}

def close(): F[Unit] = Effect[F].pure(())

type TaggedS[H] = Tagged[String, H]
Expand Down
22 changes: 22 additions & 0 deletions src/main/scala/ch/chuv/lren/woken/service/FeaturesService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package ch.chuv.lren.woken.service

import cats.Id
import cats.data.{ NonEmptyList, Validated }
import cats.effect.{ Effect, Resource }
import cats.syntax.validated._
import ch.chuv.lren.woken.core.features.FeaturesQuery
Expand All @@ -30,7 +31,9 @@ import ch.chuv.lren.woken.dao.{
import ch.chuv.lren.woken.messages.datasets.{ DatasetId, TableId }
import ch.chuv.lren.woken.core.fp.runNow
import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation
import ch.chuv.lren.woken.messages.query.{ UserFeedback, UserFeedbacks }
import ch.chuv.lren.woken.messages.query.filters.FilterRule
import ch.chuv.lren.woken.messages.variables.VariableMetaData
import spray.json.JsObject
import sup.HealthCheck

Expand Down Expand Up @@ -96,6 +99,15 @@ trait FeaturesTableService[F[_]] {

def features(query: FeaturesQuery): F[(Headers, Stream[JsObject])]

/**
* Validate the fields in the actual table against their metadata
*
* @param variables Full list of variables for the table as defined in the metadata
*/
def validateFields(
variables: List[VariableMetaData]
): F[Validated[NonEmptyList[(VariableMetaData, UserFeedback)], UserFeedbacks]]

def createExtendedFeaturesTable(
filters: Option[FilterRule],
newFeatures: List[TableColumn],
Expand Down Expand Up @@ -157,6 +169,16 @@ class FeaturesTableServiceImpl[F[_]: Effect](repository: FeaturesTableRepository
override def datasets(filters: Option[FilterRule]): F[Set[DatasetId]] =
repository.datasets(filters)

/**
* Validate the fields in the actual table against their metadata
*
* @param variables Full list of variables for the table as defined in the metadata
*/
override def validateFields(
variables: List[VariableMetaData]
): F[Validated[NonEmptyList[(VariableMetaData, UserFeedback)], UserFeedbacks]] =
repository.validateFields(variables)

override def createExtendedFeaturesTable(
filters: Option[FilterRule],
newFeatures: List[TableColumn],
Expand Down
2 changes: 1 addition & 1 deletion tests/http/query-data.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ http -v --verify=no -a admin:WoKeN --timeout 180 POST http://localhost:8087/mini
grouping:='[]' \
covariables:='[{"code":"\"cognitive_task2\""},{"code":"\"score_test1\""}]' \
covariablesMustExist:=true \
targetTable='sample_data' \
targetTable:='{"name":"sample_data"}' \
algorithm:='{"code":"data", "name": "data", "parameters": []}'
2 changes: 1 addition & 1 deletion tests/http/query-experiment-nb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ http -v --verify=no -a admin:WoKeN --timeout 180 POST http://localhost:8087/mini
grouping:='[]' \
covariables:='[{"code":"subjectage"}, {"code":"leftcuncuneus"}]' \
covariablesMustExist:=true \
targetTable='cde_features_a' \
targetTable:='{"name":"cde_features_a"}' \
algorithms:='[{"code":"naiveBayes", "parameters": []}]' \
validations:='[{"code":"kfold", "name": "kfold", "parameters": [{"code": "k", "value": "2"}]}]'
2 changes: 1 addition & 1 deletion tests/http/query-experiment.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ http -v --verify=no -a admin:WoKeN --timeout 180 POST http://localhost:8087/mini
grouping:='[]' \
covariables:='[{"code":"score_test1"}, {"code":"college_math"}]' \
covariablesMustExist:=true \
targetTable='sample_data' \
targetTable:='{"name":"sample_data"}' \
algorithms:='[{"code":"knn", "name": "knn", "parameters": []}]' \
validations:='[{"code":"kfold", "name": "kfold", "parameters": [{"code": "k", "value": "2"}]}]'
2 changes: 1 addition & 1 deletion tests/http/query-knn-distributed.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ http -v --verify=no -a admin:WoKeN --timeout 180 POST http://localhost:8087/mini
covariables:='[{"code":"leftcaudate"}]' \
covariablesMustExist:=true \
datasets:='[{"code":"desd-synthdata"},{"code":"qqni-synthdata"}]' \
targetTable='cde_features_mixed' \
targetTable:='{"name":"cde_features_mixed"}' \
algorithm:='{"code":"knn", "name": "KNN", "parameters": []}'
2 changes: 1 addition & 1 deletion tests/http/query-knn-validation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ http -v --verify=no -a admin:WoKeN --timeout 180 POST http://localhost:8087/mini
grouping:='[]' \
covariables:='[{"code":"leftcaudate"}]' \
covariablesMustExist:=true \
targetTable='cde_features_mixed' \
targetTable:='{"name":"cde_features_mixed"}' \
algorithms:='[{"code":"knn", "name": "knn", "parameters": []}]' \
validations:='[{"code":"kfold", "name": "kfold", "parameters": [{"code": "k", "value": "2"}]}]' \
trainingDatasets:='[{"code":"desd-synthdata"}]' \
Expand Down
2 changes: 1 addition & 1 deletion tests/http/query-knn.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ http -v --verify=no -a admin:WoKeN --timeout 180 POST http://localhost:8087/mini
grouping:='[]' \
covariables:='[{"code":"score_math_course1"}]' \
covariablesMustExist:=true \
targetTable='sample_data' \
targetTable:='{"name":"sample_data"}' \
algorithm:='{"code":"knn", "name": "KNN", "parameters": [{"code": "k", "value": "2"}]}'
2 changes: 1 addition & 1 deletion tests/http/query-linear-regression.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ http -v --verify=no -a admin:WoKeN --timeout 180 POST http://localhost:8087/mini
grouping:='[]' \
covariables:='[{"code":"score_test1"}]' \
covariablesMustExist:=true \
targetTable='sample_data' \
targetTable:='{"name":"sample_data"}' \
algorithm:='{"code":"linearRegression", "name": "linearRegression", "parameters": []}'
2 changes: 1 addition & 1 deletion tests/http/query-summary.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ http -v --verify=no -a admin:WoKeN --timeout 180 POST http://localhost:8087/mini
grouping:='[]' \
covariables:='[{"code":"score_test1"}]' \
covariablesMustExist:=true \
targetTable='sample_data' \
targetTable:='{"name":"sample_data"}' \
algorithm:='{"code":"statisticsSummary", "name": "Statistics Summary", "parameters": []}'
2 changes: 1 addition & 1 deletion tests/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fi
if [[ $NO_SUDO || -n "$CIRCLECI" ]]; then
DOCKER="docker"
DOCKER_COMPOSE="docker-compose"
elif groups $USER | grep &>/dev/null '\bdocker\b'; then
elif groups "$USER" | grep &>/dev/null '\bdocker\b'; then
DOCKER="docker"
DOCKER_COMPOSE="docker-compose"
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@
"code": "sample_data"
}
],
"feedback": [],
"feedback": [
{
"msg": "Missing variables score_math_course1,score_math_course2",
"severity": "info"
}
],
"jobId": "*",
"node": "local",
"query": {
Expand All @@ -65,7 +70,11 @@
"covariablesMustExist": true,
"datasets": [],
"grouping": [],
"targetTable": "sample_data",
"targetTable": {
"database": "features",
"dbSchema": "public",
"name": "sample_data"
},
"user": {
"code": "test1"
},
Expand Down
Loading

0 comments on commit ed5efd0

Please sign in to comment.