diff --git a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java index 5326abe4d..1f3780bcd 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java +++ b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java @@ -81,25 +81,48 @@ public void setRootURI(String root) { } protected void delete(String url, Object... uriVariables) { - deleteWithUriVariables(null, url, uriVariables); + deleteWithUriVariables(null, null, url, uriVariables); + } + + protected void delete(Map headers, String url, Object... uriVariables) { + deleteWithUriVariables(null, headers, url, uriVariables); } protected void deleteWithUriVariables( Object[] queryParams, String url, Object... uriVariables) { - delete(queryParams, url, uriVariables, null); + delete(queryParams, Map.of(), url, uriVariables, null); + } + + protected void deleteWithUriVariables( + Object[] queryParams, Map headers, String url, Object... uriVariables) { + delete(queryParams, headers, url, uriVariables, null); } protected BulkResponse deleteWithRequestBody(Object[] queryParams, String url, Object body) { - return delete(queryParams, url, null, body); + return deleteWithRequestBody(queryParams, Map.of(), url, body); + } + + protected BulkResponse deleteWithRequestBody( + Object[] queryParams, Map headers, String url, Object body) { + return delete(queryParams, headers, url, null, body); } private BulkResponse delete( Object[] queryParams, String url, Object[] uriVariables, Object body) { + return delete(queryParams, Map.of(), url, uriVariables, body); + } + + private BulkResponse delete( + Object[] queryParams, + Map headers, + String url, + Object[] uriVariables, + Object body) { URI uri = null; BulkResponse response = null; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - response = requestHandler.delete(uri, body); + response = requestHandler.delete(uri, headers, body); } catch (UniformInterfaceException e) { handleUniformInterfaceException(e, uri); } catch (RuntimeException e) { @@ -109,18 +132,32 @@ private BulkResponse delete( } protected void put(String url, Object[] queryParams, Object request, Object... uriVariables) { + put(url, queryParams, Map.of(), request, uriVariables); + } + + protected void put( + String url, + Object[] queryParams, + Map headers, + Object request, + Object... uriVariables) { URI uri = null; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - requestHandler.getWebResourceBuilder(uri, request).put(); + requestHandler.getWebResourceBuilder(uri, headers, request).put(); } catch (RuntimeException e) { handleException(uri, e); } } protected void postForEntityWithRequestOnly(String url, Object request) { + postForEntityWithRequestOnly(url, Map.of(), request); + } + + protected void postForEntityWithRequestOnly( + String url, Map headers, Object request) { Class type = null; - postForEntity(url, request, null, type); + postForEntity(url, request, null, headers, type); } protected void postForEntityWithUriVariablesOnly(String url, Object... uriVariables) { @@ -134,10 +171,37 @@ protected T postForEntity( Object[] queryParams, Class responseType, Object... uriVariables) { + return postForEntity(url, request, queryParams, Map.of(), responseType, uriVariables); + } + + protected T postForEntity( + String url, + Object request, + Object[] queryParams, + Map headers, + Class responseType, + Object... uriVariables) { + return postForEntity( + url, + request, + queryParams, + headers, + responseType, + builder -> builder.post(responseType), + uriVariables); + } + + protected T postForEntity( + String url, + Object request, + Object[] queryParams, + GenericType responseType, + Object... uriVariables) { return postForEntity( url, request, queryParams, + Map.of(), responseType, builder -> builder.post(responseType), uriVariables); @@ -147,12 +211,14 @@ protected T postForEntity( String url, Object request, Object[] queryParams, + Map headers, GenericType responseType, Object... uriVariables) { return postForEntity( url, request, queryParams, + headers, responseType, builder -> builder.post(responseType), uriVariables); @@ -165,10 +231,23 @@ private T postForEntity( Object responseType, Function postWithEntity, Object... uriVariables) { + return postForEntity( + url, request, queryParams, Map.of(), responseType, postWithEntity, uriVariables); + } + + private T postForEntity( + String url, + Object request, + Object[] queryParams, + Map headers, + Object responseType, + Function postWithEntity, + Object... uriVariables) { URI uri = null; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - Builder webResourceBuilder = requestHandler.getWebResourceBuilder(uri, request); + Builder webResourceBuilder = + requestHandler.getWebResourceBuilder(uri, headers, request); if (responseType == null) { webResourceBuilder.post(); return null; @@ -185,7 +264,25 @@ private T postForEntity( protected T getForEntity( String url, Object[] queryParams, Class responseType, Object... uriVariables) { return getForEntity( - url, queryParams, response -> response.getEntity(responseType), uriVariables); + url, + queryParams, + Map.of(), + response -> response.getEntity(responseType), + uriVariables); + } + + protected T getForEntity( + String url, + Object[] queryParams, + Map headers, + Class responseType, + Object... uriVariables) { + return getForEntity( + url, + queryParams, + headers, + response -> response.getEntity(responseType), + uriVariables); } protected T getForEntity( @@ -194,16 +291,39 @@ protected T getForEntity( url, queryParams, response -> response.getEntity(responseType), uriVariables); } + protected T getForEntity( + String url, + Object[] queryParams, + Map headers, + GenericType responseType, + Object... uriVariables) { + return getForEntity( + url, + queryParams, + headers, + response -> response.getEntity(responseType), + uriVariables); + } + + private T getForEntity( + String url, + Object[] queryParams, + Function entityProvider, + Object... uriVariables) { + return getForEntity(url, queryParams, Map.of(), entityProvider, uriVariables); + } + private T getForEntity( String url, Object[] queryParams, + Map headers, Function entityProvider, Object... uriVariables) { URI uri = null; ClientResponse clientResponse; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - clientResponse = requestHandler.get(uri); + clientResponse = requestHandler.get(uri, headers); if (clientResponse.getStatus() < 300) { return entityProvider.apply(clientResponse); } else { diff --git a/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java b/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java index 164cb579b..71eef9959 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java +++ b/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java @@ -13,6 +13,7 @@ package com.netflix.conductor.client.http; import java.net.URI; +import java.util.Map; import javax.ws.rs.core.MediaType; @@ -57,29 +58,56 @@ public ClientRequestHandler( } public BulkResponse delete(URI uri, Object body) { + return delete(uri, Map.of(), body); + } + + public BulkResponse delete(URI uri, Map headers, Object body) { if (body != null) { - return client.resource(uri) + return getWebResourceBuilder(client.resource(uri), headers) .type(MediaType.APPLICATION_JSON_TYPE) .delete(BulkResponse.class, body); } else { - client.resource(uri).delete(); + getWebResourceBuilder(client.resource(uri), headers).delete(); } return null; } public ClientResponse get(URI uri) { - return client.resource(uri) + return get(uri, Map.of()); + } + + public ClientResponse get(URI uri, Map headers) { + return getWebResourceBuilder(client.resource(uri), headers) .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN) .get(ClientResponse.class); } - public WebResource.Builder getWebResourceBuilder(URI URI, Object entity) { - return client.resource(URI) + public WebResource.Builder getWebResourceBuilder(URI uri, Object entity) { + return getWebResourceBuilder(uri, Map.of(), entity); + } + + public WebResource.Builder getWebResourceBuilder( + URI uri, Map headers, Object entity) { + return getWebResourceBuilder(client.resource(uri), headers) .type(MediaType.APPLICATION_JSON) .entity(entity) .accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON); } + private WebResource.Builder getWebResourceBuilder( + WebResource resource, Map headers) { + WebResource.Builder builder = resource.getRequestBuilder(); + if (headers == null || headers.isEmpty()) { + return builder; + } + + for (Map.Entry entry : headers.entrySet()) { + builder = builder.header(entry.getKey(), entry.getValue()); + } + + return builder; + } + private boolean isNewerJacksonVersion() { Version version = com.fasterxml.jackson.databind.cfg.PackageVersion.VERSION; return version.getMajorVersion() == 2 && version.getMinorVersion() >= 12; diff --git a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java index 2db8df237..51ac4ae7f 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java @@ -13,6 +13,7 @@ package com.netflix.conductor.client.http; import java.util.List; +import java.util.Map; import org.apache.commons.lang3.Validate; @@ -91,8 +92,12 @@ public MetadataClient( * @param workflowDef the workflow definition */ public void registerWorkflowDef(WorkflowDef workflowDef) { + registerWorkflowDef(workflowDef, Map.of()); + } + + protected void registerWorkflowDef(WorkflowDef workflowDef, Map headers) { Validate.notNull(workflowDef, "Workflow definition cannot be null"); - postForEntityWithRequestOnly("metadata/workflow", workflowDef); + postForEntityWithRequestOnly("metadata/workflow", headers, workflowDef); } public void validateWorkflowDef(WorkflowDef workflowDef) { @@ -118,10 +123,16 @@ public void updateWorkflowDefs(List workflowDefs) { * @return Workflow definition for the given workflow and version */ public WorkflowDef getWorkflowDef(String name, Integer version) { + return getWorkflowDef(name, version, Map.of()); + } + + protected WorkflowDef getWorkflowDef( + String name, Integer version, Map headers) { Validate.notBlank(name, "name cannot be blank"); return getForEntity( "metadata/workflow/{name}", new Object[] {"version", version}, + headers, WorkflowDef.class, name); } diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 35d48b2e3..e970c8990 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -150,6 +150,15 @@ public Task pollTask(String taskType, String workerId, String domain) { */ public List batchPollTasksByTaskType( String taskType, String workerId, int count, int timeoutInMillisecond) { + return batchPollTasksByTaskType(taskType, workerId, count, timeoutInMillisecond, Map.of()); + } + + protected List batchPollTasksByTaskType( + String taskType, + String workerId, + int count, + int timeoutInMillisecond, + Map headers) { Validate.notBlank(taskType, "Task type cannot be blank"); Validate.notBlank(workerId, "Worker id cannot be blank"); Validate.isTrue(count > 0, "Count must be greater than 0"); @@ -158,7 +167,8 @@ public List batchPollTasksByTaskType( new Object[] { "workerid", workerId, "count", count, "timeout", timeoutInMillisecond }; - List tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType); + List tasks = + getForEntity("tasks/poll/batch/{taskType}", params, headers, taskList, taskType); tasks.forEach(this::populateTaskPayloads); return tasks; } @@ -176,6 +186,17 @@ public List batchPollTasksByTaskType( */ public List batchPollTasksInDomain( String taskType, String domain, String workerId, int count, int timeoutInMillisecond) { + return batchPollTasksInDomain( + taskType, domain, workerId, count, timeoutInMillisecond, Map.of()); + } + + protected List batchPollTasksInDomain( + String taskType, + String domain, + String workerId, + int count, + int timeoutInMillisecond, + Map headers) { Validate.notBlank(taskType, "Task type cannot be blank"); Validate.notBlank(workerId, "Worker id cannot be blank"); Validate.isTrue(count > 0, "Count must be greater than 0"); @@ -191,7 +212,8 @@ public List batchPollTasksInDomain( "domain", domain }; - List tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType); + List tasks = + getForEntity("tasks/poll/batch/{taskType}", params, headers, taskList, taskType); tasks.forEach(this::populateTaskPayloads); return tasks; } @@ -236,8 +258,12 @@ private void populateTaskPayloads(Task task) { * @param taskResult the {@link TaskResult} of the executed task to be updated. */ public void updateTask(TaskResult taskResult) { + updateTask(taskResult, Map.of()); + } + + protected void updateTask(TaskResult taskResult, Map headers) { Validate.notNull(taskResult, "Task result cannot be null"); - postForEntityWithRequestOnly("tasks", taskResult); + postForEntityWithRequestOnly("tasks", headers, taskResult); } public Optional evaluateAndUploadLargePayload( diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 14deabff3..a014018b4 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -15,6 +15,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; @@ -116,6 +117,11 @@ public WorkflowClient( * @throws IllegalArgumentException if {@link StartWorkflowRequest#getName()} is empty. */ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) { + return startWorkflow(startWorkflowRequest, Map.of()); + } + + protected String startWorkflow( + StartWorkflowRequest startWorkflowRequest, Map headers) { Validate.notNull(startWorkflowRequest, "StartWorkflowRequest cannot be null"); Validate.notBlank(startWorkflowRequest.getName(), "Workflow name cannot be null or empty"); Validate.isTrue( @@ -173,6 +179,7 @@ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) { "workflow", startWorkflowRequest, null, + headers, String.class, startWorkflowRequest.getName()); } catch (ConductorClientException e) {