diff --git a/database/src/main/postgres/runs/V1.9.2__write_checkpoint_v2.sql b/database/src/main/postgres/runs/V1.9.2__write_checkpoint_v2.sql new file mode 100644 index 000000000..ba5373b0b --- /dev/null +++ b/database/src/main/postgres/runs/V1.9.2__write_checkpoint_v2.sql @@ -0,0 +1,87 @@ +CREATE OR REPLACE FUNCTION runs.write_checkpoint_v2( + IN i_partitioning_id BIGINT, + IN i_id_checkpoint UUID, + IN i_checkpoint_name TEXT, + IN i_process_start_time TIMESTAMP WITH TIME ZONE, + IN i_process_end_time TIMESTAMP WITH TIME ZONE, + IN i_measurements JSONB[], + IN i_measured_by_atum_agent BOOLEAN, + IN i_by_user TEXT, + OUT status INTEGER, + OUT status_text TEXT +) RETURNS record AS +$$ + ------------------------------------------------------------------------------- +-- +-- Function: runs.write_checkpoint_v2(10) +-- Creates a checkpoint and adds all the measurements that it consists of +-- +-- Parameters: +-- i_partitioning_id - ID of the partitioning the measure belongs to +-- i_id_checkpoint - reference to the checkpoint this measure belongs into +-- i_checkpoint_name - name of the checkpoint +-- i_process_start_time - the start of processing (measuring) of the checkpoint +-- i_process_end_time - the end of the processing (measuring) of the checkpoint +-- i_measurements - array of JSON objects of the following format (values of the keys are examples only) +-- { +-- "measure": { +-- "measureName": "count", +-- "measuredColumns": ["a","b"] +-- }, +-- "result": { +-- whatever here +-- } +-- } +-- i_measured_by_atum_agent - flag it the checkpoint was measured by Atum or data provided by user +-- i_by_user - user behind the change +-- +-- Returns: +-- status - Status code +-- status_text - Status text +-- +-- Status codes: +-- 11 - Checkpoint created +-- 31 - Conflict, checkpoint already present +-- +------------------------------------------------------------------------------- +BEGIN + + PERFORM 1 + FROM runs.checkpoints CP + WHERE CP.id_checkpoint = i_id_checkpoint; 
+ + IF found THEN + status := 31; + status_text := 'Checkpoint already present'; + RETURN; + END IF; + + INSERT INTO runs.checkpoints (id_checkpoint, fk_partitioning, + checkpoint_name, measured_by_atum_agent, + process_start_time, process_end_time, created_by) + VALUES (i_id_checkpoint, i_partitioning_id, + i_checkpoint_name, i_measured_by_atum_agent, + i_process_start_time, i_process_end_time, i_by_user); + + -- maybe could use `jsonb_populate_record` function to be little bit more effective + PERFORM runs._write_measurement( + i_id_checkpoint, + i_partitioning_id, + UN.measurement->'measure'->>'measureName', + jsonb_array_to_text_array(UN.measurement->'measure'->'measuredColumns'), + UN.measurement->'result', + i_by_user + ) + FROM ( + SELECT unnest(i_measurements) AS measurement + ) UN; + + status := 11; + status_text := 'Checkpoint created'; + RETURN; +END; +$$ + LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +ALTER FUNCTION runs.write_checkpoint_v2(BIGINT, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) OWNER TO atum_owner; +GRANT EXECUTE ON FUNCTION runs.write_checkpoint_v2(BIGINT, UUID, TEXT, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH TIME ZONE, JSONB[], BOOLEAN, TEXT) TO atum_user; \ No newline at end of file diff --git a/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointV2IntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointV2IntegrationTests.scala new file mode 100644 index 000000000..cb386287d --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/runs/WriteCheckpointV2IntegrationTests.scala @@ -0,0 +1,264 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.runs + +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString +import za.co.absa.balta.classes.setter.CustomDBType + +import java.time.OffsetDateTime +import java.util.UUID + +class WriteCheckpointV2IntegrationTests extends DBTestSuite { + + private val partitioning = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["key1", "key3", "key2", "key4"], + | "keysToValues": { + | "key1": "valueX", + | "key2": "valueY", + | "key3": "valueZ", + | "key4": "valueA" + | } + |} + |""".stripMargin + ) + + test("Write new checkpoint without data") { + val uuid = UUID.randomUUID + val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") + val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") + + + table("runs.partitionings").insert( + add("partitioning", partitioning) + .add("created_by", "John von Neumann") + ) + //DBTable's insert doesn't return the values yet correctly + val fkPartitioning: Long = table("runs.partitionings").fieldValue("partitioning", partitioning, "id_partitioning").get.get + + table("runs.measure_definitions").insert( + add("fk_partitioning", fkPartitioning) + .add("measure_name", "avg") + .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) + .add("created_by", "Aristoteles") + ) + + assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) + + function("runs.write_checkpoint_v2") + .setParam("i_partitioning_id", fkPartitioning) + .setParam("i_id_checkpoint", uuid) + .setParam("i_checkpoint_name", "Empty path") + 
.setParam("i_process_start_time", startTime) + .setParam("i_process_end_time", endTime) + .setParam("i_measurements", CustomDBType("{}", "JSONB[]")) + .setParam("i_measured_by_atum_agent", true) + .setParam("i_by_user", "J. Robert Oppenheimer") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Checkpoint created")) + } + + assert(table("runs.measure_definitions").count(add("fk_partitioning", fkPartitioning)) == 1) + assert(table("runs.measurements").count(add("fk_checkpoint", uuid)) == 0) + assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 1) + table("runs.checkpoints").where(add("fk_partitioning", fkPartitioning)) {resultSet => + val row = resultSet.next() + assert(row.getString("checkpoint_name").contains("Empty path")) + assert(row.getOffsetDateTime("process_start_time").contains(startTime)) + assert(row.getOffsetDateTime("process_end_time").contains(endTime)) + assert(row.getBoolean("measured_by_atum_agent").contains(true)) + assert(row.getString("created_by").contains("J. 
Robert Oppenheimer")) + assert(row.getOffsetDateTime("created_at").contains(now())) + } + + } + + test("Write new checkpoint"){ + val uuid = UUID.randomUUID + val user = "Franz Kafka" + val startTime = OffsetDateTime.parse("1992-08-03T10:00:00Z") + val endTime = OffsetDateTime.parse("2022-11-05T08:00:00Z") + val measurements = + """ + |{ + | "{ + | \"measure\": { + | \"measureName\": \"count\", + | \"measuredColumns\": [] + | }, + | \"result\":{ + | \"value\":\"3\", + | \"type\":\"int\" + | } + | }", + | "{ + | \"measure\": { + | \"measureName\": \"avg\", + | \"measuredColumns\": [\"col1\"] + | }, + | \"result\":{ + | \"value\":\"3.14\", + | \"type\":\"double\" + | } + | }", + | "{ + | \"measure\": { + | \"measureName\": \"avg\", + | \"measuredColumns\": [\"a\",\"b\"] + | }, + | \"result\":{ + | \"value\":\"2.71\", + | \"type\":\"double\" + | } + | }" + |} + |""".stripMargin + + table("runs.partitionings").insert( + add("partitioning", partitioning) + .add("created_by", user) + ) + //DBTable's insert doesn't return the values yet correctly + val fkPartitioning: Long = table("runs.partitionings").fieldValue("partitioning", partitioning, "id_partitioning").get.get + + table("runs.measure_definitions").insert( + add("fk_partitioning", fkPartitioning) + .add("measure_name", "avg") + .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) + .add("created_by", "Aristoteles") + ) + + assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 0) + + function("runs.write_checkpoint_v2") + .setParam("i_partitioning_id", fkPartitioning) + .setParam("i_id_checkpoint", uuid) + .setParam("i_checkpoint_name", "Happy path") + .setParam("i_process_start_time", startTime) + .setParam("i_process_end_time", endTime) + .setParam("i_measurements", CustomDBType(measurements, "JSONB[]")) + .setParam("i_measured_by_atum_agent", false) + .setParam("i_by_user", user) + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + 
assert(row.getInt("status").contains(11)) + assert(row.getString("status_text").contains("Checkpoint created")) + } + + assert(table("runs.checkpoints").count(add("fk_partitioning", fkPartitioning)) == 1) + assert(table("runs.measure_definitions").count(add("fk_partitioning", fkPartitioning)) == 3) + assert(table("runs.measurements").count(add("fk_checkpoint", uuid)) == 3) + table("runs.checkpoints").where(add("fk_partitioning", fkPartitioning)) { resultSet => + val row = resultSet.next() + assert(row.getString("checkpoint_name").contains("Happy path")) + assert(row.getOffsetDateTime("process_start_time").contains(startTime)) + assert(row.getOffsetDateTime("process_end_time").contains(endTime)) + assert(row.getBoolean("measured_by_atum_agent").contains(false)) + assert(row.getString("created_by").contains(user)) + assert(row.getOffsetDateTime("created_at").contains(now())) + } + + val measureDefinitionIds = table("runs.measure_definitions") + .where(add("fk_partitioning", fkPartitioning),"id_measure_definition") { resultSet => + val row1 = resultSet.next() + val result1: Vector[Long] = Vector(row1.getLong("id_measure_definition").get) + assert(row1.getString("measure_name").contains("avg")) + assert(row1.getArray[String]("measured_columns").map(_.toList).contains(List("col1"))) + assert(row1.getString("created_by").contains("Aristoteles")) + assert(row1.getOffsetDateTime("created_at").contains(now())) + val row2 = resultSet.next() + val result2: Vector[Long] = result1 :+ row2.getLong("id_measure_definition").get + assert(row2.getString("measure_name").contains("count")) + assert(row2.getArray[String]("measured_columns").map(_.toList).contains(List.empty)) + assert(row2.getString("created_by").contains(user)) + assert(row2.getOffsetDateTime("created_at").contains(now())) + val row3 = resultSet.next() + val result3: Vector[Long] = result2 :+ row3.getLong("id_measure_definition").get + assert(row3.getString("measure_name").contains("avg")) + 
assert(row3.getArray[String]("measured_columns").map(_.toList).contains(List("a", "b"))) + assert(row3.getString("created_by").contains(user)) + assert(row3.getOffsetDateTime("created_at").contains(now())) + result3 + } + table("runs.measurements").where(add("fk_checkpoint", uuid), "id_measurement") { resultSet => + val row1 = resultSet.next() + // because measure definition of `count` was created only after the manual enter of the `avg`, it's actually only + // second in the list + assert(row1.getLong("fk_measure_definition").contains(measureDefinitionIds(1))) + assert(row1.getJsonB("measurement_value").contains(JsonBString("""{"type": "int", "value": "3"}"""))) + val row2 = resultSet.next() + assert(row2.getLong("fk_measure_definition").contains(measureDefinitionIds(0))) + assert(row2.getJsonB("measurement_value").contains(JsonBString("""{"type": "double", "value": "3.14"}"""))) + val row3 = resultSet.next() + assert(row3.getLong("fk_measure_definition").contains(measureDefinitionIds(2))) + assert(row3.getJsonB("measurement_value").contains(JsonBString("""{"type": "double", "value": "2.71"}"""))) + } + } + + test("Checkpoint already exists") { + val uuid = UUID.randomUUID + val origAuthor = "John von Neumann" + table("runs.partitionings").insert( + add("partitioning", partitioning) + .add("created_by", origAuthor) + ) + + //DBTable's insert doesn't return the values yet correctly + val fkPartitioning: Long = table("runs.partitionings").fieldValue("partitioning", partitioning, "id_partitioning").get.get + + table("runs.checkpoints").insert( + add("id_checkpoint", uuid) + .add("fk_partitioning", fkPartitioning) + .add("checkpoint_name", "I came before") + .add("process_start_time", now()) + .add("process_end_time", now()) + .add("measured_by_atum_agent", false) + .add("created_by", origAuthor) + ) + + function("runs.write_checkpoint_v2") + .setParam("i_partitioning_id", fkPartitioning) + .setParam("i_id_checkpoint", uuid) + .setParam("i_checkpoint_name", "Won't go 
in") + .setParam("i_process_start_time", now()) + .setParamNull("i_process_end_time") + .setParamNull("i_measurements") + .setParam("i_measured_by_atum_agent", true) + .setParam("i_by_user", "J. Robert Oppenheimer") + .execute { queryResult => + assert(queryResult.hasNext) + val row = queryResult.next() + assert(row.getInt("status").contains(31)) + assert(row.getString("status_text").contains("Checkpoint already present")) + } + + table("runs.checkpoints").where(add("id_checkpoint", uuid)){queryResult => + val row = queryResult.next() + assert(row.getString("checkpoint_name").contains("I came before")) + assert(row.getBoolean("measured_by_atum_agent").contains(false)) + assert(row.getString("created_by").contains(origAuthor)) + } + } + +} diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index 19ab07fb8..5a07279b6 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -56,6 +56,7 @@ object Main extends ZIOAppDefault with Server { CreateOrUpdateAdditionalData.layer, GetPartitioningCheckpoints.layer, WriteCheckpoint.layer, + WriteCheckpointV2.layer, GetFlowCheckpoints.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala index 88c133487..1a97b6b9b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala @@ -16,8 +16,9 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse} +import za.co.absa.atum.server.api.exception.{ConflictServiceError, GeneralServiceError, ServiceError} 
+import za.co.absa.atum.server.config.SslConfig +import za.co.absa.atum.server.model.{ConflictErrorResponse, ErrorResponse, InternalServerErrorResponse} import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import zio._ @@ -29,8 +30,9 @@ trait BaseController { ): IO[ErrorResponse, B] = { serviceCall - .mapError { serviceError: ServiceError => - InternalServerErrorResponse(serviceError.message) + .mapError { + case ConflictServiceError(message) => ConflictErrorResponse(message) + case GeneralServiceError(message) => InternalServerErrorResponse(message) } .flatMap { result => ZIO.succeed(onSuccessFnc(result)) @@ -49,4 +51,22 @@ trait BaseController { ): IO[ErrorResponse, MultiSuccessResponse[A]] = { effect.map(MultiSuccessResponse(_)) } + + protected def createResourceUri(parts: Seq[String]): IO[ErrorResponse, String] = { + for { + hostname <- System.env("HOSTNAME") + .orElseFail(InternalServerErrorResponse("Failed to get hostname")) + .flatMap { + case Some(value) => ZIO.succeed(value) + case None => ZIO.fail(InternalServerErrorResponse("Failed to get hostname")) + } + sslConfig <- ZIO.config[SslConfig](SslConfig.config) + .orElseFail(InternalServerErrorResponse("Failed to get SSL config")) + } yield { + val protocol = if (sslConfig.enabled) "https" else "http" + val port = if (sslConfig.enabled) 8443 else 8080 + val path = parts.mkString("/") + s"$protocol://$hostname:$port/$path" + } + } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala index 6b547c3cb..d4bb2229d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala @@ -27,8 +27,9 @@ trait CheckpointController { def createCheckpointV1(checkpointDTO: CheckpointDTO): IO[ErrorResponse, 
CheckpointDTO] - def createCheckpointV2( + def postCheckpointV2( + partitioningId: Long, checkpointDTO: CheckpointDTO - ): IO[ErrorResponse, SingleSuccessResponse[CheckpointDTO]] + ): IO[ErrorResponse, (SingleSuccessResponse[CheckpointDTO], String)] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala index b64c825a7..44fc93372 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala @@ -17,6 +17,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.CheckpointService import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse @@ -33,12 +34,22 @@ class CheckpointControllerImpl(checkpointService: CheckpointService) extends Che ) } - override def createCheckpointV2( + override def postCheckpointV2( + partitioningId: Long, checkpointDTO: CheckpointDTO - ): IO[ErrorResponse, SingleSuccessResponse[CheckpointDTO]] = { - mapToSingleSuccessResponse(createCheckpointV1(checkpointDTO)) + ): IO[ErrorResponse, (SingleSuccessResponse[CheckpointDTO], String)] = { + for { + response <- mapToSingleSuccessResponse( + serviceCall[Unit, CheckpointDTO]( + checkpointService.saveCheckpointV2(partitioningId, checkpointDTO), + _ => checkpointDTO + ) + ) + uri <- createResourceUri( + Seq(V2Paths.Partitionings, partitioningId.toString, V2Paths.Checkpoints, checkpointDTO.id.toString) + ) + } yield (response, uri) } - } object CheckpointControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala 
b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala new file mode 100644 index 000000000..f440ec6f7 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointV2.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.runs.functions + +import doobie.implicits.toSqlInterpolator +import za.co.absa.atum.server.model.WriteCheckpointV2Args +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.runs.Runs +import zio._ +import io.circe.syntax._ + +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbArrayPut +import doobie.postgres.implicits._ + +class WriteCheckpointV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieSingleResultFunctionWithStatus[WriteCheckpointV2Args, Unit, Task](args => + Seq( + fr"${args.partitioningId}", + fr"${args.checkpointDTO.id}", + fr"${args.checkpointDTO.name}", + fr"${args.checkpointDTO.processStartTime}", + fr"${args.checkpointDTO.processEndTime}", + 
fr"${args.checkpointDTO.measurements.toList.map(_.asJson)}", + fr"${args.checkpointDTO.measuredByAtumAgent}", + fr"${args.checkpointDTO.author}" + ) + ) + with StandardStatusHandling + +object WriteCheckpointV2 { + val layer: URLayer[PostgresDatabaseProvider, WriteCheckpointV2] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new WriteCheckpointV2()(Runs, dbProvider.dbEngine) + } +} + diff --git a/server/src/main/scala/za/co/absa/atum/server/api/exception/AppError.scala b/server/src/main/scala/za/co/absa/atum/server/api/exception/AppError.scala index c4b129b94..106d647f2 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/exception/AppError.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/exception/AppError.scala @@ -20,5 +20,11 @@ sealed trait AppError extends Throwable { def message: String } -case class DatabaseError(message: String) extends AppError -case class ServiceError(message: String) extends AppError +sealed trait DatabaseError extends AppError +case class GeneralDatabaseError(message: String) extends DatabaseError +case class ConflictDatabaseError(message: String) extends DatabaseError + +sealed trait ServiceError extends AppError + +case class GeneralServiceError(message: String) extends ServiceError +case class ConflictServiceError(message: String) extends ServiceError diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala index 3ef8edfb1..04b3c228a 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala @@ -19,12 +19,7 @@ package za.co.absa.atum.server.api.http import sttp.model.StatusCode import sttp.tapir.generic.auto.schemaForCaseClass import sttp.tapir.json.circe.jsonBody -import za.co.absa.atum.server.model.{ - BadRequestResponse, - ErrorResponse, - GeneralErrorResponse, - 
InternalServerErrorResponse -} +import za.co.absa.atum.server.model.{BadRequestResponse, ConflictErrorResponse, ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse} import sttp.tapir.typelevel.MatchType import sttp.tapir.ztapir._ import sttp.tapir.{EndpointOutput, PublicEndpoint} @@ -36,6 +31,13 @@ trait BaseEndpoints { implicit val uuidMatchType: MatchType[UUID] = (a: Any) => a.isInstanceOf[UUID] + protected val conflictErrorOneOfVariant: EndpointOutput.OneOfVariant[ConflictErrorResponse] = { + oneOfVariantFromMatchType( + StatusCode.Conflict, + jsonBody[ConflictErrorResponse] + ) + } + private val badRequestOneOfVariant: EndpointOutput.OneOfVariant[BadRequestResponse] = { oneOfVariantFromMatchType( StatusCode.BadRequest, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index f98f73150..78800ae37 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -25,7 +25,7 @@ import za.co.absa.atum.server.Constants.Endpoints._ import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import sttp.tapir.{PublicEndpoint, endpoint} -import za.co.absa.atum.server.api.http.ApiPaths.V2Paths +import za.co.absa.atum.server.api.http.ApiPaths.{V1Paths, V2Paths} trait Endpoints extends BaseEndpoints { @@ -37,15 +37,6 @@ trait Endpoints extends BaseEndpoints { .out(jsonBody[CheckpointDTO]) } -// protected val createCheckpointEndpointV2 -// : PublicEndpoint[CheckpointDTO, ErrorResponse, SingleSuccessResponse[CheckpointDTO], Any] = { -// apiV2.post -// .in(CreateCheckpoint) -// .in(jsonBody[CheckpointDTO]) -// .out(statusCode(StatusCode.Created)) -// .out(jsonBody[SingleSuccessResponse[CheckpointDTO]]) -// } - protected val postCheckpointEndpointV2 : PublicEndpoint[(Long, 
CheckpointDTO), ErrorResponse, (SingleSuccessResponse[CheckpointDTO], String), Any] = { apiV2.post @@ -54,6 +45,7 @@ trait Endpoints extends BaseEndpoints { .out(statusCode(StatusCode.Created)) .out(jsonBody[SingleSuccessResponse[CheckpointDTO]]) .out(header[String]("Location")) + .errorOutVariantPrepend(conflictErrorOneOfVariant) } protected val createPartitioningEndpointV1 diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 1da88efce..af229a453 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -19,14 +19,19 @@ package za.co.absa.atum.server.api.http import cats.syntax.semigroupk._ import org.http4s.HttpRoutes import sttp.tapir.PublicEndpoint +import sttp.tapir.model.ServerRequest import sttp.tapir.server.http4s.Http4sServerInterpreter import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion} -import za.co.absa.atum.server.api.controller.{CheckpointController, PartitioningController, FlowController} +import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} +import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} +import za.co.absa.atum.server.model.ErrorResponse +import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse import zio._ import zio.interop.catz._ import zio.metrics.connectors.prometheus.PrometheusPublisher @@ -39,7 +44,16 @@ trait Routes extends Endpoints with ServerOptions { } val endpoints = List( 
createServerEndpoint(createCheckpointEndpointV1, CheckpointController.createCheckpointV1), - createServerEndpoint(createCheckpointEndpointV2, CheckpointController.createCheckpointV2), + createServerEndpoint[ + (Long, CheckpointDTO), + ErrorResponse, + (SingleSuccessResponse[CheckpointDTO], String) + ]( + postCheckpointEndpointV2, + { case (partitioningId: Long, checkpointDTO: CheckpointDTO) => + CheckpointController.postCheckpointV2(partitioningId, checkpointDTO) + } + ), createServerEndpoint(createPartitioningEndpointV1, PartitioningController.createPartitioningIfNotExistsV1), createServerEndpoint(createPartitioningEndpointV2, PartitioningController.createPartitioningIfNotExistsV2), createServerEndpoint( @@ -59,7 +73,7 @@ trait Routes extends Endpoints with ServerOptions { private def createSwaggerRoutes: HttpRoutes[HttpEnv.F] = { val endpoints = List( createCheckpointEndpointV1, - createCheckpointEndpointV2, + postCheckpointEndpointV2, createPartitioningEndpointV1, createPartitioningEndpointV2, createOrUpdateAdditionalDataEndpointV2, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala index 9f9764f91..c9ce33570 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala @@ -16,8 +16,8 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.db.fadb.exceptions.StatusException +import za.co.absa.atum.server.api.exception.{ConflictDatabaseError, DatabaseError, GeneralDatabaseError} +import za.co.absa.db.fadb.exceptions.{DataConflictException, StatusException} import za.co.absa.db.fadb.status.{FailedOrRow, FailedOrRows} import zio._ @@ -32,7 +32,7 @@ trait BaseRepository { case Left(statusException: StatusException) => ZIO.logError( s"Exception caused by 
operation: '$operationName': " + - s"(${statusException.status.statusCode}), ${statusException.status.statusText}" + s"(${statusException.status.statusCode}) ${statusException.status.statusText}" ) case Right(_) => ZIO.logDebug(s"Operation '$operationName' succeeded in database") } @@ -40,12 +40,15 @@ trait BaseRepository { private def defaultErrorHandler(operationName: String): PartialFunction[Throwable, DatabaseError] = { case statusException: StatusException => - DatabaseError( - s"Exception caused by operation: '$operationName': " + - s"(${statusException.status.statusCode}) ${statusException.status.statusText}" - ) + val message = s"Exception caused by operation: '$operationName': " + + s"(${statusException.status.statusCode}) ${statusException.status.statusText}" + + statusException match { + case DataConflictException(_) => ConflictDatabaseError(message) + case _ => GeneralDatabaseError(message) + } case error => - DatabaseError(s"Operation '$operationName' failed with unexpected error: ${error.getMessage}") + GeneralDatabaseError(s"Operation '$operationName' failed with unexpected error: ${error.getMessage}") } def dbSingleResultCallWithStatus[R](dbFuncCall: Task[FailedOrRow[R]], operationName: String): IO[DatabaseError, R] = { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala index 59c33d1b6..a2237a98b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala @@ -24,5 +24,5 @@ import zio.macros.accessible @accessible trait CheckpointRepository { def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] - + def writeCheckpointV2(partitioningId: Long, checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] } diff --git 
a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala index e63d92c9f..9d117c663 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala @@ -17,23 +17,33 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.CheckpointDTO -import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint +import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpoint, WriteCheckpointV2} import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.model.WriteCheckpointV2Args import zio._ import zio.interop.catz.asyncInstance -class CheckpointRepositoryImpl(writeCheckpointFn: WriteCheckpoint) extends CheckpointRepository with BaseRepository { +class CheckpointRepositoryImpl(writeCheckpointFn: WriteCheckpoint, writeCheckpointV2Fn: WriteCheckpointV2) + extends CheckpointRepository + with BaseRepository { override def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] = { dbSingleResultCallWithStatus(writeCheckpointFn(checkpointDTO), "writeCheckpoint") } + override def writeCheckpointV2(partitioningId: Long, checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] = { + dbSingleResultCallWithStatus( + writeCheckpointV2Fn(WriteCheckpointV2Args(partitioningId, checkpointDTO)), + "writeCheckpointV2" + ) + } } object CheckpointRepositoryImpl { - val layer: URLayer[WriteCheckpoint, CheckpointRepository] = ZLayer { + val layer: URLayer[WriteCheckpoint with WriteCheckpointV2, CheckpointRepository] = ZLayer { for { writeCheckpoint <- ZIO.service[WriteCheckpoint] - } yield new CheckpointRepositoryImpl(writeCheckpoint) + writeCheckpointV2 <- ZIO.service[WriteCheckpointV2] + } yield new CheckpointRepositoryImpl(writeCheckpoint, 
writeCheckpointV2) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala index 680dcc22d..0ca695299 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala @@ -16,16 +16,21 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} +import za.co.absa.atum.server.api.exception._ import zio._ trait BaseService { def repositoryCall[R](repositoryCall: IO[DatabaseError, R], operationName: String): IO[ServiceError, R] = { repositoryCall - .mapError { case DatabaseError(message) => - ServiceError(s"Failed to perform '$operationName': $message") + .mapError { + case ConflictDatabaseError(message) => ConflictServiceError(createMessage(operationName, message)) + case GeneralDatabaseError(message) => GeneralServiceError(createMessage(operationName, message)) } } + private def createMessage(operationName: String, message: String): String = { + s"Failed to perform '$operationName': $message" + } + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala index a38811890..ce026ff05 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala @@ -24,5 +24,5 @@ import zio.macros.accessible @accessible trait CheckpointService { def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] - + def saveCheckpointV2(partitioningId: Long, checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala 
b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala index aae123ea4..f8e772c2c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala @@ -30,6 +30,13 @@ class CheckpointServiceImpl(checkpointRepository: CheckpointRepository) extends ) } + override def saveCheckpointV2(partitioningId: Long, checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] = { + repositoryCall( + checkpointRepository.writeCheckpointV2(partitioningId, checkpointDTO), + "saveCheckpoint" + ) + } + } object CheckpointServiceImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala index 5f788b86c..f768a3262 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ -import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.exception.{GeneralServiceError, ServiceError} import za.co.absa.atum.server.api.repository.FlowRepository import za.co.absa.atum.server.model.CheckpointFromDB import zio._ @@ -33,7 +33,7 @@ class FlowServiceImpl(flowRepository: FlowRepository) extends FlowService with B checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => ZIO .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => ServiceError(error.getMessage)) + .mapError(error => GeneralServiceError(error.getMessage)) } } yield checkpointDTOs } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala 
b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index a4978718d..e4617879f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ -import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.exception.{GeneralServiceError, ServiceError} import za.co.absa.atum.server.api.repository.PartitioningRepository import za.co.absa.atum.server.model.CheckpointFromDB import zio._ @@ -65,7 +65,7 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => ZIO .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => ServiceError(error.getMessage)) + .mapError(error => GeneralServiceError(error.getMessage)) } } yield checkpointDTOs diff --git a/server/src/main/scala/za/co/absa/atum/server/model/WriteCheckpointV2Args.scala b/server/src/main/scala/za/co/absa/atum/server/model/WriteCheckpointV2Args.scala new file mode 100644 index 000000000..1a7a80828 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/model/WriteCheckpointV2Args.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.model + +import za.co.absa.atum.model.dto.CheckpointDTO + +case class WriteCheckpointV2Args(partitioningId: Long, checkpointDTO: CheckpointDTO) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala index 0621d7ca4..11d5eeec6 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala @@ -36,17 +36,17 @@ object CreateCheckpointEndpointUnitTests extends ZIOSpecDefault with Endpoints w private val checkpointControllerMock = mock(classOf[CheckpointController]) - when(checkpointControllerMock.createCheckpointV2(checkpointDTO1)) + when(checkpointControllerMock.postCheckpointV2(checkpointDTO1)) .thenReturn(ZIO.succeed(SingleSuccessResponse(checkpointDTO1, uuid))) - when(checkpointControllerMock.createCheckpointV2(checkpointDTO2)) + when(checkpointControllerMock.postCheckpointV2(checkpointDTO2)) .thenReturn(ZIO.fail(GeneralErrorResponse("error"))) - when(checkpointControllerMock.createCheckpointV2(checkpointDTO3)) + when(checkpointControllerMock.postCheckpointV2(checkpointDTO3)) .thenReturn(ZIO.fail(InternalServerErrorResponse("error"))) private val checkpointControllerMockLayer = ZLayer.succeed(checkpointControllerMock) private val createCheckpointServerEndpoint = createCheckpointEndpointV2 - .zServerLogic(CheckpointController.createCheckpointV2) + .zServerLogic(CheckpointController.postCheckpointV2) def spec: Spec[TestEnvironment with Scope, Any] = { val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[CheckpointController]))