From fbe8bbf45e724c3cb346ba7410d9a4f1779646b7 Mon Sep 17 00:00:00 2001 From: SinghAdes <90304030+SinghAdes@users.noreply.github.com> Date: Fri, 1 Dec 2023 17:45:42 +0530 Subject: [PATCH 1/3] [CIRC-1966]-Added validation (#1388) * [CIRC-1966]-Added validation * [CIRC-1966]-Added validation * [CIRC-1966]-Added validation * [CIRC-1966]-Added validation --- .../infrastructure/storage/inventory/ItemRepository.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/folio/circulation/infrastructure/storage/inventory/ItemRepository.java b/src/main/java/org/folio/circulation/infrastructure/storage/inventory/ItemRepository.java index da8a21ef18..450aa9e8ec 100644 --- a/src/main/java/org/folio/circulation/infrastructure/storage/inventory/ItemRepository.java +++ b/src/main/java/org/folio/circulation/infrastructure/storage/inventory/ItemRepository.java @@ -275,7 +275,8 @@ private CompletableFuture>> fetchItems(Collection records.mapRecords(mapper::toDomain))); + .thenApply(r -> r.map(records -> records.mapRecords(mapper::toDomain))) + .thenApply(r -> r.mapFailure(failure -> succeeded(MultipleRecords.empty()))); } private CompletableFuture> fetchItem(String itemId) { From 0dccb3ebf19c1f147a8208d4e586e626e6b27391 Mon Sep 17 00:00:00 2001 From: Alexander Kurash Date: Fri, 1 Dec 2023 18:07:54 +0200 Subject: [PATCH 2/3] CIRC-1961 Add new error codes (#1392) * CIRC-1961 Add new error codes * CIRC-1961 Fix tests --- .../domain/RequestServiceUtility.java | 20 ++++++--- .../folio/circulation/support/ErrorCode.java | 2 + .../requests/RequestsAPICreationTests.java | 41 ++++++++++++------- .../requests/scenarios/MoveRequestTests.java | 4 +- 4 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/folio/circulation/domain/RequestServiceUtility.java b/src/main/java/org/folio/circulation/domain/RequestServiceUtility.java index 486d66bf06..68dad70a93 100644 --- a/src/main/java/org/folio/circulation/domain/RequestServiceUtility.java +++ b/src/main/java/org/folio/circulation/domain/RequestServiceUtility.java @@ -3,7 +3,10 @@ import static java.lang.String.format; import static org.folio.circulation.domain.representations.RequestProperties.PICKUP_SERVICE_POINT_ID; import static org.folio.circulation.domain.representations.RequestProperties.REQUEST_TYPE; +import static org.folio.circulation.support.ErrorCode.INSTANCE_ALREADY_REQUESTED; import static org.folio.circulation.support.ErrorCode.ITEM_ALREADY_REQUESTED; +import static org.folio.circulation.support.ErrorCode.ITEM_OF_THIS_INSTANCE_ALREADY_REQUESTED; +import static org.folio.circulation.support.ErrorCode.MOVING_REQUEST_TO_THE_SAME_ITEM; import static org.folio.circulation.support.ValidationErrorFailure.failedValidation; import static org.folio.circulation.support.results.Result.of; import static org.folio.circulation.support.results.Result.succeeded; @@ -254,20 +257,26 @@ private static Result alreadyRequestedFailure( Request requestBeingPlaced = requestAndRelatedRecords.getRequest(); HashMap parameters = new HashMap<>(); String message; + ErrorCode errorCode; if (requestBeingPlaced.isTitleLevel()) { if (existingRequest.isTitleLevel()) { parameters.put(REQUESTER_ID, requestBeingPlaced.getUserId()); parameters.put(INSTANCE_ID, requestBeingPlaced.getInstanceId()); - message = requestBeingPlaced.getOperation() == Operation.MOVE - ? "Not allowed to move title level page request to the same item" - : "This requester already has an open request for this instance"; + if (requestBeingPlaced.getOperation() == Operation.MOVE) { + message = "Not allowed to move title level page request to the same item"; + errorCode = MOVING_REQUEST_TO_THE_SAME_ITEM; + } else { + message = "This requester already has an open request for this instance"; + errorCode = INSTANCE_ALREADY_REQUESTED; + } } else { parameters.put(REQUESTER_ID, requestBeingPlaced.getUserId()); parameters.put(INSTANCE_ID, requestBeingPlaced.getInstanceId()); message = "This requester already has an open request for one of the instance's items"; + errorCode = ITEM_OF_THIS_INSTANCE_ALREADY_REQUESTED; } } else { parameters.put(REQUESTER_ID, requestBeingPlaced.getUserId()); @@ -275,10 +284,11 @@ private static Result alreadyRequestedFailure( parameters.put(REQUEST_ID, requestBeingPlaced.getId()); message = "This requester already has an open request for this item"; + errorCode = ITEM_ALREADY_REQUESTED; } - log.info("alreadyRequestedFailure:: message: {}", message); + log.info("alreadyRequestedFailure:: message: {}, errorCode: {}", message, errorCode); - return failedValidation(message, parameters, ITEM_ALREADY_REQUESTED); + return failedValidation(message, parameters, errorCode); } static boolean isTheSameRequester(RequestAndRelatedRecords it, Request that) { diff --git a/src/main/java/org/folio/circulation/support/ErrorCode.java b/src/main/java/org/folio/circulation/support/ErrorCode.java index 6712c52d23..9a2773b6ad 100644 --- a/src/main/java/org/folio/circulation/support/ErrorCode.java +++ b/src/main/java/org/folio/circulation/support/ErrorCode.java @@ -23,5 +23,7 @@ public enum ErrorCode { CANNOT_CREATE_PAGE_TLR_WITHOUT_ITEM_ID, MOVING_REQUEST_TO_THE_SAME_ITEM, ITEM_ALREADY_REQUESTED, + ITEM_OF_THIS_INSTANCE_ALREADY_REQUESTED, + INSTANCE_ALREADY_REQUESTED, HOLD_SHELF_REQUESTS_REQUIRE_PICKUP_SERVICE_POINT } diff --git a/src/test/java/api/requests/RequestsAPICreationTests.java b/src/test/java/api/requests/RequestsAPICreationTests.java index bfcd4569a2..42c35155ec 100644 --- a/src/test/java/api/requests/RequestsAPICreationTests.java +++ b/src/test/java/api/requests/RequestsAPICreationTests.java @@ -62,8 +62,11 @@ import static org.folio.circulation.domain.representations.logs.LogEventType.NOTICE_ERROR; import static org.folio.circulation.domain.representations.logs.LogEventType.REQUEST_CREATED_THROUGH_OVERRIDE; import static org.folio.circulation.support.ErrorCode.FULFILLMENT_PREFERENCE_IS_NOT_ALLOWED; -import static org.folio.circulation.support.ErrorCode.HOLD_SHELF_REQUESTS_REQUIRE_PICKUP_SERVICE_POINT; import static org.folio.circulation.support.ErrorCode.HOLD_AND_RECALL_TLR_NOT_ALLOWED_PAGEABLE_AVAILABLE_ITEM_FOUND; +import static org.folio.circulation.support.ErrorCode.HOLD_SHELF_REQUESTS_REQUIRE_PICKUP_SERVICE_POINT; +import static org.folio.circulation.support.ErrorCode.INSTANCE_ALREADY_REQUESTED; +import static org.folio.circulation.support.ErrorCode.ITEM_ALREADY_REQUESTED; +import static org.folio.circulation.support.ErrorCode.ITEM_OF_THIS_INSTANCE_ALREADY_REQUESTED; import static org.folio.circulation.support.ErrorCode.REQUESTER_ALREADY_HAS_LOAN_FOR_ONE_OF_INSTANCES_ITEMS; import static org.folio.circulation.support.ErrorCode.REQUESTER_ALREADY_HAS_THIS_ITEM_ON_LOAN; import static org.folio.circulation.support.ErrorCode.REQUEST_LEVEL_IS_NOT_ALLOWED; @@ -102,7 +105,6 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -import api.support.builders.AddInfoRequestBuilder; import org.apache.http.HttpStatus; import org.awaitility.Awaitility; import org.folio.circulation.domain.ItemStatus; @@ -132,6 +134,7 @@ import api.support.APITests; import api.support.TlrFeatureStatus; +import api.support.builders.AddInfoRequestBuilder; import api.support.builders.Address; import api.support.builders.CheckInByBarcodeRequestBuilder; import api.support.builders.HoldingBuilder; @@ -697,6 +700,7 @@ void cannotCreateTlrWhenUserAlreadyRequestedTheSameTitle() { assertThat(postResponse, hasStatus(HTTP_UNPROCESSABLE_ENTITY)); assertThat(postResponse.getJson(), hasErrorWith(allOf( hasMessage("This requester already has an open request for this instance"), + hasCode(INSTANCE_ALREADY_REQUESTED), hasParameter("requesterId", usersFixture.james().getId().toString()), hasParameter("instanceId", instanceId.toString())))); } @@ -727,6 +731,7 @@ void cannotCreateTlrWhenUserAlreadyRequestedAnItemFromTheSameTitle() { assertThat(response.getJson(), hasErrors(1)); assertThat(response.getJson(), hasErrorWith(allOf( hasMessage("This requester already has an open request for one of the instance's items"), + hasCode(ITEM_OF_THIS_INSTANCE_ALREADY_REQUESTED), hasParameter("requesterId", jessica.getId().toString()), hasParameter("instanceId", item1.getInstanceId().toString())))); } @@ -1497,8 +1502,10 @@ void cannotCreateItemLevelRequestIfTitleLevelRequestForInstanceAlreadyCreated() patronId, pickupServicePointId, instanceId, secondItem)); assertThat(response, hasStatus(HTTP_UNPROCESSABLE_ENTITY)); - assertThat(response.getJson(), hasErrorWith(hasMessage( - "This requester already has an open request for this item"))); + assertThat(response.getJson(), hasErrorWith(allOf( + hasMessage("This requester already has an open request for this item"), + hasCode(ITEM_ALREADY_REQUESTED) + ))); } @Test @@ -1517,8 +1524,10 @@ void cannotCreateTitleLevelRequestIfItemLevelRequestAlreadyCreated() { Response response = requestsClient.attemptCreate(buildPageTitleLevelRequest( patronId, pickupServicePointId, instanceId)); assertThat(response, hasStatus(HTTP_UNPROCESSABLE_ENTITY)); - assertThat(response.getJson(), hasErrorWith(hasMessage( - "This requester already has an open request for one of the instance's items"))); + assertThat(response.getJson(), hasErrorWith(allOf( + hasMessage("This requester already has an open request for one of the instance's items"), + hasCode(ITEM_OF_THIS_INSTANCE_ALREADY_REQUESTED) + ))); } @Test @@ -1556,8 +1565,10 @@ void cannotCreateTwoTitleLevelRequestsForSameInstance() { Response response = requestsClient.attemptCreate(buildPageTitleLevelRequest( userId, pickupServicePointId, instanceId)); assertThat(response, hasStatus(HTTP_UNPROCESSABLE_ENTITY)); - assertThat(response.getJson(), hasErrorWith(hasMessage( - "This requester already has an open request for this instance"))); + assertThat(response.getJson(), hasErrorWith(allOf( + hasMessage("This requester already has an open request for this instance"), + hasCode(INSTANCE_ALREADY_REQUESTED) + ))); assertThat(requestsClient.getAll(), hasSize(1)); } @@ -1575,8 +1586,10 @@ void cannotCreateTwoItemLevelRequestsForSameItem() { Response response = requestsClient.attemptCreate(buildItemLevelRequest(userId, pickupServicePointId, instanceId, item)); assertThat(response, hasStatus(HTTP_UNPROCESSABLE_ENTITY)); - assertThat(response.getJson(), hasErrorWith(hasMessage( - "This requester already has an open request for this item"))); + assertThat(response.getJson(), hasErrorWith(allOf( + hasMessage("This requester already has an open request for this item"), + hasCode(ITEM_ALREADY_REQUESTED) + ))); assertThat(requestsClient.getAll(), hasSize(1)); } @@ -2127,10 +2140,10 @@ void cannotCreateTwoRequestsFromTheSameUserForTheSameItem() { assertThat(response, hasStatus(HTTP_UNPROCESSABLE_ENTITY)); assertThat(response.getJson(), hasErrors(1)); - assertThat( - response.getJson(), - hasErrorWith(hasMessage("This requester already has an open request for this item")) - ); + assertThat(response.getJson(), hasErrorWith(allOf( + hasMessage("This requester already has an open request for this item"), + hasCode(ITEM_ALREADY_REQUESTED) + ))); } @Test diff --git a/src/test/java/api/requests/scenarios/MoveRequestTests.java b/src/test/java/api/requests/scenarios/MoveRequestTests.java index b5167b1345..7a53868f15 100644 --- a/src/test/java/api/requests/scenarios/MoveRequestTests.java +++ b/src/test/java/api/requests/scenarios/MoveRequestTests.java @@ -21,7 +21,7 @@ import static org.folio.circulation.domain.representations.ItemProperties.CALL_NUMBER_COMPONENTS; import static org.folio.circulation.domain.representations.RequestProperties.REQUEST_TYPE; import static org.folio.circulation.domain.representations.logs.LogEventType.REQUEST_MOVED; -import static org.folio.circulation.support.ErrorCode.ITEM_ALREADY_REQUESTED; +import static org.folio.circulation.support.ErrorCode.MOVING_REQUEST_TO_THE_SAME_ITEM; import static org.folio.circulation.support.utils.ClockUtil.getClock; import static org.folio.circulation.support.utils.ClockUtil.getZonedDateTime; import static org.folio.circulation.support.utils.ClockUtil.setClock; @@ -428,7 +428,7 @@ void cannotMoveTlrToTheSameItem() { Response response = requestsFixture.attemptMove(new MoveRequestBuilder(nodPage.getId(), item.getId())); assertThat(response.getJson(), hasErrorWith(allOf( hasMessage("Not allowed to move title level page request to the same item"), - hasCode(ITEM_ALREADY_REQUESTED), + hasCode(MOVING_REQUEST_TO_THE_SAME_ITEM), hasParameter("requesterId", jessica.getId().toString()), hasParameter("instanceId", item.getInstanceId().toString())))); } From 4e1ec3fb5571381903428bdf88a0f510c22d4e1a Mon Sep 17 00:00:00 2001 From: OleksandrVidinieiev <56632770+OleksandrVidinieiev@users.noreply.github.com> Date: Mon, 4 Dec 2023 11:12:01 +0000 Subject: [PATCH 3/3] CIRC-1977: Do not refresh circulation rules cache on GET and PUT `/circulation/rules` (#1389) * CIRC-1977 Do not send event on GET and PUT * CIRC-1977 Mention Kafka-related environment variables in README * CIRC-1977 Clean up in tests --- README.md | 17 ++++++++++++ .../circulation/EventConsumerVerticle.java | 1 - .../resources/CirculationRulesResource.java | 6 ----- .../java/api/CirculationRulesAPITests.java | 14 ---------- src/test/java/api/support/Wait.java | 5 ++-- .../fixtures/CirculationRulesFixture.java | 12 ++++++++- .../EventConsumerVerticleTest.java | 27 ++++++++++++------- 7 files changed, 49 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index d349d74cb7..1a54081227 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,23 @@ system property variables in this order, and uses the default 9801 as fallback. The Docker container exposes port 9801. +## Environment variables + +Module integrates with Kafka in order to consume/publish domain events. This integration can +be configured using the following environment variables: + +| Variable name | Default value | +|--------------------|-------------------| +| KAFKA_HOST | localhost | +| KAFKA_PORT | 9092 | +| REPLICATION FACTOR | 1 | +| MAX_REQUEST_SIZE | 4000000 | +| ENV | folio | +| OKAPI_URL | http://okapi:9130 | + +If a variable is not present, its default values is used as a fallback. If this configuration is +invalid, the module will start, but Kafka integration will not work. + ## Design Notes ### Known Limitations diff --git a/src/main/java/org/folio/circulation/EventConsumerVerticle.java b/src/main/java/org/folio/circulation/EventConsumerVerticle.java index 64e6516f57..2d35a4819d 100644 --- a/src/main/java/org/folio/circulation/EventConsumerVerticle.java +++ b/src/main/java/org/folio/circulation/EventConsumerVerticle.java @@ -110,7 +110,6 @@ private Future> createConsumer(DomainEventT .processRecordErrorHandler((t, r) -> log.error("Failed to process event: {}", r, t)) .build(); - return moduleIdProvider.getModuleId() .onSuccess(moduleId -> log.info("createConsumer:: moduleId={}", moduleId)) .compose(moduleId -> consumer.start(handler, moduleId)) diff --git a/src/main/java/org/folio/circulation/resources/CirculationRulesResource.java b/src/main/java/org/folio/circulation/resources/CirculationRulesResource.java index 3c0cacc275..5b9a001456 100644 --- a/src/main/java/org/folio/circulation/resources/CirculationRulesResource.java +++ b/src/main/java/org/folio/circulation/resources/CirculationRulesResource.java @@ -29,7 +29,6 @@ import org.folio.circulation.rules.CirculationRulesException; import org.folio.circulation.rules.CirculationRulesParser; import org.folio.circulation.rules.Text2Drools; -import org.folio.circulation.rules.cache.CirculationRulesCache; import org.folio.circulation.support.Clients; import org.folio.circulation.support.CollectionResourceClient; import org.folio.circulation.support.ForwardOnFailure; @@ -101,9 +100,6 @@ private void get(RoutingContext routingContext) { return; } JsonObject circulationRules = new JsonObject(response.getBody()); - CirculationRulesCache.getInstance() - .buildRules(context.getTenantId(), circulationRules.getString("rulesAsText")); - context.write(ok(circulationRules)); } catch (Exception e) { @@ -158,8 +154,6 @@ private void proceedWithUpdate(Map> existingPoliciesIds, clients.circulationRulesStorage().put(rulesInput.copy()) .thenApply(this::failWhenResponseOtherThanNoContent) - .thenApply(result -> result.map(response -> CirculationRulesCache.getInstance() - .buildRules(webContext.getTenantId(), rulesAsText))) .thenApply(result -> result.map(response -> noContent())) .thenAccept(webContext::writeResultToHttpResponse); } diff --git a/src/test/java/api/CirculationRulesAPITests.java b/src/test/java/api/CirculationRulesAPITests.java index 6ecab212a1..108adc87a6 100644 --- a/src/test/java/api/CirculationRulesAPITests.java +++ b/src/test/java/api/CirculationRulesAPITests.java @@ -1,21 +1,16 @@ package api; -import static api.support.APITestContext.TENANT_ID; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.Is.is; import java.util.HashSet; import java.util.Set; import java.util.UUID; -import org.folio.circulation.rules.cache.CirculationRulesCache; import org.folio.circulation.support.http.client.Response; import org.junit.jupiter.api.Test; -import api.support.APITestContext; import api.support.APITests; import api.support.builders.LoanPolicyBuilder; import api.support.builders.LostItemFeePolicyBuilder; @@ -432,15 +427,6 @@ void canReportValidationError() { assertThat(json.getInteger("column"), is(2)); } - @Test - void getRefreshesCirculationRulesCache() { - CirculationRulesCache cache = CirculationRulesCache.getInstance(); - cache.dropCache(); - assertThat(cache.getRules(TENANT_ID), nullValue()); - String rules = circulationRulesFixture.getCirculationRules(); - assertThat(cache.getRules(TENANT_ID).getRulesAsText(), equalTo(rules)); - } - /** @return rulesAsText field */ private String getRulesText() { Response response = circulationRulesFixture.getRules(); diff --git a/src/test/java/api/support/Wait.java b/src/test/java/api/support/Wait.java index ea466da11f..2093216878 100644 --- a/src/test/java/api/support/Wait.java +++ b/src/test/java/api/support/Wait.java @@ -5,6 +5,7 @@ import static org.awaitility.Awaitility.waitAtMost; import java.util.Collection; +import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -24,8 +25,8 @@ public static Collection waitForSize(Callable> supplier, in return waitForValue(supplier, (Predicate>) c -> c.size() == expectedSize); } - public static T waitForValue(Callable valueSupplier, T expectedValue) { - return waitForValue(valueSupplier, (Predicate) actualValue -> actualValue == expectedValue); + public static T waitForValue(Callable valueSupplier, T expected) { + return waitForValue(valueSupplier, (Predicate) actual -> Objects.equals(actual, expected)); } public static T waitForValue(Callable valueSupplier, Predicate valuePredicate) { diff --git a/src/test/java/api/support/fixtures/CirculationRulesFixture.java b/src/test/java/api/support/fixtures/CirculationRulesFixture.java index ec4adf4620..bed1831905 100644 --- a/src/test/java/api/support/fixtures/CirculationRulesFixture.java +++ b/src/test/java/api/support/fixtures/CirculationRulesFixture.java @@ -1,5 +1,6 @@ package api.support.fixtures; +import static api.support.APITestContext.getTenantId; import static api.support.RestAssuredResponseConversion.toResponse; import static api.support.http.InterfaceUrls.circulationRulesStorageUrl; import static api.support.http.InterfaceUrls.circulationRulesUrl; @@ -15,11 +16,13 @@ import java.util.List; import java.util.UUID; +import org.apache.http.HttpStatus; import org.folio.circulation.rules.ItemLocation; import org.folio.circulation.rules.ItemType; import org.folio.circulation.rules.LoanType; import org.folio.circulation.rules.PatronGroup; import org.folio.circulation.rules.Policy; +import org.folio.circulation.rules.cache.CirculationRulesCache; import org.folio.circulation.support.http.client.Response; import api.support.RestAssuredClient; @@ -45,11 +48,18 @@ public String getCirculationRules() { } public Response putRules(String body) { - return toResponse(restAssuredClient + Response response = toResponse(restAssuredClient .beginRequest("put-circulation-rules") .body(body) .when().put(circulationRulesUrl()) .then().extract().response()); + + if (response.getStatusCode() == HttpStatus.SC_NO_CONTENT) { + String rulesAsText = new JsonObject(body).getString("rulesAsText"); + CirculationRulesCache.getInstance().buildRules(getTenantId(), rulesAsText); + } + + return response; } public void updateCirculationRules(UUID loanPolicyId, UUID requestPolicyId, diff --git a/src/test/java/org/folio/circulation/EventConsumerVerticleTest.java b/src/test/java/org/folio/circulation/EventConsumerVerticleTest.java index f4762e1f19..e074d803a9 100644 --- a/src/test/java/org/folio/circulation/EventConsumerVerticleTest.java +++ b/src/test/java/org/folio/circulation/EventConsumerVerticleTest.java @@ -45,8 +45,6 @@ import io.vertx.core.Future; import io.vertx.core.json.JsonObject; import io.vertx.kafka.admin.ConsumerGroupDescription; -import io.vertx.kafka.admin.ConsumerGroupListing; -import io.vertx.kafka.admin.MemberDescription; import io.vertx.kafka.client.common.TopicPartition; import io.vertx.kafka.client.consumer.OffsetAndMetadata; import io.vertx.kafka.client.producer.KafkaProducerRecord; @@ -72,29 +70,35 @@ void circulationRulesUpdateEventConsumerJoinsVacantConsumerSubgroup() { verifyConsumerGroups(Map.of(subgroup0, 1)); // verticle2 is deployed, new consumer is created in a separate subgroup1 - String verticle2DeploymentId = deployVerticle(); + String verticleId2 = deployVerticle(); verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1)); // verticle3 is deployed, new consumer is created in a separate subgroup2 - deployVerticle(); + String verticleId3 = deployVerticle(); verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1, subgroup2, 1)); // verticle2 is undeployed, its consumer and subgroup1 are removed - undeployVerticle(verticle2DeploymentId); + undeployVerticle(verticleId2); waitFor(kafkaAdminClient.deleteConsumerGroups(List.of(subgroup1))); verifyConsumerGroups(Map.of(subgroup0, 1, subgroup2, 1)); // verticle4 is deployed, the now vacant subgroup1 is recreated and new consumer joins it - String verticle4DeploymentId = deployVerticle(); + String verticleId4 = deployVerticle(); verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1, subgroup2, 1)); // verticle4 is undeployed, its consumer is removed, but empty subgroup1 is not removed - undeployVerticle(verticle4DeploymentId); + undeployVerticle(verticleId4); verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 0, subgroup2, 1)); // verticle5 is deployed, new consumer joins the empty subgroup1 - deployVerticle(); + String verticleId5 = deployVerticle(); verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1, subgroup2, 1)); + + // clean up: undeploy all active verticles deployed in this test, delete their consumer groups + undeployVerticle(verticleId3); + undeployVerticle(verticleId5); + waitFor(kafkaAdminClient.deleteConsumerGroups(List.of(subgroup1, subgroup2))); + verifyConsumerGroups(Map.of(subgroup0, 1)); } @Test @@ -104,7 +108,7 @@ void circulationRulesUpdateEventsAreDeliveredToMultipleConsumers() { // first verticle has been deployed beforehand, so we should already see subgroup0 with 1 consumer verifyConsumerGroups(Map.of(subgroup0, 1)); - deployVerticle(); + String verticleId = deployVerticle(); verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1)); int initialOffsetForSubgroup0 = getOffsetForCirculationRulesUpdateEvents(0); @@ -115,6 +119,11 @@ void circulationRulesUpdateEventsAreDeliveredToMultipleConsumers() { waitForValue(() -> getOffsetForCirculationRulesUpdateEvents(0), initialOffsetForSubgroup0 + 1); waitForValue(() -> getOffsetForCirculationRulesUpdateEvents(1), initialOffsetForSubgroup1 + 1); + + // clean up: undeploy verticle, delete its consumer group + undeployVerticle(verticleId); + waitFor(kafkaAdminClient.deleteConsumerGroups(List.of(subgroup1))); + verifyConsumerGroups(Map.of(subgroup0, 1)); } @Test