diff --git a/docs/docs/service-discovery/consul.md b/docs/docs/service-discovery/consul.md index 68ee94d0..3c289fbc 100644 --- a/docs/docs/service-discovery/consul.md +++ b/docs/docs/service-discovery/consul.md @@ -5,6 +5,8 @@ It's often used as service discovery backend to register and locate the services Consul makes it simple for services to register themselves and to discover other services via a DNS or HTTP interface. External services can be registered as well. +As [specified](https://developer.hashicorp.com/consul/api-docs/agent/service#address) in the Consul documentation, if the host address is not provided, Stork will automatically use the Consul node address for the instance. + This page explains how Stork can use Consul to handle the service discovery and service registration. ## Dependency @@ -35,4 +37,5 @@ For each service that should get the service instances from Consul, configure th Consul service discovery is configured with the following parameters: ---8<-- "target/attributes/META-INF/stork-docs/consul-sd-attributes.txt" \ No newline at end of file +--8<-- "target/attributes/META-INF/stork-docs/consul-sd-attributes.txt" + diff --git a/pom.xml b/pom.xml index 2ebfa83f..1940d3d0 100644 --- a/pom.xml +++ b/pom.xml @@ -32,8 +32,7 @@ http://smallrye.io - 17 - 17 + 17 3.26.3 3.1 diff --git a/service-discovery/consul/src/main/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscovery.java b/service-discovery/consul/src/main/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscovery.java index 03e4a8be..b7761c43 100644 --- a/service-discovery/consul/src/main/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscovery.java +++ b/service-discovery/consul/src/main/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscovery.java @@ -74,8 +74,9 @@ private List toStorkServiceInstances(ServiceEntryList serviceEn Metadata consulMetadata = createConsulMetadata(serviceEntry); String address = service.getAddress(); int port = serviceEntry.getService().getPort(); - if (address == null) { - throw new IllegalArgumentException("Got null address for service " + serviceName); + if (address == null || address.isEmpty() || address.isBlank()) { + //If address not provided, the agent address should be used. See https://developer.hashicorp.com/consul/api-docs/agent/service#address + address = serviceEntry.getNode().getAddress(); } ServiceInstance matching = ServiceInstanceUtils.findMatching(previousInstances, address, port); if (matching != null) { diff --git a/service-discovery/consul/src/test/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscoveryTest.java b/service-discovery/consul/src/test/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscoveryTest.java index 1947fd14..6107959d 100644 --- a/service-discovery/consul/src/test/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscoveryTest.java +++ b/service-discovery/consul/src/test/java/io/smallrye/stork/servicediscovery/consul/ConsulServiceDiscoveryTest.java @@ -8,6 +8,7 @@ import static org.awaitility.Awaitility.await; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -68,7 +69,7 @@ void shouldNotFetchWhenRefreshPeriodNotReached() throws InterruptedException { null); stork = StorkTestUtils.getNewStorkInstance(); List tags = List.of("primary"); - registerService(serviceName, 8406, tags, "example.com"); + registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com"))); AtomicReference> instances = new AtomicReference<>(); @@ -84,7 +85,7 @@ void shouldNotFetchWhenRefreshPeriodNotReached() throws InterruptedException { deregisterServiceInstances(instances.get()); List sTags = List.of("secondary"); - registerService(serviceName, 8506, sTags, "another.example.com"); + registerService(new ConsulServiceOptions(serviceName, 8506, sTags, List.of("another.example.com"))); // when the consul service discovery is called before the end of refreshing period service.getServiceDiscovery().getServiceInstances() @@ -117,7 +118,7 @@ void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException { //Given a service `my-service` registered in consul List tags = List.of("primary"); Map metadata = Maps.newHashMap("meta", "metadata for my-service"); - registerService(serviceName, 8406, tags, "example.com"); + registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com"))); AtomicReference> instances = new AtomicReference<>(); @@ -138,7 +139,7 @@ void shouldRefetchWhenRefreshPeriodReached() throws InterruptedException { //the service settings change in consul List sTags = List.of("secondary"); - registerService(serviceName, 8506, sTags, "another.example.com"); + registerService(new ConsulServiceOptions(serviceName, 8506, sTags, List.of("another.example.com"))); // let's wait until the new services are populated to Stork (from Consul) await().atMost(Duration.ofSeconds(7)) @@ -173,8 +174,8 @@ void shouldDiscoverServiceWithSpecificName() throws InterruptedException { null); stork = StorkTestUtils.getNewStorkInstance(); //Given a service `my-service` registered in consul - registerService("my-consul-service", 8406, null, "consul.com"); - registerService("another-service", 8606, null, "another.example.com"); + registerService(new ConsulServiceOptions("my-consul-service", 8406, null, List.of("consul.com"))); + registerService(new ConsulServiceOptions("another-service", 8606, null, List.of("another.example.com"))); AtomicReference> instances = new AtomicReference<>(); @@ -203,8 +204,8 @@ void shouldHandleTheSecureAttribute() throws InterruptedException { null); stork = StorkTestUtils.getNewStorkInstance(); //Given a service `my-service` registered in consul - registerService("my-consul-service", 8406, null, "consul.com"); - registerService("another-service", 8606, null, "another.example.com"); + registerService(new ConsulServiceOptions("my-consul-service", 8406, null, List.of("consul.com"))); + registerService(new ConsulServiceOptions("another-service", 8606, null, List.of("another.example.com"))); AtomicReference> instances = new AtomicReference<>(); @@ -233,7 +234,7 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException { stork = StorkTestUtils.getNewStorkInstance(); //Given a service `my-service` registered in consul List tags = List.of("primary"); - registerService(serviceName, 8406, tags, "example.com"); + registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com"))); AtomicReference> instances = new AtomicReference<>(); @@ -256,7 +257,7 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException { deregisterServiceInstances(instances.get()); //the service settings change in consul - registerService(serviceName, 8406, tags, "example.com", "another.example.com"); + registerService(new ConsulServiceOptions(serviceName, 8406, tags, List.of("example.com", "another.example.com"))); // let's wait until the new services are populated to Stork (from Consul) await().atMost(Duration.ofSeconds(10)).until( @@ -285,13 +286,48 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException { .hasValueSatisfying(instance -> assertThat(instance.getId()).isNotEqualTo(serviceId)); } - private void registerService(String serviceName, int port, List tags, - String... addresses) throws InterruptedException { - CountDownLatch latch = new CountDownLatch(addresses.length); - for (String address : addresses) { + @Test + void shouldDiscoverServiceWithoutAddress() throws InterruptedException { + //Given a service `my-consul-service` registered in consul and a refresh-period of 5 seconds + String serviceName = "my-consul-service"; + TestConfigProvider.addServiceConfig("my-consul-service", null, "consul", null, + null, Map.of("consul-host", "localhost", "consul-port", String.valueOf(consulPort), "refresh-period", "5", + "application", "my-consul-service"), + null); + stork = StorkTestUtils.getNewStorkInstance(); + registerService(new ConsulServiceOptions("my-consul-service", 8406, null, new ArrayList<>())); + + AtomicReference> instances = new AtomicReference<>(); + + Service service = stork.getService(serviceName); + // call stork service discovery and gather service instances in the cache + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Consul", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + // Then the service instance is found + assertThat(instances.get()).hasSize(1); + assertThat(instances.get().get(0).getHost()).isEqualTo("127.0.0.1"); + assertThat(instances.get().get(0).getPort()).isEqualTo(8406); + assertThat(instances.get().get(0).isSecure()).isFalse(); + } + + public record ConsulServiceOptions(String serviceName, int port, List tags, List addresses) { + } + + private void registerService(ConsulServiceOptions consulServiceOptions) throws InterruptedException { + if (consulServiceOptions.addresses().isEmpty()) { + consulServiceOptions.addresses.add(""); + } + CountDownLatch latch = new CountDownLatch(consulServiceOptions.addresses().size()); + for (String address : consulServiceOptions.addresses()) { client.registerService( - new ServiceOptions().setId("" + (consulId++)).setName(serviceName).setTags(tags) - .setAddress(address).setPort(port)) + new ServiceOptions().setId("" + (consulId++)).setName(consulServiceOptions.serviceName()) + .setTags(consulServiceOptions.tags()) + .setAddress(address).setPort(consulServiceOptions.port())) .onComplete(result -> { if (result.failed()) { fail("Failed to register service in Consul", result.cause());