From 6c46322d9700bd2b3a03a3eb8fa2695ff0341e1f Mon Sep 17 00:00:00 2001 From: Kunal jha Date: Thu, 24 Oct 2019 16:30:03 +0200 Subject: [PATCH] Revert "added circuit breaker changes and test" --- .../kafka/HystrixKafkaCircuitBreaker.java | 5 +- .../kafka/KafkaTopicRepository.java | 2 +- .../kafka/KafkaTopicRepositoryTest.java | 46 +++++-------------- 3 files changed, 15 insertions(+), 38 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/HystrixKafkaCircuitBreaker.java b/src/main/java/org/zalando/nakadi/repository/kafka/HystrixKafkaCircuitBreaker.java index 3c68e14d08..8145052ff7 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/HystrixKafkaCircuitBreaker.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/HystrixKafkaCircuitBreaker.java @@ -45,8 +45,8 @@ public HystrixKafkaCircuitBreaker(final String brokerId) { concurrentExecutionCount = new AtomicInteger(); } - public boolean attemptExecution() { - return circuitBreaker.attemptExecution(); + public boolean allowRequest() { + return circuitBreaker.allowRequest(); } public void markStart() { @@ -66,7 +66,6 @@ public void markFailure() { concurrentExecutionCount.decrementAndGet(); HystrixThreadEventStream.getInstance() .executionDone(ExecutionResult.from(HystrixEventType.FAILURE), commandKey, threadPoolKey); - circuitBreaker.markNonSuccess(); } public String getMetrics() { diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index 28087d7d42..40fbd3b222 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -273,7 +273,7 @@ public void syncPostBatch(final String topicId, final List batch, fin item.setStep(EventPublishingStep.PUBLISHING); final HystrixKafkaCircuitBreaker circuitBreaker = circuitBreakers.computeIfAbsent( item.getBrokerId(), brokerId -> new HystrixKafkaCircuitBreaker(brokerId)); - if (circuitBreaker.attemptExecution()) { + if (circuitBreaker.allowRequest()) { sendFutures.put(item, publishItem(producer, topicId, item, circuitBreaker)); } else { shortCircuited++; diff --git a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java index bdd000abb4..a15649caea 100644 --- a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java +++ b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.google.common.collect.Sets.newHashSet; @@ -108,8 +107,6 @@ private enum ConsumerOffsetMode { @SuppressWarnings("unchecked") public KafkaTopicRepositoryTest() { System.setProperty("hystrix.command.1.metrics.healthSnapshot.intervalInMilliseconds", "10"); - System.setProperty("hystrix.command.1.metrics.rollingStats.timeInMilliseconds", "500"); - System.setProperty("hystrix.command.1.circuitBreaker.sleepWindowInMilliseconds", "500"); kafkaProducer = mock(KafkaProducer.class); when(kafkaProducer.partitionsFor(anyString())).then( invocation -> partitionsOfTopic((String) invocation.getArguments()[0]) @@ -319,42 +316,19 @@ public void whenKafkaPublishCallbackWithExceptionThenEventPublishingException() } @Test - public void checkCircuitBreakerStateBasedOnKafkaResponse() { + public void whenKafkaPublishTimeoutThenCircuitIsOpened() { + when(nakadiSettings.getKafkaSendTimeoutMs()).thenReturn(1000L); + when(kafkaProducer.partitionsFor(EXPECTED_PRODUCER_RECORD.topic())).thenReturn(ImmutableList.of( new PartitionInfo(EXPECTED_PRODUCER_RECORD.topic(), 1, new Node(1, "host", 9091), null, null))); - //Timeout Exception should cause circuit breaker to open - List batches = setResponseForSendingBatches(new TimeoutException()); - Assert.assertTrue(batches.stream() - .filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED && - item.getResponse().getDetail().equals("short circuited")) - .count() >= 1); - - //No exception should close the circuit - batches = setResponseForSendingBatches(null); - Assert.assertTrue(batches.stream() - .filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.SUBMITTED && - item.getResponse().getDetail().equals("")) - .count() >= 1); - - //Timeout Exception should cause circuit breaker to open again - batches = setResponseForSendingBatches(new TimeoutException()); - Assert.assertTrue(batches.stream() - .filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED && - item.getResponse().getDetail().equals("short circuited")) - .count() >= 1); - - } - - private List setResponseForSendingBatches(final Exception e) { when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> { final Callback callback = (Callback) invocation.getArguments()[1]; - if (callback != null) { - callback.onCompletion(null, e); - } + callback.onCompletion(null, new TimeoutException()); return null; }); + final List batches = new LinkedList<>(); for (int i = 0; i < 100; i++) { try { @@ -364,13 +338,17 @@ private List setResponseForSendingBatches(final Exception e) { Collections.emptyList()); batchItem.setPartition("1"); batches.add(batchItem); - TimeUnit.MILLISECONDS.sleep(5); kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), ImmutableList.of(batchItem), "random"); - } catch (final EventPublishingException | InterruptedException ex) { + fail(); + } catch (final EventPublishingException e) { } } - return batches; + + Assert.assertTrue(batches.stream() + .filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED && + item.getResponse().getDetail().equals("short circuited")) + .count() >= 1); } @Test