Skip to content

Commit

Permalink
Merge pull request #20 from ashanhr/main
Browse files Browse the repository at this point in the history
Fix Indefinite Hanging of Consumer Scheduled Service in Transaction Counter
  • Loading branch information
IsuruMaduranga authored Nov 21, 2024
2 parents 8c074b8 + 1cc76d4 commit 74b759e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
2 changes: 1 addition & 1 deletion counter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>org.wso2.integration.transaction.counter</groupId>
<version>1.1.0</version>
<version>1.2.0</version>
<artifactId>transaction-count-handler</artifactId>
<packaging>bundle</packaging>
<name>WSO2 Integration Transaction Counting Handler</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.wso2.integration.transaction.counter.record.TransactionRecord;

import java.io.IOException;
Expand All @@ -40,57 +40,61 @@
public class TransactionRecordStoreImpl implements TransactionRecordStore {

private static final Log LOG = LogFactory.getLog(TransactionRecordStoreImpl.class);
private static HttpClient httpClient;
private static String ENDPOINT;
private static String encodedCredentials;
private CloseableHttpClient httpClient;
private String ENDPOINT;
private String encodedCredentials;

@Override
public void init(String endpoint, String username, String password) {

ENDPOINT = endpoint;
String credentials = username + ":" + password;
encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));

URL url = null;
try {
url = new URL(endpoint);
new URL(endpoint);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
throw new RuntimeException("Invalid endpoint URL: " + endpoint, e);
}
httpClient = HttpClientBuilder.create().build();

httpClient = HttpClients.createDefault();
}

@Override
public boolean commit(ArrayList<TransactionRecord> transactionRecordList, int maxRetryCount) {

Gson gson = new Gson();
String jsonPayload = gson.toJson(transactionRecordList);

HttpPost httpPost = new HttpPost(ENDPOINT);
HttpEntity stringEntity = new StringEntity(jsonPayload , ContentType.APPLICATION_JSON);
httpPost.setEntity(stringEntity);
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
httpPost.setHeader("Authorization", "Basic " + encodedCredentials);

int retryCount = 0;
boolean retry;

do {
try {
retry = false;
HttpResponse result = httpClient.execute(httpPost);
int statusCode = result.getStatusLine().getStatusCode();
if (statusCode < 200 || statusCode >= 300) {
throw new IOException("Status Code: " + statusCode);

HttpEntity stringEntity = new StringEntity(jsonPayload, ContentType.APPLICATION_JSON);
httpPost.setEntity(stringEntity);

try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode < 200 || statusCode >= 300) {
throw new IOException("Unexpected response status: " + statusCode);
}
}

} catch (IOException ex) {
retryCount++;
if (retryCount < maxRetryCount) {
retry = true;
LOG.warn("Failed to persist transaction count records to remote endpoint. Retrying after 1s");
LOG.warn("Failed to persist transaction count records to remote endpoint. Retrying after 1s", ex);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Could not persist following transaction count records: " + jsonPayload, e);
}
} else {
Expand All @@ -106,6 +110,13 @@ public boolean commit(ArrayList<TransactionRecord> transactionRecordList, int ma

@Override
public void clenUp() {
// To be implemented
try {
if (httpClient != null) {
httpClient.close();
}
} catch (IOException e) {
LOG.error("Error while closing the HttpClient", e);
}
}
}

0 comments on commit 74b759e

Please sign in to comment.