From 5f6f138aef1cc9a9418c626842ae03375f13bbe0 Mon Sep 17 00:00:00 2001 From: Yong Sheng Tan Date: Tue, 24 Dec 2024 14:42:21 +0800 Subject: [PATCH] Added lease extension to java sdk and fixed tests --- .../conductor-client/build.gradle | 3 +- .../client/automator/TaskRunner.java | 66 +++++++++- .../automator/TaskRunnerConfigurer.java | 3 + .../conductor/client/worker/Worker.java | 5 + .../automator/TaskRunnerConfigurerTest.java | 115 ++++++++++++++---- .../client/config/TestPropertyFactory.java | 19 +-- .../client/worker/TestWorkflowTask.java | 10 +- .../resources/conductor-workers.properties | 11 ++ .../client/http/WorkflowBulkResource.java | 10 +- .../client/http/WorkflowClientTests.java | 6 +- .../dao/index/OpenSearchRestDaoBaseTest.java | 5 +- .../os/dao/index/OpenSearchTest.java | 11 +- .../index/TestBulkRequestBuilderWrapper.java | 4 +- 13 files changed, 206 insertions(+), 62 deletions(-) create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/test/resources/conductor-workers.properties diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/build.gradle b/conductor-clients/java/conductor-java-sdk/conductor-client/build.gradle index 7a420745b..896deac04 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/build.gradle +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/build.gradle @@ -21,8 +21,7 @@ dependencies { testImplementation "org.junit.jupiter:junit-jupiter-api:${versions.junit}" testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${versions.junit}" - testImplementation "org.powermock:powermock-module-junit4:2.0.9" - testImplementation "org.powermock:powermock-api-mockito2:2.0.9" + testImplementation 'org.mockito:mockito-inline:5.2.0' testImplementation 'org.spockframework:spock-core:2.3-groovy-3.0' testImplementation 'org.codehaus.groovy:groovy:3.0.15' diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java index 8c4945e93..f6985cd07 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java @@ -16,13 +16,17 @@ import java.io.StringWriter; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -68,6 +72,10 @@ class TaskRunner { private final EventDispatcher eventDispatcher; private final LinkedBlockingQueue tasksTobeExecuted; private final boolean enableUpdateV2; + private static final int LEASE_EXTEND_RETRY_COUNT = 3; + private static final double LEASE_EXTEND_DURATION_FACTOR = 0.8; + private final ScheduledExecutorService leaseExtendExecutorService; + private Map> leaseExtendMap = new HashMap<>(); TaskRunner(Worker worker, TaskClient taskClient, @@ -122,6 +130,15 @@ class TaskRunner { pollingIntervalInMillis, domain); LOGGER.info("Polling errors for taskType {} will be printed at every {} occurrence.", taskType, errorAt); + + LOGGER.info("Initialized the task lease extend executor"); + leaseExtendExecutorService = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder() + .namingPattern("workflow-lease-extend-%d") + .daemon(true) + .uncaughtExceptionHandler(uncaughtExceptionHandler) + .build() + ); } public void pollAndExecute() { @@ -145,7 +162,25 @@ public void pollAndExecute() { LOGGER.trace("Poller for task {} waited for {} ms before getting {} tasks to execute", taskType, stopwatch.elapsed(TimeUnit.MILLISECONDS), tasks.size()); stopwatch = null; } - tasks.forEach(task -> this.executorService.submit(() -> this.processTask(task))); + tasks.forEach(task -> { + Future taskFuture = this.executorService.submit(() -> this.processTask(task)); + + if (task.getResponseTimeoutSeconds() > 0 && worker.leaseExtendEnabled()) { + ScheduledFuture scheduledFuture = leaseExtendMap.get(task.getTaskId()); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } + + long delay = Math.round(task.getResponseTimeoutSeconds() * LEASE_EXTEND_DURATION_FACTOR); + ScheduledFuture leaseExtendFuture = leaseExtendExecutorService.scheduleWithFixedDelay( + extendLease(task, taskFuture), + delay, + delay, + TimeUnit.SECONDS + ); + leaseExtendMap.put(task.getTaskId(), leaseExtendFuture); + } + }); } catch (Throwable t) { LOGGER.error(t.getMessage(), t); } @@ -251,7 +286,7 @@ private List pollTask(int count) { LOGGER.error("Uncaught exception. Thread {} will exit now", thread, error); }; - private void processTask(Task task) { + private Task processTask(Task task) { eventDispatcher.publish(new TaskExecutionStarted(taskType, task.getTaskId(), worker.getIdentity())); LOGGER.trace("Executing task: {} of type: {} in worker: {} at {}", task.getTaskId(), taskType, worker.getClass().getSimpleName(), worker.getIdentity()); LOGGER.trace("task {} is getting executed after {} ms of getting polled", task.getTaskId(), (System.currentTimeMillis() - task.getStartTime())); @@ -271,6 +306,7 @@ private void processTask(Task task) { } finally { permits.release(); } + return task; } private void executeTask(Worker worker, Task task) { @@ -400,4 +436,30 @@ private void handleException(Throwable t, TaskResult result, Worker worker, Task result.log(stringWriter.toString()); updateTaskResult(updateRetryCount, task, result, worker); } + + private Runnable extendLease(Task task, Future taskCompletableFuture) { + return () -> { + if (taskCompletableFuture.isDone()) { + LOGGER.warn( + "Task processing for {} completed, but its lease extend was not cancelled", + task.getTaskId()); + return; + } + LOGGER.info("Attempting to extend lease for {}", task.getTaskId()); + try { + TaskResult result = new TaskResult(task); + result.setExtendLease(true); + retryOperation( + (TaskResult taskResult) -> { + taskClient.updateTask(taskResult); + return null; + }, + LEASE_EXTEND_RETRY_COUNT, + result, + "extend lease"); + } catch (Exception e) { + LOGGER.error("Failed to extend lease for {}", task.getTaskId(), e); + } + }; + } } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java index b04ed2ed1..44b341f01 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java @@ -260,6 +260,9 @@ public TaskRunnerConfigurer.Builder withTaskToDomain(Map taskToD public TaskRunnerConfigurer.Builder withTaskThreadCount( Map taskToThreadCount) { this.taskToThreadCount = taskToThreadCount; + if (taskToThreadCount.values().stream().anyMatch(v -> v < 1)) { + throw new IllegalArgumentException("No. of threads cannot be less than 1"); + } return this; } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/worker/Worker.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/worker/Worker.java index 0ea08be7e..6940f765f 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/worker/Worker.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/worker/Worker.java @@ -29,6 +29,7 @@ public interface Worker { String PROP_ALL_WORKERS = "all"; String PROP_LOG_INTERVAL = "log_interval"; String PROP_POLL_INTERVAL = "poll_interval"; + String PROP_LEASE_EXTEND_ENABLED = "leaseExtendEnabled"; String PROP_PAUSED = "paused"; /** @@ -91,6 +92,10 @@ default int getPollingInterval() { return PropertyFactory.getInteger(getTaskDefName(), PROP_POLL_INTERVAL, 1000); } + default boolean leaseExtendEnabled() { + return PropertyFactory.getBoolean(getTaskDefName(), PROP_LEASE_EXTEND_ENABLED, false); + } + static Worker create(String taskType, Function executor) { return new Worker() { diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/automator/TaskRunnerConfigurerTest.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/automator/TaskRunnerConfigurerTest.java index cbd9df8a9..a8dcca13b 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/automator/TaskRunnerConfigurerTest.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/automator/TaskRunnerConfigurerTest.java @@ -15,16 +15,19 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import com.netflix.conductor.client.exception.ConductorClientException; import com.netflix.conductor.client.http.TaskClient; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.Task; @@ -32,41 +35,50 @@ import static com.netflix.conductor.common.metadata.tasks.TaskResult.Status.COMPLETED; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@TestMethodOrder(MethodOrderer.MethodName.class) public class TaskRunnerConfigurerTest { - private static final String TEST_TASK_DEF_NAME = "test"; - private TaskClient client; - @Before + @BeforeEach public void setup() { client = Mockito.mock(TaskClient.class); } - @Test(expected = NullPointerException.class) + @Test public void testNoWorkersException() { - new TaskRunnerConfigurer.Builder(null, null).build(); + assertThrows(NullPointerException.class, () -> new TaskRunnerConfigurer.Builder(null, null).build()); } - @Test(expected = ConductorClientException.class) + @Test public void testInvalidThreadConfig() { Worker worker1 = Worker.create("task1", TaskResult::new); Worker worker2 = Worker.create("task2", TaskResult::new); Map taskThreadCount = new HashMap<>(); - taskThreadCount.put(worker1.getTaskDefName(), 2); + taskThreadCount.put(worker1.getTaskDefName(), 0); taskThreadCount.put(worker2.getTaskDefName(), 3); - new TaskRunnerConfigurer.Builder(client, Arrays.asList(worker1, worker2)) - .withThreadCount(10) - .withTaskThreadCount(taskThreadCount) - .build(); + + assertThrows(IllegalArgumentException.class, () -> new TaskRunnerConfigurer.Builder(client, Arrays.asList(worker1, worker2)) + .withThreadCount(-1) + .withTaskThreadCount(taskThreadCount) + .build()); + + assertThrows(IllegalArgumentException.class, () -> new TaskRunnerConfigurer.Builder(client, Arrays.asList(worker1, worker2)) + .withTaskThreadCount(taskThreadCount) + .build()); } @Test @@ -81,12 +93,12 @@ public void testMissingTaskThreadConfig() { .build(); assertFalse(configurer.getTaskThreadCount().isEmpty()); - assertEquals(2, configurer.getTaskThreadCount().size()); + assertEquals(1, configurer.getTaskThreadCount().size()); assertEquals(2, configurer.getTaskThreadCount().get("task1").intValue()); - assertEquals(1, configurer.getTaskThreadCount().get("task2").intValue()); } @Test + @SuppressWarnings("deprecation") public void testPerTaskThreadPool() { Worker worker1 = Worker.create("task1", TaskResult::new); Worker worker2 = Worker.create("task2", TaskResult::new); @@ -104,19 +116,18 @@ public void testPerTaskThreadPool() { } @Test + @SuppressWarnings("deprecation") public void testSharedThreadPool() { Worker worker = Worker.create(TEST_TASK_DEF_NAME, TaskResult::new); TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(client, Arrays.asList(worker, worker, worker)) .build(); configurer.init(); - assertEquals(3, configurer.getThreadCount()); + assertEquals(-1, configurer.getThreadCount()); assertEquals(500, configurer.getSleepWhenRetry()); assertEquals(3, configurer.getUpdateRetryCount()); assertEquals(10, configurer.getShutdownGracePeriodSeconds()); - assertFalse(configurer.getTaskThreadCount().isEmpty()); - assertEquals(1, configurer.getTaskThreadCount().size()); - assertEquals(3, configurer.getTaskThreadCount().get(TEST_TASK_DEF_NAME).intValue()); + assertTrue(configurer.getTaskThreadCount().isEmpty()); configurer = new TaskRunnerConfigurer.Builder(client, Collections.singletonList(worker)) @@ -133,9 +144,7 @@ public void testSharedThreadPool() { assertEquals(10, configurer.getUpdateRetryCount()); assertEquals(15, configurer.getShutdownGracePeriodSeconds()); assertEquals("test-worker-", configurer.getWorkerNamePrefix()); - assertFalse(configurer.getTaskThreadCount().isEmpty()); - assertEquals(1, configurer.getTaskThreadCount().size()); - assertEquals(100, configurer.getTaskThreadCount().get(TEST_TASK_DEF_NAME).intValue()); + assertTrue(configurer.getTaskThreadCount().isEmpty()); } @Test @@ -186,9 +195,9 @@ public void testMultipleWorkersExecution() throws Exception { Object[] args = invocation.getArguments(); String taskName = args[0].toString(); if (taskName.equals(task1Name)) { - return Arrays.asList(task1); + return List.of(task1); } else if (taskName.equals(task2Name)) { - return Arrays.asList(task2); + return List.of(task2); } else { return Collections.emptyList(); } @@ -220,6 +229,58 @@ public void testMultipleWorkersExecution() throws Exception { assertEquals(1, task2Counter.get()); } + @Test + public void testLeaseExtension() throws Exception { + TaskClient taskClient = mock(TaskClient.class); + String taskName = "task1"; + + Worker worker = mock(Worker.class); + when(worker.getTaskDefName()).thenReturn(taskName); + when(worker.leaseExtendEnabled()).thenReturn(true); + + doAnswer(invocation -> { + TaskResult result = new TaskResult(invocation.getArgument(0)); + result.setStatus(TaskResult.Status.IN_PROGRESS); + return result; + }).when(worker).execute(any(Task.class)); + + Task task = new Task(); + task.setTaskId("task123"); + task.setTaskDefName(taskName); + task.setStatus(Task.Status.IN_PROGRESS); + task.setResponseTimeoutSeconds(2000); + + when(taskClient.batchPollTasksInDomain(any(), any(), any(), anyInt(), anyInt())) + .thenAnswer((invocation) -> List.of(task)); + when(taskClient.ack(any(), any())).thenReturn(true); + + CountDownLatch latch = new CountDownLatch(1); + doAnswer(invocation -> { + latch.countDown(); + return null; + }).when(taskClient).updateTask(any(TaskResult.class)); + + TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(taskClient, List.of(worker)) + .withSleepWhenRetry(100) + .withUpdateRetryCount(3) + .withThreadCount(1) + .build(); + + configurer.init(); + latch.await(); + + ArgumentCaptor taskResultCaptor = ArgumentCaptor.forClass(TaskResult.class); + verify(taskClient, atLeastOnce()).updateTask(taskResultCaptor.capture()); + + TaskResult capturedResult = taskResultCaptor.getValue(); + assertNotNull(capturedResult); + assertEquals("task123", capturedResult.getTaskId()); + assertEquals(TaskResult.Status.IN_PROGRESS, capturedResult.getStatus()); + + verify(worker, atLeastOnce()).execute(task); + assertTrue(worker.leaseExtendEnabled(), "Worker lease extension should be enabled"); + } + private Task testTask(String taskDefName) { Task task = new Task(); task.setTaskId(UUID.randomUUID().toString()); diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/config/TestPropertyFactory.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/config/TestPropertyFactory.java index 398738c59..b432c241b 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/config/TestPropertyFactory.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/config/TestPropertyFactory.java @@ -12,16 +12,17 @@ */ package com.netflix.conductor.client.config; -import org.junit.Test; +import org.junit.jupiter.api.Test; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.TaskResult; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class TestPropertyFactory { @@ -30,14 +31,14 @@ public void testIdentity() { Worker worker = Worker.create("Test2", TaskResult::new); assertNotNull(worker.getIdentity()); boolean paused = worker.paused(); - assertFalse("Paused? " + paused, paused); + assertFalse(paused); } @Test public void test() { int val = PropertyFactory.getInteger("workerB", "pollingInterval", 100); - assertEquals("got: " + val, 2, val); + assertEquals(2, val); assertEquals( 100, PropertyFactory.getInteger("workerB", "propWithoutValue", 100).intValue()); @@ -67,6 +68,6 @@ public void test() { public void testProperty() { Worker worker = Worker.create("Test", TaskResult::new); boolean paused = worker.paused(); - assertTrue("Paused? " + paused, paused); + assertTrue(paused); } } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/worker/TestWorkflowTask.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/worker/TestWorkflowTask.java index 32f7bcb76..1ad4a1d05 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/worker/TestWorkflowTask.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/java/com/netflix/conductor/client/worker/TestWorkflowTask.java @@ -15,8 +15,8 @@ import java.io.InputStream; import java.util.List; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import com.netflix.conductor.common.config.ObjectMapperProvider; import com.netflix.conductor.common.metadata.tasks.Task; @@ -25,14 +25,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class TestWorkflowTask { private ObjectMapper objectMapper; - @Before + @BeforeEach public void setup() { objectMapper = new ObjectMapperProvider().getObjectMapper(); } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/resources/conductor-workers.properties b/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/resources/conductor-workers.properties new file mode 100644 index 000000000..93fd67347 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/test/resources/conductor-workers.properties @@ -0,0 +1,11 @@ +conductor.worker.pollingInterval=2 +conductor.worker.paused=false +conductor.worker.workerA.paused=true +conductor.worker.workerA.domain=domainA +conductor.worker.workerB.batchSize=84 +conductor.worker.workerB.domain=domainB +conductor.worker.Test.paused=true +conductor.worker.domainTestTask2.domain=visinghDomain +conductor.worker.task_run_always.pollOutOfDiscovery=true +conductor.worker.task_explicit_do_not_run_always.pollOutOfDiscovery=false +conductor.worker.task_ignore_override.pollOutOfDiscovery=true \ No newline at end of file diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java index 56c3e92a5..d7649a348 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java @@ -38,7 +38,7 @@ BulkResponse pauseWorkflows(List workflowIds) { .body(workflowIds) .build(); - ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + ConductorClientResponse> resp = client.execute(request, new TypeReference<>() { }); return resp.getData(); @@ -52,7 +52,7 @@ BulkResponse restartWorkflows(List workflowIds, Boolean useLates .body(workflowIds) .build(); - ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + ConductorClientResponse> resp = client.execute(request, new TypeReference<>() { }); return resp.getData(); @@ -65,7 +65,7 @@ BulkResponse resumeWorkflows(List workflowIds) { .body(workflowIds) .build(); - ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + ConductorClientResponse> resp = client.execute(request, new TypeReference<>() { }); return resp.getData(); @@ -78,7 +78,7 @@ BulkResponse retryWorkflows(List workflowIds) { .body(workflowIds) .build(); - ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + ConductorClientResponse> resp = client.execute(request, new TypeReference<>() { }); return resp.getData(); @@ -93,7 +93,7 @@ public BulkResponse terminateWorkflows(List workflowIds, String .body(workflowIds) .build(); - ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + ConductorClientResponse> resp = client.execute(request, new TypeReference<>() { }); return resp.getData(); diff --git a/conductor-clients/java/conductor-java-sdk/tests/src/test/java/io/orkes/conductor/client/http/WorkflowClientTests.java b/conductor-clients/java/conductor-java-sdk/tests/src/test/java/io/orkes/conductor/client/http/WorkflowClientTests.java index 7e8ec62cf..81d61f071 100644 --- a/conductor-clients/java/conductor-java-sdk/tests/src/test/java/io/orkes/conductor/client/http/WorkflowClientTests.java +++ b/conductor-clients/java/conductor-java-sdk/tests/src/test/java/io/orkes/conductor/client/http/WorkflowClientTests.java @@ -16,17 +16,17 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import com.netflix.conductor.common.metadata.tasks.TaskResult; -import com.netflix.conductor.common.metadata.workflow.WorkflowTask; -import com.netflix.conductor.common.run.WorkflowTestRequest; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.tasks.TaskResult; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.common.run.WorkflowTestRequest; import com.netflix.conductor.sdk.workflow.def.ConductorWorkflow; import com.netflix.conductor.sdk.workflow.def.tasks.Http; import com.netflix.conductor.sdk.workflow.def.tasks.SimpleTask; diff --git a/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/OpenSearchRestDaoBaseTest.java b/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/OpenSearchRestDaoBaseTest.java index 8e17b97be..b7bb57a65 100644 --- a/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/OpenSearchRestDaoBaseTest.java +++ b/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/OpenSearchRestDaoBaseTest.java @@ -16,15 +16,14 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; -import java.util.regex.Pattern; import org.apache.http.HttpHost; +import org.junit.After; +import org.junit.Before; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; -import org.junit.After; -import org.junit.Before; import org.springframework.retry.support.RetryTemplate; public abstract class OpenSearchRestDaoBaseTest extends OpenSearchTest { diff --git a/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/OpenSearchTest.java b/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/OpenSearchTest.java index 980b7a496..a296d279b 100644 --- a/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/OpenSearchTest.java +++ b/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/OpenSearchTest.java @@ -12,22 +12,22 @@ */ package com.netflix.conductor.os.dao.index; -import com.netflix.conductor.os.config.OpenSearchProperties; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.runner.RunWith; +import org.opensearch.testcontainers.OpensearchContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; -import org.opensearch.testcontainers.OpensearchContainer; +import org.testcontainers.utility.DockerImageName; import com.netflix.conductor.common.config.TestObjectMapperConfiguration; +import com.netflix.conductor.os.config.OpenSearchProperties; import com.fasterxml.jackson.databind.ObjectMapper; -import org.testcontainers.utility.DockerImageName; @ContextConfiguration( classes = {TestObjectMapperConfiguration.class, OpenSearchTest.TestConfiguration.class}) @@ -46,7 +46,10 @@ public OpenSearchProperties elasticSearchProperties() { } protected static OpensearchContainer container = - new OpensearchContainer<>(DockerImageName.parse("opensearchproject/opensearch:2.18.0")); // this should match the client version + new OpensearchContainer<>( + DockerImageName.parse( + "opensearchproject/opensearch:2.18.0")); // this should match the client + // version @Autowired protected ObjectMapper objectMapper; diff --git a/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/TestBulkRequestBuilderWrapper.java b/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/TestBulkRequestBuilderWrapper.java index 617d3616f..9208cb509 100644 --- a/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/TestBulkRequestBuilderWrapper.java +++ b/os-persistence/src/test/java/com/netflix/conductor/os/dao/index/TestBulkRequestBuilderWrapper.java @@ -12,11 +12,11 @@ */ package com.netflix.conductor.os.dao.index; +import org.junit.Test; +import org.mockito.Mockito; import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.update.UpdateRequest; -import org.junit.Test; -import org.mockito.Mockito; public class TestBulkRequestBuilderWrapper { BulkRequestBuilder builder = Mockito.mock(BulkRequestBuilder.class);