Skip to content

Commit

Permalink
feat: patch method support for http sink (#188) (#191)
Browse files Browse the repository at this point in the history
* feat: patch method support for http sink

Signed-off-by: Deep Mistry <[email protected]>

* address PR comments

Signed-off-by: Deep Mistry <[email protected]>

Signed-off-by: Deep Mistry <[email protected]>

Signed-off-by: Deep Mistry <[email protected]>
Co-authored-by: Deep Mistry <[email protected]>
  • Loading branch information
lavkesh and mistrys47 authored Aug 24, 2022
1 parent f3e520e commit b052605
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/docs/reference/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ configs mean, please have a look at the config section.

#### Does it support DELETE calls?

At the moment, the HTTP Sink supports only PUT and POST methods.
At the moment, the HTTP Sink supports only PUT, POST and PATCH methods.

#### How many messages are pushed in one call?

Expand Down
4 changes: 2 additions & 2 deletions docs/docs/sinks/http-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ An Http sink Firehose \(`SINK_TYPE`=`http`\) requires the following variables to

### `SINK_HTTP_SERVICE_URL`

The HTTP endpoint of the service to which this consumer should PUT/POST data. This can be configured as per the requirement, a constant or a dynamic one \(which extract given field values from each message and use that as the endpoint\)
The HTTP endpoint of the service to which this consumer should PUT/POST/PATCH data. This can be configured as per the requirement, a constant or a dynamic one \(which extract given field values from each message and use that as the endpoint\)
If service url is constant, messages will be sent as batches while in case of dynamic one each message will be sent as a separate request \(Since they’d be having different endpoints\).

- Example value: `http://http-service.test.io`
Expand All @@ -15,7 +15,7 @@ If service url is constant, messages will be sent as batches while in case of dy

### `SINK_HTTP_REQUEST_METHOD`

Defines the HTTP verb supported by the endpoint, Supports PUT and POST verbs as of now.
Defines the HTTP verb supported by the endpoint, Supports PUT, POST and PATCH verbs as of now.

- Example value: `post`
- Type: `required`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public enum HttpSinkRequestMethodType {
PUT,
POST
POST,
PATCH
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.odpf.firehose.config.enums.HttpSinkRequestMethodType;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;

Expand All @@ -19,10 +20,13 @@ public class HttpRequestMethodFactory {
* @return the http entity enclosing request base
*/
public static HttpEntityEnclosingRequestBase create(URI uri, HttpSinkRequestMethodType method) {
if (method.equals(HttpSinkRequestMethodType.POST)) {
return new HttpPost(uri);
} else {
return new HttpPut(uri);
switch (method) {
case POST:
return new HttpPost(uri);
case PATCH:
return new HttpPatch(uri);
default:
return new HttpPut(uri);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void shouldWrapMessageToASingleRequest() throws DeserializerException, UR
List<HttpEntityEnclosingRequestBase> requests = batchRequestCreator.create(messages, requestEntityBuilder);

assertEquals(1, requests.size());
assertEquals(HttpSinkRequestMethodType.PUT.toString(), requests.get(0).getMethod());
verify(firehoseInstrumentation, times(1)).logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
uriBuilder.build(), headerBuilder.build(), jsonBody.serialize(messages), HttpSinkRequestMethodType.PUT);
}
Expand All @@ -72,10 +73,22 @@ public void shouldWrapMessageToASingleRequestWhenPostRequest() throws Deserializ
List<HttpEntityEnclosingRequestBase> requests = batchRequestCreator.create(messages, requestEntityBuilder);

assertEquals(1, requests.size());
assertEquals(HttpSinkRequestMethodType.POST.toString(), requests.get(0).getMethod());
verify(firehoseInstrumentation, times(1)).logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
uriBuilder.build(), headerBuilder.build(), jsonBody.serialize(messages), HttpSinkRequestMethodType.POST);
}

@Test
public void shouldWrapMessageToASingleRequestWhenPatchRequest() throws DeserializerException, URISyntaxException {
BatchRequestCreator batchRequestCreator = new BatchRequestCreator(firehoseInstrumentation, uriBuilder, headerBuilder, HttpSinkRequestMethodType.PATCH, jsonBody);
List<HttpEntityEnclosingRequestBase> requests = batchRequestCreator.create(messages, requestEntityBuilder);

assertEquals(1, requests.size());
assertEquals(HttpSinkRequestMethodType.PATCH.toString(), requests.get(0).getMethod());
verify(firehoseInstrumentation, times(1)).logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
uriBuilder.build(), headerBuilder.build(), jsonBody.serialize(messages), HttpSinkRequestMethodType.PATCH);
}

@Test
public void shouldWrapMessagesToASingleRequest() throws DeserializerException, URISyntaxException {
Message message1 = new Message(new byte[]{10, 20}, new byte[]{1, 2}, "sample-topic", 0, 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,39 @@ public void shouldProduceIndividualRequests() throws DeserializerException, URIS
List<HttpEntityEnclosingRequestBase> requests = individualRequestCreator.create(messages, requestEntityBuilder);

assertEquals(2, requests.size());
assertEquals(HttpSinkRequestMethodType.PUT.toString(), requests.get(0).getMethod());
assertEquals(HttpSinkRequestMethodType.PUT.toString(), requests.get(1).getMethod());
verify(firehoseInstrumentation, times(1)).logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
uriBuilder.build(), headerBuilder.build(), jsonBody.serialize(messages).get(0), HttpSinkRequestMethodType.PUT);
verify(firehoseInstrumentation, times(1)).logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
uriBuilder.build(), headerBuilder.build(), jsonBody.serialize(messages).get(1), HttpSinkRequestMethodType.PUT);
}

@Test
public void shouldProduceIndividualRequestsWhenPatchRequest() throws DeserializerException, URISyntaxException {
Message message1 = new Message(new byte[]{10, 20}, new byte[]{1, 2}, "sample-topic", 0, 100);
Message message2 = new Message(new byte[]{10, 20}, new byte[]{1, 2}, "sample-topic", 0, 100);
ArrayList<Message> messages = new ArrayList<>();
messages.add(message1);
messages.add(message2);

ArrayList<String> serializedMessages = new ArrayList<>();
serializedMessages.add("dummyMessage1");
serializedMessages.add("dummyMessage2");
when(jsonBody.serialize(messages)).thenReturn(serializedMessages);

IndividualRequestCreator individualRequestCreator = new IndividualRequestCreator(firehoseInstrumentation, uriBuilder, headerBuilder, HttpSinkRequestMethodType.PATCH, jsonBody);
List<HttpEntityEnclosingRequestBase> requests = individualRequestCreator.create(messages, requestEntityBuilder);

assertEquals(2, requests.size());
assertEquals(HttpSinkRequestMethodType.PATCH.toString(), requests.get(0).getMethod());
assertEquals(HttpSinkRequestMethodType.PATCH.toString(), requests.get(1).getMethod());
verify(firehoseInstrumentation, times(1)).logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
uriBuilder.build(), headerBuilder.build(), jsonBody.serialize(messages).get(0), HttpSinkRequestMethodType.PATCH);
verify(firehoseInstrumentation, times(1)).logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
uriBuilder.build(), headerBuilder.build(), jsonBody.serialize(messages).get(1), HttpSinkRequestMethodType.PATCH);
}

@Test
public void shouldSetRequestPropertiesMultipleTimes() throws DeserializerException, URISyntaxException {
Message message1 = new Message(new byte[]{10, 20}, new byte[]{1, 2}, "sample-topic", 0, 100);
Expand Down

0 comments on commit b052605

Please sign in to comment.