Skip to content

Commit

Permalink
Merge pull request #959 from aureamunoz/empty-address
Browse files Browse the repository at this point in the history
Fix issue about empty consul instance host
  • Loading branch information
cescoffier authored Oct 17, 2024
2 parents a41f949 + 91d1fe2 commit 59540c9
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 21 deletions.
5 changes: 4 additions & 1 deletion docs/docs/service-discovery/consul.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ It's often used as service discovery backend to register and locate the services
Consul makes it simple for services to register themselves and to discover other services via a DNS or HTTP interface.
External services can be registered as well.

As [specified](https://developer.hashicorp.com/consul/api-docs/agent/service#address) in the Consul documentation, if the host address is not provided, Stork will automatically use the Consul node address for the instance.

This page explains how Stork can use Consul to handle the service discovery and service registration.

## Dependency
Expand Down Expand Up @@ -35,4 +37,5 @@ For each service that should get the service instances from Consul, configure th

Consul service discovery is configured with the following parameters:

--8<-- "target/attributes/META-INF/stork-docs/consul-sd-attributes.txt"
--8<-- "target/attributes/META-INF/stork-docs/consul-sd-attributes.txt"

3 changes: 1 addition & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
<url>http://smallrye.io</url>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.release>17</maven.compiler.release>

<version.assertj>3.26.3</version.assertj>
<version.microprofile-config-api>3.1</version.microprofile-config-api>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ private List<ServiceInstance> toStorkServiceInstances(ServiceEntryList serviceEn
Metadata<ConsulMetadataKey> consulMetadata = createConsulMetadata(serviceEntry);
String address = service.getAddress();
int port = serviceEntry.getService().getPort();
if (address == null) {
throw new IllegalArgumentException("Got null address for service " + serviceName);
if (address == null || address.isEmpty() || address.isBlank()) {
//If address not provided, the agent address should be used. See https://developer.hashicorp.com/consul/api-docs/agent/service#address
address = serviceEntry.getNode().getAddress();
}
ServiceInstance matching = ServiceInstanceUtils.findMatching(previousInstances, address, port);
if (matching != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -68,7 +69,7 @@ void shouldNotFetchWhenRefreshPeriodNotReached() throws InterruptedException {
null);
stork = StorkTestUtils.getNewStorkInstance();
List<String> tags = List.of("primary");
registerService(serviceName, 8406, tags, "example.com");
registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -84,7 +85,7 @@ void shouldNotFetchWhenRefreshPeriodNotReached() throws InterruptedException {
deregisterServiceInstances(instances.get());

List<String> sTags = List.of("secondary");
registerService(serviceName, 8506, sTags, "another.example.com");
registerService(new ConsulServiceOptions(serviceName, 8506, sTags, List.of("another.example.com")));

// when the consul service discovery is called before the end of refreshing period
service.getServiceDiscovery().getServiceInstances()
Expand Down Expand Up @@ -117,7 +118,7 @@ void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException {
//Given a service `my-service` registered in consul
List<String> tags = List.of("primary");
Map<String, String> metadata = Maps.newHashMap("meta", "metadata for my-service");
registerService(serviceName, 8406, tags, "example.com");
registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -138,7 +139,7 @@ void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException {

//the service settings change in consul
List<String> sTags = List.of("secondary");
registerService(serviceName, 8506, sTags, "another.example.com");
registerService(new ConsulServiceOptions(serviceName, 8506, sTags, List.of("another.example.com")));

// let's wait until the new services are populated to Stork (from Consul)
await().atMost(Duration.ofSeconds(7))
Expand Down Expand Up @@ -173,8 +174,8 @@ void shouldDiscoverServiceWithSpecificName() throws InterruptedException {
null);
stork = StorkTestUtils.getNewStorkInstance();
//Given a service `my-service` registered in consul
registerService("my-consul-service", 8406, null, "consul.com");
registerService("another-service", 8606, null, "another.example.com");
registerService(new ConsulServiceOptions("my-consul-service", 8406, null, List.of("consul.com")));
registerService(new ConsulServiceOptions("another-service", 8606, null, List.of("another.example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand Down Expand Up @@ -203,8 +204,8 @@ void shouldHandleTheSecureAttribute() throws InterruptedException {
null);
stork = StorkTestUtils.getNewStorkInstance();
//Given a service `my-service` registered in consul
registerService("my-consul-service", 8406, null, "consul.com");
registerService("another-service", 8606, null, "another.example.com");
registerService(new ConsulServiceOptions("my-consul-service", 8406, null, List.of("consul.com")));
registerService(new ConsulServiceOptions("another-service", 8606, null, List.of("another.example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand Down Expand Up @@ -233,7 +234,7 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException {
stork = StorkTestUtils.getNewStorkInstance();
//Given a service `my-service` registered in consul
List<String> tags = List.of("primary");
registerService(serviceName, 8406, tags, "example.com");
registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com")));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -256,7 +257,7 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException {
deregisterServiceInstances(instances.get());

//the service settings change in consul
registerService(serviceName, 8406, tags, "example.com", "another.example.com");
registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com", "another.example.com")));

// let's wait until the new services are populated to Stork (from Consul)
await().atMost(Duration.ofSeconds(10)).until(
Expand Down Expand Up @@ -285,13 +286,48 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException {
.hasValueSatisfying(instance -> assertThat(instance.getId()).isNotEqualTo(serviceId));
}

private void registerService(String serviceName, int port, List<String> tags,
String... addresses) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(addresses.length);
for (String address : addresses) {
@Test
void shouldDiscoverServiceWithoutAddress() throws InterruptedException {
//Given a service `my-consul-service` registered in consul and a refresh-period of 5 seconds
String serviceName = "my-consul-service";
TestConfigProvider.addServiceConfig("my-consul-service", null, "consul", null,
null, Map.of("consul-host", "localhost", "consul-port", String.valueOf(consulPort), "refresh-period", "5",
"application", "my-consul-service"),
null);
stork = StorkTestUtils.getNewStorkInstance();
registerService(new ConsulServiceOptions("my-consul-service", 8406, null, new ArrayList<>()));

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Service service = stork.getService(serviceName);
// call stork service discovery and gather service instances in the cache
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Consul", th))
.subscribe().with(instances::set);

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);

// Then the service instance is found
assertThat(instances.get()).hasSize(1);
assertThat(instances.get().get(0).getHost()).isEqualTo("127.0.0.1");
assertThat(instances.get().get(0).getPort()).isEqualTo(8406);
assertThat(instances.get().get(0).isSecure()).isFalse();
}

public record ConsulServiceOptions(String serviceName, int port, List<String> tags, List<String> addresses) {
}

private void registerService(ConsulServiceOptions consulServiceOptions) throws InterruptedException {
if (consulServiceOptions.addresses().isEmpty()) {
consulServiceOptions.addresses.add("");
}
CountDownLatch latch = new CountDownLatch(consulServiceOptions.addresses().size());
for (String address : consulServiceOptions.addresses()) {
client.registerService(
new ServiceOptions().setId("" + (consulId++)).setName(serviceName).setTags(tags)
.setAddress(address).setPort(port))
new ServiceOptions().setId("" + (consulId++)).setName(consulServiceOptions.serviceName())
.setTags(consulServiceOptions.tags())
.setAddress(address).setPort(consulServiceOptions.port()))
.onComplete(result -> {
if (result.failed()) {
fail("Failed to register service in Consul", result.cause());
Expand Down

0 comments on commit 59540c9

Please sign in to comment.