Skip to content

Commit

Permalink
CIRC-1977: Do not refresh circulation rules cache on GET and PUT `/ci…
Browse files Browse the repository at this point in the history
…rculation/rules` (#1389)

* CIRC-1977 Do not send event on GET and PUT

* CIRC-1977 Mention Kafka-related environment variables in README

* CIRC-1977 Clean up in tests
  • Loading branch information
OleksandrVidinieiev authored Dec 4, 2023
1 parent 0dccb3e commit 4e1ec3f
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 33 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,23 @@ system property variables in this order, and uses the default 9801 as fallback.

The Docker container exposes port 9801.

## Environment variables

Module integrates with Kafka in order to consume/publish domain events. This integration can
be configured using the following environment variables:

| Variable name | Default value |
|--------------------|-------------------|
| KAFKA_HOST | localhost |
| KAFKA_PORT | 9092 |
| REPLICATION FACTOR | 1 |
| MAX_REQUEST_SIZE | 4000000 |
| ENV | folio |
| OKAPI_URL | http://okapi:9130 |

If a variable is not present, its default values is used as a fallback. If this configuration is
invalid, the module will start, but Kafka integration will not work.

## Design Notes

### Known Limitations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ private Future<KafkaConsumerWrapper<String, String>> createConsumer(DomainEventT
.processRecordErrorHandler((t, r) -> log.error("Failed to process event: {}", r, t))
.build();


return moduleIdProvider.getModuleId()
.onSuccess(moduleId -> log.info("createConsumer:: moduleId={}", moduleId))
.compose(moduleId -> consumer.start(handler, moduleId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.folio.circulation.rules.CirculationRulesException;
import org.folio.circulation.rules.CirculationRulesParser;
import org.folio.circulation.rules.Text2Drools;
import org.folio.circulation.rules.cache.CirculationRulesCache;
import org.folio.circulation.support.Clients;
import org.folio.circulation.support.CollectionResourceClient;
import org.folio.circulation.support.ForwardOnFailure;
Expand Down Expand Up @@ -101,9 +100,6 @@ private void get(RoutingContext routingContext) {
return;
}
JsonObject circulationRules = new JsonObject(response.getBody());
CirculationRulesCache.getInstance()
.buildRules(context.getTenantId(), circulationRules.getString("rulesAsText"));

context.write(ok(circulationRules));
}
catch (Exception e) {
Expand Down Expand Up @@ -158,8 +154,6 @@ private void proceedWithUpdate(Map<String, Set<String>> existingPoliciesIds,

clients.circulationRulesStorage().put(rulesInput.copy())
.thenApply(this::failWhenResponseOtherThanNoContent)
.thenApply(result -> result.map(response -> CirculationRulesCache.getInstance()
.buildRules(webContext.getTenantId(), rulesAsText)))
.thenApply(result -> result.map(response -> noContent()))
.thenAccept(webContext::writeResultToHttpResponse);
}
Expand Down
14 changes: 0 additions & 14 deletions src/test/java/api/CirculationRulesAPITests.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package api;

import static api.support.APITestContext.TENANT_ID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

import org.folio.circulation.rules.cache.CirculationRulesCache;
import org.folio.circulation.support.http.client.Response;
import org.junit.jupiter.api.Test;

import api.support.APITestContext;
import api.support.APITests;
import api.support.builders.LoanPolicyBuilder;
import api.support.builders.LostItemFeePolicyBuilder;
Expand Down Expand Up @@ -432,15 +427,6 @@ void canReportValidationError() {
assertThat(json.getInteger("column"), is(2));
}

@Test
void getRefreshesCirculationRulesCache() {
CirculationRulesCache cache = CirculationRulesCache.getInstance();
cache.dropCache();
assertThat(cache.getRules(TENANT_ID), nullValue());
String rules = circulationRulesFixture.getCirculationRules();
assertThat(cache.getRules(TENANT_ID).getRulesAsText(), equalTo(rules));
}

/** @return rulesAsText field */
private String getRulesText() {
Response response = circulationRulesFixture.getRules();
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/api/support/Wait.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.awaitility.Awaitility.waitAtMost;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
Expand All @@ -24,8 +25,8 @@ public static <T> Collection<T> waitForSize(Callable<Collection<T>> supplier, in
return waitForValue(supplier, (Predicate<Collection<T>>) c -> c.size() == expectedSize);
}

public static <T> T waitForValue(Callable<T> valueSupplier, T expectedValue) {
return waitForValue(valueSupplier, (Predicate<T>) actualValue -> actualValue == expectedValue);
public static <T> T waitForValue(Callable<T> valueSupplier, T expected) {
return waitForValue(valueSupplier, (Predicate<T>) actual -> Objects.equals(actual, expected));
}

public static <T> T waitForValue(Callable<T> valueSupplier, Predicate<T> valuePredicate) {
Expand Down
12 changes: 11 additions & 1 deletion src/test/java/api/support/fixtures/CirculationRulesFixture.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package api.support.fixtures;

import static api.support.APITestContext.getTenantId;
import static api.support.RestAssuredResponseConversion.toResponse;
import static api.support.http.InterfaceUrls.circulationRulesStorageUrl;
import static api.support.http.InterfaceUrls.circulationRulesUrl;
Expand All @@ -15,11 +16,13 @@
import java.util.List;
import java.util.UUID;

import org.apache.http.HttpStatus;
import org.folio.circulation.rules.ItemLocation;
import org.folio.circulation.rules.ItemType;
import org.folio.circulation.rules.LoanType;
import org.folio.circulation.rules.PatronGroup;
import org.folio.circulation.rules.Policy;
import org.folio.circulation.rules.cache.CirculationRulesCache;
import org.folio.circulation.support.http.client.Response;

import api.support.RestAssuredClient;
Expand All @@ -45,11 +48,18 @@ public String getCirculationRules() {
}

public Response putRules(String body) {
return toResponse(restAssuredClient
Response response = toResponse(restAssuredClient
.beginRequest("put-circulation-rules")
.body(body)
.when().put(circulationRulesUrl())
.then().extract().response());

if (response.getStatusCode() == HttpStatus.SC_NO_CONTENT) {
String rulesAsText = new JsonObject(body).getString("rulesAsText");
CirculationRulesCache.getInstance().buildRules(getTenantId(), rulesAsText);
}

return response;
}

public void updateCirculationRules(UUID loanPolicyId, UUID requestPolicyId,
Expand Down
27 changes: 18 additions & 9 deletions src/test/java/org/folio/circulation/EventConsumerVerticleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.admin.ConsumerGroupDescription;
import io.vertx.kafka.admin.ConsumerGroupListing;
import io.vertx.kafka.admin.MemberDescription;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
Expand All @@ -72,29 +70,35 @@ void circulationRulesUpdateEventConsumerJoinsVacantConsumerSubgroup() {
verifyConsumerGroups(Map.of(subgroup0, 1));

// verticle2 is deployed, new consumer is created in a separate subgroup1
String verticle2DeploymentId = deployVerticle();
String verticleId2 = deployVerticle();
verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1));

// verticle3 is deployed, new consumer is created in a separate subgroup2
deployVerticle();
String verticleId3 = deployVerticle();
verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1, subgroup2, 1));

// verticle2 is undeployed, its consumer and subgroup1 are removed
undeployVerticle(verticle2DeploymentId);
undeployVerticle(verticleId2);
waitFor(kafkaAdminClient.deleteConsumerGroups(List.of(subgroup1)));
verifyConsumerGroups(Map.of(subgroup0, 1, subgroup2, 1));

// verticle4 is deployed, the now vacant subgroup1 is recreated and new consumer joins it
String verticle4DeploymentId = deployVerticle();
String verticleId4 = deployVerticle();
verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1, subgroup2, 1));

// verticle4 is undeployed, its consumer is removed, but empty subgroup1 is not removed
undeployVerticle(verticle4DeploymentId);
undeployVerticle(verticleId4);
verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 0, subgroup2, 1));

// verticle5 is deployed, new consumer joins the empty subgroup1
deployVerticle();
String verticleId5 = deployVerticle();
verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1, subgroup2, 1));

// clean up: undeploy all active verticles deployed in this test, delete their consumer groups
undeployVerticle(verticleId3);
undeployVerticle(verticleId5);
waitFor(kafkaAdminClient.deleteConsumerGroups(List.of(subgroup1, subgroup2)));
verifyConsumerGroups(Map.of(subgroup0, 1));
}

@Test
Expand All @@ -104,7 +108,7 @@ void circulationRulesUpdateEventsAreDeliveredToMultipleConsumers() {

// first verticle has been deployed beforehand, so we should already see subgroup0 with 1 consumer
verifyConsumerGroups(Map.of(subgroup0, 1));
deployVerticle();
String verticleId = deployVerticle();
verifyConsumerGroups(Map.of(subgroup0, 1, subgroup1, 1));

int initialOffsetForSubgroup0 = getOffsetForCirculationRulesUpdateEvents(0);
Expand All @@ -115,6 +119,11 @@ void circulationRulesUpdateEventsAreDeliveredToMultipleConsumers() {

waitForValue(() -> getOffsetForCirculationRulesUpdateEvents(0), initialOffsetForSubgroup0 + 1);
waitForValue(() -> getOffsetForCirculationRulesUpdateEvents(1), initialOffsetForSubgroup1 + 1);

// clean up: undeploy verticle, delete its consumer group
undeployVerticle(verticleId);
waitFor(kafkaAdminClient.deleteConsumerGroups(List.of(subgroup1)));
verifyConsumerGroups(Map.of(subgroup0, 1));
}

@Test
Expand Down

0 comments on commit 4e1ec3f

Please sign in to comment.