From f3d1ddf0ed8d620ab591936f3c6f3d987730a388 Mon Sep 17 00:00:00 2001 From: Manu Nazareth Date: Mon, 26 Aug 2024 20:05:23 +0530 Subject: [PATCH] Refactor to allow customization of conductor client library The HTTP facing element of conductor client is refactored to allow effective customization. Also, supplying of http headers (as simple as an auth token when required) is taken care with providing defaults and hence with no changes in behavior. --- .../conductor/client/http/ClientBase.java | 138 ++++++++++++++++-- .../client/http/ClientRequestHandler.java | 38 ++++- .../conductor/client/http/MetadataClient.java | 13 +- .../conductor/client/http/TaskClient.java | 32 +++- .../conductor/client/http/WorkflowClient.java | 7 + 5 files changed, 210 insertions(+), 18 deletions(-) diff --git a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java index 5326abe4d..1f3780bcd 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java +++ b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java @@ -81,25 +81,48 @@ public void setRootURI(String root) { } protected void delete(String url, Object... uriVariables) { - deleteWithUriVariables(null, url, uriVariables); + deleteWithUriVariables(null, null, url, uriVariables); + } + + protected void delete(Map headers, String url, Object... uriVariables) { + deleteWithUriVariables(null, headers, url, uriVariables); } protected void deleteWithUriVariables( Object[] queryParams, String url, Object... uriVariables) { - delete(queryParams, url, uriVariables, null); + delete(queryParams, Map.of(), url, uriVariables, null); + } + + protected void deleteWithUriVariables( + Object[] queryParams, Map headers, String url, Object... uriVariables) { + delete(queryParams, headers, url, uriVariables, null); } protected BulkResponse deleteWithRequestBody(Object[] queryParams, String url, Object body) { - return delete(queryParams, url, null, body); + return deleteWithRequestBody(queryParams, Map.of(), url, body); + } + + protected BulkResponse deleteWithRequestBody( + Object[] queryParams, Map headers, String url, Object body) { + return delete(queryParams, headers, url, null, body); } private BulkResponse delete( Object[] queryParams, String url, Object[] uriVariables, Object body) { + return delete(queryParams, Map.of(), url, uriVariables, body); + } + + private BulkResponse delete( + Object[] queryParams, + Map headers, + String url, + Object[] uriVariables, + Object body) { URI uri = null; BulkResponse response = null; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - response = requestHandler.delete(uri, body); + response = requestHandler.delete(uri, headers, body); } catch (UniformInterfaceException e) { handleUniformInterfaceException(e, uri); } catch (RuntimeException e) { @@ -109,18 +132,32 @@ private BulkResponse delete( } protected void put(String url, Object[] queryParams, Object request, Object... uriVariables) { + put(url, queryParams, Map.of(), request, uriVariables); + } + + protected void put( + String url, + Object[] queryParams, + Map headers, + Object request, + Object... uriVariables) { URI uri = null; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - requestHandler.getWebResourceBuilder(uri, request).put(); + requestHandler.getWebResourceBuilder(uri, headers, request).put(); } catch (RuntimeException e) { handleException(uri, e); } } protected void postForEntityWithRequestOnly(String url, Object request) { + postForEntityWithRequestOnly(url, Map.of(), request); + } + + protected void postForEntityWithRequestOnly( + String url, Map headers, Object request) { Class type = null; - postForEntity(url, request, null, type); + postForEntity(url, request, null, headers, type); } protected void postForEntityWithUriVariablesOnly(String url, Object... uriVariables) { @@ -134,10 +171,37 @@ protected T postForEntity( Object[] queryParams, Class responseType, Object... uriVariables) { + return postForEntity(url, request, queryParams, Map.of(), responseType, uriVariables); + } + + protected T postForEntity( + String url, + Object request, + Object[] queryParams, + Map headers, + Class responseType, + Object... uriVariables) { + return postForEntity( + url, + request, + queryParams, + headers, + responseType, + builder -> builder.post(responseType), + uriVariables); + } + + protected T postForEntity( + String url, + Object request, + Object[] queryParams, + GenericType responseType, + Object... uriVariables) { return postForEntity( url, request, queryParams, + Map.of(), responseType, builder -> builder.post(responseType), uriVariables); @@ -147,12 +211,14 @@ protected T postForEntity( String url, Object request, Object[] queryParams, + Map headers, GenericType responseType, Object... uriVariables) { return postForEntity( url, request, queryParams, + headers, responseType, builder -> builder.post(responseType), uriVariables); @@ -165,10 +231,23 @@ private T postForEntity( Object responseType, Function postWithEntity, Object... uriVariables) { + return postForEntity( + url, request, queryParams, Map.of(), responseType, postWithEntity, uriVariables); + } + + private T postForEntity( + String url, + Object request, + Object[] queryParams, + Map headers, + Object responseType, + Function postWithEntity, + Object... uriVariables) { URI uri = null; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - Builder webResourceBuilder = requestHandler.getWebResourceBuilder(uri, request); + Builder webResourceBuilder = + requestHandler.getWebResourceBuilder(uri, headers, request); if (responseType == null) { webResourceBuilder.post(); return null; @@ -185,7 +264,25 @@ private T postForEntity( protected T getForEntity( String url, Object[] queryParams, Class responseType, Object... uriVariables) { return getForEntity( - url, queryParams, response -> response.getEntity(responseType), uriVariables); + url, + queryParams, + Map.of(), + response -> response.getEntity(responseType), + uriVariables); + } + + protected T getForEntity( + String url, + Object[] queryParams, + Map headers, + Class responseType, + Object... uriVariables) { + return getForEntity( + url, + queryParams, + headers, + response -> response.getEntity(responseType), + uriVariables); } protected T getForEntity( @@ -194,16 +291,39 @@ protected T getForEntity( url, queryParams, response -> response.getEntity(responseType), uriVariables); } + protected T getForEntity( + String url, + Object[] queryParams, + Map headers, + GenericType responseType, + Object... uriVariables) { + return getForEntity( + url, + queryParams, + headers, + response -> response.getEntity(responseType), + uriVariables); + } + + private T getForEntity( + String url, + Object[] queryParams, + Function entityProvider, + Object... uriVariables) { + return getForEntity(url, queryParams, Map.of(), entityProvider, uriVariables); + } + private T getForEntity( String url, Object[] queryParams, + Map headers, Function entityProvider, Object... uriVariables) { URI uri = null; ClientResponse clientResponse; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - clientResponse = requestHandler.get(uri); + clientResponse = requestHandler.get(uri, headers); if (clientResponse.getStatus() < 300) { return entityProvider.apply(clientResponse); } else { diff --git a/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java b/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java index 164cb579b..71eef9959 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java +++ b/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java @@ -13,6 +13,7 @@ package com.netflix.conductor.client.http; import java.net.URI; +import java.util.Map; import javax.ws.rs.core.MediaType; @@ -57,29 +58,56 @@ public ClientRequestHandler( } public BulkResponse delete(URI uri, Object body) { + return delete(uri, Map.of(), body); + } + + public BulkResponse delete(URI uri, Map headers, Object body) { if (body != null) { - return client.resource(uri) + return getWebResourceBuilder(client.resource(uri), headers) .type(MediaType.APPLICATION_JSON_TYPE) .delete(BulkResponse.class, body); } else { - client.resource(uri).delete(); + getWebResourceBuilder(client.resource(uri), headers).delete(); } return null; } public ClientResponse get(URI uri) { - return client.resource(uri) + return get(uri, Map.of()); + } + + public ClientResponse get(URI uri, Map headers) { + return getWebResourceBuilder(client.resource(uri), headers) .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN) .get(ClientResponse.class); } - public WebResource.Builder getWebResourceBuilder(URI URI, Object entity) { - return client.resource(URI) + public WebResource.Builder getWebResourceBuilder(URI uri, Object entity) { + return getWebResourceBuilder(uri, Map.of(), entity); + } + + public WebResource.Builder getWebResourceBuilder( + URI uri, Map headers, Object entity) { + return getWebResourceBuilder(client.resource(uri), headers) .type(MediaType.APPLICATION_JSON) .entity(entity) .accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON); } + private WebResource.Builder getWebResourceBuilder( + WebResource resource, Map headers) { + WebResource.Builder builder = resource.getRequestBuilder(); + if (headers == null || headers.isEmpty()) { + return builder; + } + + for (Map.Entry entry : headers.entrySet()) { + builder = builder.header(entry.getKey(), entry.getValue()); + } + + return builder; + } + private boolean isNewerJacksonVersion() { Version version = com.fasterxml.jackson.databind.cfg.PackageVersion.VERSION; return version.getMajorVersion() == 2 && version.getMinorVersion() >= 12; diff --git a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java index 2db8df237..51ac4ae7f 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java @@ -13,6 +13,7 @@ package com.netflix.conductor.client.http; import java.util.List; +import java.util.Map; import org.apache.commons.lang3.Validate; @@ -91,8 +92,12 @@ public MetadataClient( * @param workflowDef the workflow definition */ public void registerWorkflowDef(WorkflowDef workflowDef) { + registerWorkflowDef(workflowDef, Map.of()); + } + + protected void registerWorkflowDef(WorkflowDef workflowDef, Map headers) { Validate.notNull(workflowDef, "Workflow definition cannot be null"); - postForEntityWithRequestOnly("metadata/workflow", workflowDef); + postForEntityWithRequestOnly("metadata/workflow", headers, workflowDef); } public void validateWorkflowDef(WorkflowDef workflowDef) { @@ -118,10 +123,16 @@ public void updateWorkflowDefs(List workflowDefs) { * @return Workflow definition for the given workflow and version */ public WorkflowDef getWorkflowDef(String name, Integer version) { + return getWorkflowDef(name, version, Map.of()); + } + + protected WorkflowDef getWorkflowDef( + String name, Integer version, Map headers) { Validate.notBlank(name, "name cannot be blank"); return getForEntity( "metadata/workflow/{name}", new Object[] {"version", version}, + headers, WorkflowDef.class, name); } diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 35d48b2e3..e970c8990 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -150,6 +150,15 @@ public Task pollTask(String taskType, String workerId, String domain) { */ public List batchPollTasksByTaskType( String taskType, String workerId, int count, int timeoutInMillisecond) { + return batchPollTasksByTaskType(taskType, workerId, count, timeoutInMillisecond, Map.of()); + } + + protected List batchPollTasksByTaskType( + String taskType, + String workerId, + int count, + int timeoutInMillisecond, + Map headers) { Validate.notBlank(taskType, "Task type cannot be blank"); Validate.notBlank(workerId, "Worker id cannot be blank"); Validate.isTrue(count > 0, "Count must be greater than 0"); @@ -158,7 +167,8 @@ public List batchPollTasksByTaskType( new Object[] { "workerid", workerId, "count", count, "timeout", timeoutInMillisecond }; - List tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType); + List tasks = + getForEntity("tasks/poll/batch/{taskType}", params, headers, taskList, taskType); tasks.forEach(this::populateTaskPayloads); return tasks; } @@ -176,6 +186,17 @@ public List batchPollTasksByTaskType( */ public List batchPollTasksInDomain( String taskType, String domain, String workerId, int count, int timeoutInMillisecond) { + return batchPollTasksInDomain( + taskType, domain, workerId, count, timeoutInMillisecond, Map.of()); + } + + protected List batchPollTasksInDomain( + String taskType, + String domain, + String workerId, + int count, + int timeoutInMillisecond, + Map headers) { Validate.notBlank(taskType, "Task type cannot be blank"); Validate.notBlank(workerId, "Worker id cannot be blank"); Validate.isTrue(count > 0, "Count must be greater than 0"); @@ -191,7 +212,8 @@ public List batchPollTasksInDomain( "domain", domain }; - List tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType); + List tasks = + getForEntity("tasks/poll/batch/{taskType}", params, headers, taskList, taskType); tasks.forEach(this::populateTaskPayloads); return tasks; } @@ -236,8 +258,12 @@ private void populateTaskPayloads(Task task) { * @param taskResult the {@link TaskResult} of the executed task to be updated. */ public void updateTask(TaskResult taskResult) { + updateTask(taskResult, Map.of()); + } + + protected void updateTask(TaskResult taskResult, Map headers) { Validate.notNull(taskResult, "Task result cannot be null"); - postForEntityWithRequestOnly("tasks", taskResult); + postForEntityWithRequestOnly("tasks", headers, taskResult); } public Optional evaluateAndUploadLargePayload( diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 14deabff3..a014018b4 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -15,6 +15,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; @@ -116,6 +117,11 @@ public WorkflowClient( * @throws IllegalArgumentException if {@link StartWorkflowRequest#getName()} is empty. */ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) { + return startWorkflow(startWorkflowRequest, Map.of()); + } + + protected String startWorkflow( + StartWorkflowRequest startWorkflowRequest, Map headers) { Validate.notNull(startWorkflowRequest, "StartWorkflowRequest cannot be null"); Validate.notBlank(startWorkflowRequest.getName(), "Workflow name cannot be null or empty"); Validate.isTrue( @@ -173,6 +179,7 @@ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) { "workflow", startWorkflowRequest, null, + headers, String.class, startWorkflowRequest.getName()); } catch (ConductorClientException e) {