-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8a40ba5
commit 502e282
Showing
3 changed files
with
223 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
package e2e; | ||
|
||
import static io.grpc.Status.*; | ||
import static org.assertj.core.api.Assertions.*; | ||
|
||
import io.grpc.StatusRuntimeException; | ||
import io.littlehorse.sdk.common.LHLibUtil; | ||
import io.littlehorse.sdk.common.config.LHConfig; | ||
import io.littlehorse.sdk.common.proto.DeleteTaskDefRequest; | ||
import io.littlehorse.sdk.common.proto.DeleteWfSpecRequest; | ||
import io.littlehorse.sdk.common.proto.GetLatestWfSpecRequest; | ||
import io.littlehorse.sdk.common.proto.LHHostInfo; | ||
import io.littlehorse.sdk.common.proto.LittleHorseGrpc; | ||
import io.littlehorse.sdk.common.proto.PutTaskDefRequest; | ||
import io.littlehorse.sdk.common.proto.PutWfSpecRequest; | ||
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerRequest; | ||
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerResponse; | ||
import io.littlehorse.sdk.common.proto.TaskDef; | ||
import io.littlehorse.sdk.common.proto.TaskDefId; | ||
import io.littlehorse.sdk.common.proto.WfSpec; | ||
import io.littlehorse.sdk.common.proto.WfSpecId; | ||
import io.littlehorse.sdk.wfsdk.Workflow; | ||
import io.littlehorse.sdk.wfsdk.internal.WorkflowImpl; | ||
import io.littlehorse.test.LHTest; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.Test; | ||
|
||
@LHTest | ||
public class LHMetadataLifecycle { | ||
|
||
private LittleHorseGrpc.LittleHorseBlockingStub client; | ||
private Set<String> allHosts = new HashSet<>(); | ||
private final String taskA = "task-a"; | ||
private final String testWfSpec = "test-wf-spec"; | ||
private LHConfig config; | ||
|
||
@AfterEach | ||
public void cleanup() { | ||
client.deleteTaskDef(DeleteTaskDefRequest.newBuilder() | ||
.setId(TaskDefId.newBuilder().setName(taskA)) | ||
.build()); | ||
try { | ||
WfSpec latestWfSpec = client.getLatestWfSpec( | ||
GetLatestWfSpecRequest.newBuilder().setName(testWfSpec).build()); | ||
client.deleteWfSpec(DeleteWfSpecRequest.newBuilder() | ||
.setId(WfSpecId.newBuilder() | ||
.setName(latestWfSpec.getId().getName()) | ||
.setMajorVersion(0) | ||
.setRevision(0)) | ||
.build()); | ||
} catch (Exception ex) { | ||
// ignore | ||
} | ||
} | ||
|
||
@Test | ||
public void shouldRemoveDeadTaskWorkers() throws Exception { | ||
client.putTaskDef(PutTaskDefRequest.newBuilder().setName(taskA).build()); | ||
|
||
// Taskdef needs to propagate to all servers | ||
Thread.sleep(50); | ||
|
||
String client1 = "client-1"; | ||
String client2 = "client-2"; | ||
String client3 = "client-3"; | ||
String client4 = "client-4"; | ||
String client5 = "client-5"; | ||
|
||
RegisterTaskWorkerResponse reply1 = client.registerTaskWorker(register(client1)); | ||
for (LHHostInfo host : reply1.getYourHostsList()) { | ||
allHosts.add(hostToString(host)); | ||
} | ||
|
||
// It distributes available worker (one server each worker) | ||
client.registerTaskWorker(register(client2)); | ||
client.registerTaskWorker(register(client3)); | ||
client.registerTaskWorker(register(client4)); | ||
|
||
// Wait until all workers are dead | ||
Thread.sleep(15000); | ||
|
||
RegisterTaskWorkerResponse reply5 = client.registerTaskWorker(register(client5)); | ||
int newCount = reply5.getYourHostsCount(); | ||
|
||
// It should assign all the workers available for this only task worker | ||
if (newCount != allHosts.size()) { | ||
throw new RuntimeException("dead workers aren't being deleted!"); | ||
} | ||
} | ||
|
||
@Test | ||
public void shouldNotRegisterAWfSpecThatHasAMissingTaskDef() { | ||
Workflow wf = new WorkflowImpl(testWfSpec, thread -> { | ||
thread.execute(taskA); | ||
}); | ||
PutWfSpecRequest request = wf.compileWorkflow(); | ||
assertThatThrownBy(() -> client.putWfSpec(request)) | ||
.isInstanceOf(StatusRuntimeException.class) | ||
.matches(throwable -> ((StatusRuntimeException) throwable) | ||
.getStatus() | ||
.getCode() | ||
.equals(Code.INVALID_ARGUMENT)); | ||
assertThatThrownBy(() -> client.getLatestWfSpec( | ||
GetLatestWfSpecRequest.newBuilder().setName(testWfSpec).build())) | ||
.isInstanceOf(StatusRuntimeException.class) | ||
.matches(throwable -> ((StatusRuntimeException) throwable) | ||
.getStatus() | ||
.getCode() | ||
.equals(Code.NOT_FOUND)); | ||
TaskDef createdTaskDef = | ||
client.putTaskDef(PutTaskDefRequest.newBuilder().setName(taskA).build()); | ||
assertThat(createdTaskDef).isNotNull(); | ||
WfSpec result = client.putWfSpec(request); | ||
assertThat(result.getId().getMajorVersion()) | ||
.withFailMessage("Somehow the version wasn't zero!") | ||
.isZero(); | ||
} | ||
|
||
private String hostToString(LHHostInfo host) { | ||
return host.getHost() + ":" + host.getPort(); | ||
} | ||
|
||
private RegisterTaskWorkerRequest register(String clientId) { | ||
return RegisterTaskWorkerRequest.newBuilder() | ||
.setTaskWorkerId(clientId) | ||
.setTaskDefId(LHLibUtil.taskDefId(taskA)) | ||
.setListenerName(config.getConnectListener()) | ||
.build(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package e2e; | ||
|
||
import static org.assertj.core.api.Assertions.*; | ||
|
||
import io.littlehorse.sdk.common.LHLibUtil; | ||
import io.littlehorse.sdk.common.config.LHConfig; | ||
import io.littlehorse.sdk.common.proto.LHHostInfo; | ||
import io.littlehorse.sdk.common.proto.LittleHorseGrpc.LittleHorseBlockingStub; | ||
import io.littlehorse.sdk.common.proto.PutTaskDefRequest; | ||
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerRequest; | ||
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerResponse; | ||
import io.littlehorse.test.LHTest; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import org.junit.jupiter.api.Test; | ||
|
||
@LHTest | ||
public class TaskWorkerTest { | ||
|
||
private final String taskDefName = "test-task"; | ||
private LHConfig config; | ||
private LittleHorseBlockingStub client; | ||
|
||
@Test | ||
public void shouldBalanceWorkerGroupConnections() throws Exception { | ||
final Set<String> allHosts = new HashSet<>(); | ||
// Create taskdef | ||
client.putTaskDef(PutTaskDefRequest.newBuilder().setName(taskDefName).build()); | ||
|
||
// Taskdef needs to propagate to all servers | ||
Thread.sleep(50); | ||
|
||
String client1 = "client-1"; | ||
String client2 = "client-2"; | ||
String client3 = "client-3"; | ||
String client4 = "client-4"; | ||
|
||
// This is the first worker to connect, so it should get ALL of the hosts | ||
RegisterTaskWorkerResponse reply1 = client.registerTaskWorker(register(client1)); | ||
for (LHHostInfo host : reply1.getYourHostsList()) { | ||
allHosts.add(hostToString(host)); | ||
} | ||
|
||
// Since we require that each server has at least two connections on it, | ||
// we should check that when we add the worker #2, then it still gets all | ||
// the hosts. | ||
final AtomicInteger numberOfHosts = new AtomicInteger(); | ||
assertThat(client.registerTaskWorker(register(client1))) | ||
.extracting(RegisterTaskWorkerResponse::getYourHostsList) | ||
.extracting(List::size) | ||
.matches(numberOfConnections -> numberOfConnections > 0) | ||
.satisfies(numberOfHosts::set); | ||
assertThat(client.registerTaskWorker(register(client2))) | ||
.withFailMessage("Second worker should still get all hosts!") | ||
.extracting(RegisterTaskWorkerResponse::getYourHostsList) | ||
.extracting(List::size) | ||
.matches(numberOfConnections -> numberOfConnections.equals(numberOfHosts.get())); | ||
|
||
assertThat(client.registerTaskWorker(register(client1))) | ||
.withFailMessage("First worker should still get all hosts when only one other") | ||
.extracting(RegisterTaskWorkerResponse::getYourHostsList) | ||
.extracting(List::size) | ||
.matches(numberOfConnections -> numberOfConnections.equals(numberOfHosts.get())); | ||
|
||
// When we add a third and fourth worker, if there are more than one server, | ||
// then they shouldn't all get all the hosts | ||
assertThat(client.registerTaskWorker(register(client3))).isNotNull(); | ||
assertThat(client.registerTaskWorker(register(client4))).isNotNull(); | ||
|
||
assertThat(client.registerTaskWorker(register(client1))) | ||
.withFailMessage("work isn't being balanced!") | ||
.extracting(RegisterTaskWorkerResponse::getYourHostsCount) | ||
.matches(newCount -> !(newCount > 1 && newCount.equals(numberOfHosts.get()))); | ||
} | ||
|
||
private RegisterTaskWorkerRequest register(String clientId) { | ||
return RegisterTaskWorkerRequest.newBuilder() | ||
.setTaskWorkerId(clientId) | ||
.setTaskDefId(LHLibUtil.taskDefId(taskDefName)) | ||
.setListenerName(config.getConnectListener()) | ||
.build(); | ||
} | ||
|
||
private String hostToString(LHHostInfo host) { | ||
return host.getHost() + ":" + host.getPort(); | ||
} | ||
} |