Skip to content

Commit

Permalink
automatic recognition of address scheme in topic route by host.
Browse files Browse the repository at this point in the history
  • Loading branch information
dingshuangxi888 committed Nov 20, 2024
1 parent bf2f2a7 commit b44dd55
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ public static boolean isValidIp(String ip) {
return VALIDATOR.isValid(ip);
}

public static boolean isValidIPv4(String ip) {
return VALIDATOR.isValidInet4Address(ip);
}

public static boolean isValidIPv6(String ip) {
return VALIDATOR.isValidInet6Address(ip);
}

public static boolean isValidCidr(String cidr) {
return isValidIPv4Cidr(cidr) || isValidIPv6Cidr(cidr);
}
Expand Down
20 changes: 20 additions & 0 deletions proxy/src/main/java/org/apache/rocketmq/proxy/common/Address.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.net.HostAndPort;
import java.util.Objects;
import org.apache.rocketmq.common.utils.IPAddressUtils;

public class Address {

Expand All @@ -31,6 +32,11 @@ public enum AddressScheme {
private AddressScheme addressScheme;
private HostAndPort hostAndPort;

public Address(HostAndPort hostAndPort) {
this.addressScheme = buildScheme(hostAndPort);
this.hostAndPort = hostAndPort;
}

public Address(AddressScheme addressScheme, HostAndPort hostAndPort) {
this.addressScheme = addressScheme;
this.hostAndPort = hostAndPort;
Expand All @@ -52,6 +58,20 @@ public void setHostAndPort(HostAndPort hostAndPort) {
this.hostAndPort = hostAndPort;
}

private AddressScheme buildScheme(HostAndPort hostAndPort) {
if (hostAndPort == null) {
return AddressScheme.UNRECOGNIZED;
}
String address = hostAndPort.getHost();
if (IPAddressUtils.isValidIPv4(address)) {
return AddressScheme.IPv4;
}
if (IPAddressUtils.isValidIPv6(address)) {
return AddressScheme.IPv6;
}
return AddressScheme.DOMAIN_NAME;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCom
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
List<Address> addressList = new ArrayList<>();
// AddressScheme is just a placeholder and will not affect topic route result in this case.
addressList.add(new Address(Address.AddressScheme.IPv4, HostAndPort.fromParts(proxyConfig.getRemotingAccessAddr(), proxyConfig.getRemotingListenPort())));
addressList.add(new Address(HostAndPort.fromParts(proxyConfig.getRemotingAccessAddr(), proxyConfig.getRemotingListenPort())));
ProxyTopicRouteData proxyTopicRouteData = messagingProcessor.getTopicRouteDataForProxy(context, addressList, requestHeader.getTopic());
TopicRouteData topicRouteData = proxyTopicRouteData.buildTopicRouteData();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ProxyTopicRouteData(TopicRouteData topicRouteData) {
brokerData.getBrokerAddrs().forEach((brokerId, brokerAddr) -> {
HostAndPort brokerHostAndPort = HostAndPort.fromString(brokerAddr);

proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, brokerHostAndPort)));
proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(brokerHostAndPort)));
});
this.brokerDatas.add(proxyBrokerData);
}
Expand All @@ -61,7 +61,7 @@ public ProxyTopicRouteData(TopicRouteData topicRouteData, int port) {
HostAndPort brokerHostAndPort = HostAndPort.fromString(brokerAddr);
HostAndPort proxyHostAndPort = HostAndPort.fromParts(brokerHostAndPort.getHost(), port);

proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, proxyHostAndPort)));
proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(proxyHostAndPort)));
});
this.brokerDatas.add(proxyBrokerData);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.apache.rocketmq.proxy.common;

import com.google.common.net.HostAndPort;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class AddressTest {

@Test
public void testConstructorWithIPv4() {
HostAndPort hostAndPort = HostAndPort.fromString("192.168.1.1:8080");
Address address = new Address(hostAndPort);

assertEquals(Address.AddressScheme.IPv4, address.getAddressScheme());
assertEquals(hostAndPort, address.getHostAndPort());
}

@Test
public void testConstructorWithIPv6() {
HostAndPort hostAndPort = HostAndPort.fromString("[2001:db8::1]:8080");
Address address = new Address(hostAndPort);

assertEquals(Address.AddressScheme.IPv6, address.getAddressScheme());
assertEquals(hostAndPort, address.getHostAndPort());
}

@Test
public void testConstructorWithDomainName() {
HostAndPort hostAndPort = HostAndPort.fromString("example.com:8080");
Address address = new Address(hostAndPort);

assertEquals(Address.AddressScheme.DOMAIN_NAME, address.getAddressScheme());
assertEquals(hostAndPort, address.getHostAndPort());
}

@Test
public void testConstructorWithNullHostAndPort() {
Address address = new Address(null);

assertEquals(Address.AddressScheme.UNRECOGNIZED, address.getAddressScheme());
assertNull(address.getHostAndPort());
}
}

0 comments on commit b44dd55

Please sign in to comment.