Skip to content

Commit

Permalink
Improve feedback and validation tests for incoming queries
Browse files Browse the repository at this point in the history
  • Loading branch information
ludovicc committed Feb 17, 2019
1 parent ed5efd0 commit 1b3d253
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ case class DatabaseServices[F[_]: ConcurrentEffect: ContextShift: Timer](
}

private def tableFieldsShouldMatchMetadata(table: FeaturesTableService[F],
variables: VariablesMeta): F[Unit] =
variables: VariablesMeta): F[Unit] =
table
.validateFields(variables.allVariables())
.map { validation =>
Expand Down
157 changes: 94 additions & 63 deletions src/main/scala/ch/chuv/lren/woken/service/QueryToJobService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,17 @@ package ch.chuv.lren.woken.service
import java.util.UUID

import cats.data._
import cats.data.NonEmptyList._
import cats.data.Validated._
import cats.effect.{ Async, Effect }
import cats.implicits._
import ch.chuv.lren.woken.config.JobsConfiguration
import ch.chuv.lren.woken.core.features.Queries
import ch.chuv.lren.woken.core.features.Queries._
import ch.chuv.lren.woken.core.model._
import ch.chuv.lren.woken.core.model.jobs.{ ExperimentJob, _ }
import ch.chuv.lren.woken.cromwell.core.ConfigUtil.Validation
import ch.chuv.lren.woken.dao.VariablesMetaRepository
import ch.chuv.lren.woken.messages.datasets.TableId
import ch.chuv.lren.woken.messages.query._
import ch.chuv.lren.woken.messages.variables.{ VariableId, VariableMetaData }
import ch.chuv.lren.woken.messages.variables.{ FeatureIdentifier, VariableId, VariableMetaData }
import com.typesafe.scalalogging.LazyLogging
import shapeless.{ ::, HNil }

Expand Down Expand Up @@ -222,80 +219,114 @@ class QueryToJobServiceImpl[F[_]: Effect](
val jobId = UUID.randomUUID().toString
val featuresTable = query.targetTable.getOrElse(jobsConfiguration.defaultFeaturesTable)

def prepareFeedback(oldVars: FeatureIdentifiers,
// Builds the message reporting requested identifiers that could not be found.
// `variableType` is the label placed at the start of the message; an "s" is appended to it
// when more than one identifier is missing. The message names the node taken from
// jobsConfiguration and the features table targeted by the query.
def missingVariablesMessage(variableType: String, missing: FeatureIdentifiers): String = {
  val suffix     = if (missing.length > 1) "s" else ""
  val missingIds = missing.map(_.id).mkString_(",")
  s"$variableType$suffix $missingIds do not exist in node ${jobsConfiguration.node} and table ${featuresTable.name}"
}

def prepareFeedback(variableType: String,
requestedVars: FeatureIdentifiers,
existingVars: FeatureIdentifiers): UserFeedbacks =
oldVars
.intersect(existingVars)
requestedVars
.diff(existingVars)
.toNel
.fold[UserFeedbacks](Nil)(
missing => {
val missingFields = missing.map(Queries.toField).mkString_(",")
List(UserInfo(s"Missing variables $missingFields"))
val missingMsg = missingVariablesMessage(variableType, missing.toList)
List(UserInfo(missingMsg))
}
)

// Fetch the metadata for variables
variablesMetaService.get(featuresTable).map { variablesMetaO =>
val variablesMeta: Validation[VariablesMeta] = Validated.fromOption(
variablesMetaO,
NonEmptyList(s"Cannot find metadata for table ${featuresTable.toString}", Nil)
)

val validatedQueryWithFeedback: Validation[(Q, UserFeedbacks)] = variablesMeta.map { v =>
// Take only the covariables (and groupings) known to exist on the target table
val existingCovariables = v
.filterVariables { v: VariableId =>
query.covariables.contains(v)
}
.map(_.toId)
if (query.covariablesMustExist && (existingCovariables.size != query.covariables.size)) {
(query, prepareFeedback(existingCovariables, query.covariables))
} else {

val covariablesFeedback = prepareFeedback(query.covariables, existingCovariables)

val existingGroupings = v
.filterVariables { v: VariableId =>
query.grouping.contains(v)
variablesMeta
.andThen { vars =>
// Keeps only the requested identifiers for which the table metadata (`vars`) has a variable,
// returned as the ids of the matching metadata entries.
def filterExisting(requestedVars: FeatureIdentifiers): List[VariableId] = {
  val knownVariables = vars.filterVariables { candidate: VariableId =>
    requestedVars.contains(candidate)
  }
  knownVariables.map(_.toId)
}

// Returns the requested identifiers that filterExisting did not find,
// or None when every requested identifier exists.
def findMissing(requestedVars: FeatureIdentifiers): Option[NonEmptyList[FeatureIdentifier]] = {
  val existing = filterExisting(requestedVars)
  requestedVars.diff(existing).toNel
}

// Validates that every requested identifier exists: yields the requested list unchanged when
// findMissing reports nothing, otherwise an invalid result carrying the message built by
// missingVariablesMessage for the given variable type.
def ensureVariablesExists(variableType: String,
                          requestedVars: FeatureIdentifiers): Validation[FeatureIdentifiers] =
  findMissing(requestedVars) match {
    case None =>
      requestedVars.validNel[String]
    case Some(missing) =>
      missingVariablesMessage(variableType, missing.toList).invalidNel[FeatureIdentifiers]
  }

// Check the target variable
ensureVariablesExists("Variable", query.variables)
.andThen { _ =>
// Check covariables
if (query.covariablesMustExist)
ensureVariablesExists("Covariable", query.covariables)
.map(covars => (covars, List.empty[UserFeedback]))
else {
val existingCovars = filterExisting(query.covariables)
val covariablesFeedback =
prepareFeedback("Covariable", query.covariables, existingCovars)
if (query.covariables.nonEmpty && existingCovars.isEmpty)
covariablesFeedback.map(_.msg).mkString(",").invalidNel
else
(existingCovars, covariablesFeedback).validNel[String]
}
}
.andThen {
// Check groupings
case (existingCovariables, covariablesFeedback) =>
val existingGroupings = filterExisting(query.grouping)
val groupingsFeedback =
prepareFeedback("Grouping", query.grouping, existingGroupings)
val combinedFeedback = covariablesFeedback ++ groupingsFeedback
if (query.grouping.nonEmpty && existingGroupings.isEmpty)
groupingsFeedback.map(_.msg).mkString(",").invalidNel
else
(existingCovariables, existingGroupings, combinedFeedback).validNel[String]
}
.andThen {
// Update query
case (existingCovariables, existingGroupings, existanceFeedback) =>
// TODO: looks like a good use case for lenses
val updatedQuery: Q = query match {
case q: MiningQuery =>
q.copy(covariables = existingCovariables,
grouping = existingGroupings,
targetTable = Some(featuresTable))
.asInstanceOf[Q]
case q: ExperimentQuery =>
q.copy(covariables = existingCovariables,
grouping = existingGroupings,
targetTable = Some(featuresTable))
.asInstanceOf[Q]
}

(updatedQuery, existanceFeedback).validNel[String]
}
.map(_.toId)
val groupingsFeedback = prepareFeedback(query.grouping, existingGroupings)

val feedback: UserFeedbacks = covariablesFeedback ++ groupingsFeedback

// TODO: looks like a good use case for lenses
val updatedQuery: Q = query match {
case q: MiningQuery =>
q.copy(covariables = existingCovariables,
grouping = existingGroupings,
targetTable = Some(featuresTable))
.asInstanceOf[Q]
case q: ExperimentQuery =>
q.copy(covariables = existingCovariables,
grouping = existingGroupings,
targetTable = Some(featuresTable))
.asInstanceOf[Q]
}

(updatedQuery, feedback)
}
}

val validatedQuery: Validation[Q] = validatedQueryWithFeedback.map(_._1)

val mq: Validation[(VariablesMeta, Q)] =
(variablesMeta, validatedQuery) mapN Tuple2.apply

val metadata: Validation[List[VariableMetaData]] = mq.andThen {
case (v, q) =>
v.selectVariables(q.allVars)
}

val feedback: UserFeedbacks = validatedQueryWithFeedback.map(_._2).getOrElse(Nil)

(metadata, validatedQuery) mapN Tuple2.apply map {
case (m, q) =>
jobId :: featuresTable :: m :: q :: feedback :: HNil
}
.andThen {
case (q, feedback) =>
variablesMeta
.andThen { meta =>
// Get the metadata associated with the variables
meta.selectVariables(q.allVars)
}
.map { m =>
// Build the PreparedQuery
jobId :: featuresTable :: m :: q :: feedback :: HNil
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ import scala.language.postfixOps
import ch.chuv.lren.woken.config.{ AlgorithmsConfiguration, JobsConfiguration }
import ch.chuv.lren.woken.core.model.AlgorithmDefinition
import ch.chuv.lren.woken.dao.VariablesMetaRepository

import ExperimentQuerySupport._

import scala.collection.immutable.TreeSet
import ch.chuv.lren.woken.config.ConfigurationInstances._
import org.scalamock.scalatest.MockFactory

/**
* Experiment flow should always complete with success, but the error is reported inside the response.
Expand All @@ -53,6 +53,7 @@ class LocalExperimentServiceTest
with ValidatedMatchers
with ValidatedValues
with BeforeAndAfterAll
with MockFactory
with JsonUtils
with LazyLogging {

Expand Down Expand Up @@ -80,8 +81,11 @@ class LocalExperimentServiceTest
val queryToJobService: QueryToJobService[IO] =
QueryToJobService[IO](featuresService, variablesMetaService, jobsConf, algorithmLookup)

val algorithmExecutor: AlgorithmExecutor[IO] = mock[AlgorithmExecutor[IO]]
(algorithmExecutor.node _).expects().anyNumberOfTimes().returns("local")

lazy val service: LocalExperimentService[IO] =
LocalExperimentService[IO](TestServices.algorithmExecutor,
LocalExperimentService[IO](algorithmExecutor,
TestServices.wokenWorker,
featuresService,
TestServices.jobResultService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class QueryToJobServiceTest
val query = MiningQuery(
user = user,
variables = List(VariableId("score_test1")),
covariables = List(VariableId("lefthippocampus")),
covariables = List(VariableId("stress_before_test1")),
covariablesMustExist = false,
grouping = Nil,
filters = None,
Expand Down Expand Up @@ -170,15 +170,36 @@ class QueryToJobServiceTest
val maybeJob = queryToJobService.miningQuery2Job(query).unsafeRunSync()

maybeJob shouldBe invalid
maybeJob.invalidValue.head shouldBe "Found 1 out of 2 variables. Missing unknown"
maybeJob.invalidValue.head shouldBe "Variable unknown do not exist in node testNode and table sample_data"
maybeJob.invalidValue.size shouldBe 1
}

"fail when the covariable is unknown yet must exist" in {
"fail when no covariables can be found" in {
val query = MiningQuery(
user = user,
variables = List(VariableId("score_test1")),
covariables = List(VariableId("unknown")),
covariables = List(VariableId("unknown1"), VariableId("unknown2")),
covariablesMustExist = false,
grouping = Nil,
filters = None,
targetTable = Some(sampleDataTableId),
algorithm = AlgorithmSpec("knn", Nil, None),
datasets = TreeSet(),
executionPlan = None
)

val maybeJob = queryToJobService.miningQuery2Job(query).unsafeRunSync()

maybeJob shouldBe invalid
maybeJob.invalidValue.head shouldBe "Covariables unknown1,unknown2 do not exist in node testNode and table sample_data"
maybeJob.invalidValue.size shouldBe 1
}

"fail when a covariable is unknown yet all must exist" in {
val query = MiningQuery(
user = user,
variables = List(VariableId("score_test1")),
covariables = List(VariableId("stress_before_test1"), VariableId("unknown")),
covariablesMustExist = true,
grouping = Nil,
filters = None,
Expand All @@ -191,7 +212,7 @@ class QueryToJobServiceTest
val maybeJob = queryToJobService.miningQuery2Job(query).unsafeRunSync()

maybeJob shouldBe invalid
maybeJob.invalidValue.head shouldBe "Found 1 out of 2 variables. Missing unknown"
maybeJob.invalidValue.head shouldBe "Covariable unknown do not exist in node testNode and table sample_data"
maybeJob.invalidValue.size shouldBe 1
}

Expand Down Expand Up @@ -243,7 +264,7 @@ class QueryToJobServiceTest

job.query.sql shouldBe """SELECT "score_test1","stress_before_test1" FROM "sample_data" WHERE "score_test1" IS NOT NULL AND "stress_before_test1" IS NOT NULL"""

feedback shouldBe List(UserInfo("Missing variables stress_before_test1"))
feedback shouldBe Nil
}

"create a DockerJob for a kNN algorithm on a table with several datasets" in {
Expand Down Expand Up @@ -293,7 +314,7 @@ class QueryToJobServiceTest

job.query.sql shouldBe """SELECT "apoe4","lefthippocampus" FROM "cde_features_a" WHERE "apoe4" IS NOT NULL AND "lefthippocampus" IS NOT NULL AND "dataset" IN ('desd-synthdata')"""

feedback shouldBe List(UserInfo("Missing variables lefthippocampus"))
feedback shouldBe Nil
}

"drop the unknown covariables that do not need to exist" in {
Expand Down Expand Up @@ -340,7 +361,9 @@ class QueryToJobServiceTest

job.query.sql shouldBe """SELECT "apoe4","lefthippocampus" FROM "cde_features_a" WHERE "apoe4" IS NOT NULL AND "lefthippocampus" IS NOT NULL AND "dataset" IN ('desd-synthdata')"""

feedback shouldBe List(UserInfo("Missing variables lefthippocampus"))
feedback shouldBe List(
UserInfo("Covariable unknown do not exist in node testNode and table cde_features_a")
)
}

"create a ValidationJob for a validation algorithm" in {
Expand Down Expand Up @@ -372,7 +395,7 @@ class QueryToJobServiceTest
'metadata (List(CdeVariables.apoe4, CdeVariables.leftHipocampus))
)

feedback shouldBe List(UserInfo("Missing variables lefthippocampus"))
feedback shouldBe Nil
}
}

Expand Down Expand Up @@ -448,7 +471,7 @@ class QueryToJobServiceTest
queryToJobService.experimentQuery2Job(query).unsafeRunSync()

maybeJob shouldBe invalid
maybeJob.invalidValue.head shouldBe "Found 1 out of 2 variables. Missing unknown"
maybeJob.invalidValue.head shouldBe "Variable unknown do not exist in node testNode and table cde_features_a"
maybeJob.invalidValue.size shouldBe 1
}

Expand All @@ -473,7 +496,7 @@ class QueryToJobServiceTest
queryToJobService.experimentQuery2Job(query).unsafeRunSync()

maybeJob shouldBe invalid
maybeJob.invalidValue.head shouldBe "Found 1 out of 2 variables. Missing unknown"
maybeJob.invalidValue.head shouldBe "Covariable unknown do not exist in node testNode and table cde_features_a"
maybeJob.invalidValue.size shouldBe 1
}

Expand Down

0 comments on commit 1b3d253

Please sign in to comment.