Skip to content

Commit

Permalink
Rewrite job status test to use direct HTTP calls and avoid using the …
Browse files Browse the repository at this point in the history
…client for build reasons.
  • Loading branch information
Miles-Garnsey committed Sep 21, 2023
1 parent b9f5f7f commit 13d2017
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

import com.datastax.mgmtapi.client.api.DefaultApi;
import com.datastax.mgmtapi.client.invoker.ApiClient;
import com.datastax.mgmtapi.helpers.IntegrationTestUtils;
import com.datastax.mgmtapi.helpers.NettyHttpClient;
import com.datastax.mgmtapi.resources.models.CompactRequest;
Expand All @@ -34,6 +32,8 @@
import com.datastax.mgmtapi.resources.models.ScrubRequest;
import com.datastax.mgmtapi.resources.models.Table;
import com.datastax.mgmtapi.resources.models.TakeSnapshotRequest;
import com.datastax.mgmtapi.resources.v2.models.RepairParallelism;
import com.datastax.mgmtapi.resources.v2.models.RepairRequestResponse;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -1036,38 +1036,62 @@ public void testMoveNode() throws IOException, URISyntaxException {
});
}

public void ensureStatusChanges() throws Exception {
@Test
public void testEnsureStatusChanges() throws Exception {
assumeTrue(IntegrationTestUtils.shouldRun());
ensureStarted();
NettyHttpClient client = new NettyHttpClient(BASE_URL);
DefaultApi apiClient = new DefaultApi(new ApiClient().setBasePath(BASE_HOST));
com.datastax.mgmtapi.client.model.RepairRequest req =
new com.datastax.mgmtapi.client.model.RepairRequest()
.keyspace("system_distributed")
.fullRepair(true)
.notifications(true)
.repairParallelism(
com.datastax.mgmtapi.client.model.RepairRequest.RepairParallelismEnum.SEQUENTIAL)
.associatedTokens(
Collections.singletonList(
new com.datastax.mgmtapi.client.model.RingRange()
.start(Long.valueOf(-1))
.end(Long.valueOf(100))));

com.datastax.mgmtapi.resources.v2.models.RepairRequest req =
new com.datastax.mgmtapi.resources.v2.models.RepairRequest(
"system_distributed",
null,
true,
true,
Collections.singletonList(
new com.datastax.mgmtapi.resources.v2.models.RingRange(-1L, 100L)),
RepairParallelism.SEQUENTIAL,
null,
null);

logger.info("Sending repair request: {}", req);
String jobID = apiClient.putRepairV2(req).getRepairId();
Integer repairID = Integer.parseInt(jobID.substring(7)); // Trimming off "repair-" prefix.
URI repairUri = new URIBuilder(BASE_PATH_V2 + "/repairs").build();
Pair<Integer, String> repairResp =
client
.put(repairUri.toURL(), new ObjectMapper().writeValueAsString(req))
.thenApply(this::responseAsCodeAndBody)
.join();
logger.info("Repair response: {}", repairResp);
String jobID =
new ObjectMapper().readValue(repairResp.getRight(), RepairRequestResponse.class).repairID;
Integer repairID =
Integer.parseInt(
jobID.substring(7) // Trimming off "repair-" prefix.
);
logger.info("Repair ID: {}", repairID);
assertThat(repairID).isNotNull();
assertThat(repairID).isGreaterThan(0);

com.datastax.mgmtapi.client.model.Job status = apiClient.getJobStatus(jobID);
logger.info("Repair job status: {}", status);
assertThat(status.getStatus()).isNotNull();
assertThat(status.getStatusChanges()).isNotNull();
await().atMost(5, SECONDS).until(() -> status.getStatusChanges().size() > 0);
URI statusUri =
new URIBuilder(BASE_PATH_V2 + "/ops/executor/job").addParameter("job_id", jobID).build();
Pair<Integer, String> statusResp =
client.get(statusUri.toURL()).thenApply(this::responseAsCodeAndBody).join();
logger.info("Repair job status: {}", statusResp);
Job jobStatus = new ObjectMapper().readValue(statusResp.getRight(), Job.class);

assertThat(jobStatus.getStatus()).isNotNull();
assertThat(jobStatus.getStatusChanges()).isNotNull();
await()
.atMost(5, SECONDS)
.until(
() -> status.getStatus() == com.datastax.mgmtapi.client.model.Job.StatusEnum.COMPLETED);
() -> {
Pair<Integer, String> statusResp2 =
client.get(statusUri.toURL()).thenApply(this::responseAsCodeAndBody).join();
logger.info("Repair job status: {}", statusResp);
Job jobStatus2 = new ObjectMapper().readValue(statusResp.getRight(), Job.class);
return jobStatus2.getStatusChanges().size() > 0
&& jobStatus2.getStatus()
== com.datastax.mgmtapi.resources.models.Job.JobStatus.COMPLETED;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,36 @@ public CompletableFuture<FullHttpResponse> post(
return result;
}

public CompletableFuture<FullHttpResponse> put(
URL url, final CharSequence body, String contentType) throws UnsupportedEncodingException {
CompletableFuture<FullHttpResponse> result = new CompletableFuture<>();

if (!activeRequestFuture.compareAndSet(null, result))
throw new RuntimeException("outstanding request");

DefaultFullHttpRequest request =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, url.getFile());
request.headers().set(HttpHeaders.Names.CONTENT_TYPE, contentType);
request.headers().set(HttpHeaderNames.HOST, url.getHost());

if (body != null) {
request.content().writeBytes(body.toString().getBytes(CharsetUtil.UTF_8.name()));
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
} else {
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
}

// Send the HTTP request.
client.writeAndFlush(request);

return result;
}

public CompletableFuture<FullHttpResponse> put(URL url, final CharSequence body)
throws UnsupportedEncodingException {
return post(url, body, "application/json");
}

public CompletableFuture<FullHttpResponse> delete(URL url) {
return buildAndSendRequest(HttpMethod.DELETE, url);
}
Expand Down

0 comments on commit 13d2017

Please sign in to comment.