Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue about empty consul instance host #959

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/docs/service-discovery/consul.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ It's often used as service discovery backend to register and locate the services
Consul makes it simple for services to register themselves and to discover other services via a DNS or HTTP interface.
External services can be registered as well.

As [specified](https://developer.hashicorp.com/consul/api-docs/agent/service#address) in the Consul documentation, if the host address is not provided, Stork will automatically use the Consul node address for the instance.

This page explains how Stork can use Consul to handle the service discovery and service registration.

## Dependency
Expand Down Expand Up @@ -35,4 +37,5 @@ For each service that should get the service instances from Consul, configure th

Consul service discovery is configured with the following parameters:

--8<-- "target/attributes/META-INF/stork-docs/consul-sd-attributes.txt"
--8<-- "target/attributes/META-INF/stork-docs/consul-sd-attributes.txt"

3 changes: 1 addition & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
<url>http://smallrye.io</url>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.release>17</maven.compiler.release>

<version.assertj>3.26.3</version.assertj>
<version.microprofile-config-api>3.1</version.microprofile-config-api>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ private List<ServiceInstance> toStorkServiceInstances(ServiceEntryList serviceEn
Metadata<ConsulMetadataKey> consulMetadata = createConsulMetadata(serviceEntry);
String address = service.getAddress();
int port = serviceEntry.getService().getPort();
if (address == null) {
throw new IllegalArgumentException("Got null address for service " + serviceName);
if (address == null || address.isEmpty() || address.isBlank()) {
//If address not provided, the agent address should be used. See https://developer.hashicorp.com/consul/api-docs/agent/service#address
address = serviceEntry.getNode().getAddress();
aureamunoz marked this conversation as resolved.
Show resolved Hide resolved
}
ServiceInstance matching = ServiceInstanceUtils.findMatching(previousInstances, address, port);
if (matching != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -68,7 +69,7 @@ void shouldNotFetchWhenRefreshPeriodNotReached() throws InterruptedException {
null);
stork = StorkTestUtils.getNewStorkInstance();
List<String> tags = List.of("primary");
registerService(serviceName, 8406, tags, "example.com");
registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -84,7 +85,7 @@ void shouldNotFetchWhenRefreshPeriodNotReached() throws InterruptedException {
deregisterServiceInstances(instances.get());

List<String> sTags = List.of("secondary");
registerService(serviceName, 8506, sTags, "another.example.com");
registerService(new ConsulServiceOptions(serviceName, 8506, sTags, List.of("another.example.com")));

// when the consul service discovery is called before the end of refreshing period
service.getServiceDiscovery().getServiceInstances()
Expand Down Expand Up @@ -117,7 +118,7 @@ void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException {
//Given a service `my-service` registered in consul
List<String> tags = List.of("primary");
Map<String, String> metadata = Maps.newHashMap("meta", "metadata for my-service");
registerService(serviceName, 8406, tags, "example.com");
registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -138,7 +139,7 @@ void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException {

//the service settings change in consul
List<String> sTags = List.of("secondary");
registerService(serviceName, 8506, sTags, "another.example.com");
registerService(new ConsulServiceOptions(serviceName, 8506, sTags, List.of("another.example.com")));

// let's wait until the new services are populated to Stork (from Consul)
await().atMost(Duration.ofSeconds(7))
Expand Down Expand Up @@ -173,8 +174,8 @@ void shouldDiscoverServiceWithSpecificName() throws InterruptedException {
null);
stork = StorkTestUtils.getNewStorkInstance();
//Given a service `my-service` registered in consul
registerService("my-consul-service", 8406, null, "consul.com");
registerService("another-service", 8606, null, "another.example.com");
registerService(new ConsulServiceOptions("my-consul-service", 8406, null, List.of("consul.com")));
registerService(new ConsulServiceOptions("another-service", 8606, null, List.of("another.example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand Down Expand Up @@ -203,8 +204,8 @@ void shouldHandleTheSecureAttribute() throws InterruptedException {
null);
stork = StorkTestUtils.getNewStorkInstance();
//Given a service `my-service` registered in consul
registerService("my-consul-service", 8406, null, "consul.com");
registerService("another-service", 8606, null, "another.example.com");
registerService(new ConsulServiceOptions("my-consul-service", 8406, null, List.of("consul.com")));
registerService(new ConsulServiceOptions("another-service", 8606, null, List.of("another.example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand Down Expand Up @@ -233,7 +234,7 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException {
stork = StorkTestUtils.getNewStorkInstance();
//Given a service `my-service` registered in consul
List<String> tags = List.of("primary");
registerService(serviceName, 8406, tags, "example.com");
registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -256,7 +257,7 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException {
deregisterServiceInstances(instances.get());

//the service settings change in consul
registerService(serviceName, 8406, tags, "example.com", "another.example.com");
registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com", "another.example.com")));

// let's wait until the new services are populated to Stork (from Consul)
await().atMost(Duration.ofSeconds(10)).until(
Expand Down Expand Up @@ -285,13 +286,48 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException {
.hasValueSatisfying(instance -> assertThat(instance.getId()).isNotEqualTo(serviceId));
}

private void registerService(String serviceName, int port, List<String> tags,
String... addresses) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(addresses.length);
for (String address : addresses) {
@Test
void shouldDiscoverServiceWithoutAddress() throws InterruptedException {
//Given a service `my-consul-service` registered in consul and a refresh-period of 5 seconds
String serviceName = "my-consul-service";
TestConfigProvider.addServiceConfig("my-consul-service", null, "consul", null,
null, Map.of("consul-host", "localhost", "consul-port", String.valueOf(consulPort), "refresh-period", "5",
"application", "my-consul-service"),
null);
stork = StorkTestUtils.getNewStorkInstance();
registerService(new ConsulServiceOptions("my-consul-service", 8406, null, new ArrayList<>()));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Service service = stork.getService(serviceName);
// call stork service discovery and gather service instances in the cache
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Consul", th))
.subscribe().with(instances::set);

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);

// Then the service instance is found
assertThat(instances.get()).hasSize(1);
assertThat(instances.get().get(0).getHost()).isEqualTo("127.0.0.1");
assertThat(instances.get().get(0).getPort()).isEqualTo(8406);
assertThat(instances.get().get(0).isSecure()).isFalse();
}

public record ConsulServiceOptions(String serviceName, int port, List<String> tags, List<String> addresses) {
}

private void registerService(ConsulServiceOptions consulServiceOptions) throws InterruptedException {
if (consulServiceOptions.addresses().isEmpty()) {
consulServiceOptions.addresses.add("");
}
CountDownLatch latch = new CountDownLatch(consulServiceOptions.addresses().size());
for (String address : consulServiceOptions.addresses()) {
client.registerService(
new ServiceOptions().setId("" + (consulId++)).setName(serviceName).setTags(tags)
.setAddress(address).setPort(port))
new ServiceOptions().setId("" + (consulId++)).setName(consulServiceOptions.serviceName())
.setTags(consulServiceOptions.tags())
.setAddress(address).setPort(consulServiceOptions.port()))
.onComplete(result -> {
if (result.failed()) {
fail("Failed to register service in Consul", result.cause());
Expand Down