diff --git a/pom.xml b/pom.xml index ff04ec9..db2f496 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,11 @@ gson 2.8.2 + + com.aliyun + aliyun-java-auth + 0.2.16-beta + diff --git a/src/main/java/com/aliyun/mq/http/MQClient.java b/src/main/java/com/aliyun/mq/http/MQClient.java index e42a0d6..72d2ae8 100755 --- a/src/main/java/com/aliyun/mq/http/MQClient.java +++ b/src/main/java/com/aliyun/mq/http/MQClient.java @@ -1,5 +1,8 @@ package com.aliyun.mq.http; +import com.aliyun.auth.credentials.Credential; +import com.aliyun.auth.credentials.provider.ICredentialProvider; +import com.aliyun.auth.credentials.provider.StaticCredentialProvider; import com.aliyun.mq.http.common.ClientException; import com.aliyun.mq.http.common.http.ClientConfiguration; import com.aliyun.mq.http.common.utils.Utils; @@ -25,7 +28,7 @@ public class MQClient { /** * user info */ - private ServiceCredentials credentials; + private ICredentialProvider credentialProvider; private ClientConfiguration config; private static volatile MQProducer PRODUCER; @@ -37,16 +40,31 @@ private static MQConsumer buildConsumer(String instanceId, String topicName, Str return new MQConsumer(instanceId, topicName, consumer, messageTag, client, credentials, endpoint); } + private static MQConsumer buildConsumer(String instanceId, String topicName, String consumer, String messageTag, ServiceClient client, + ICredentialProvider credentialProvider, URI endpoint) { + return new MQConsumer(instanceId, topicName, consumer, messageTag, client, credentialProvider, endpoint); + } + private static MQProducer buildProducer(String instanceId, String topicName, ServiceClient client, ServiceCredentials credentials, URI endpoint) { return new MQProducer(instanceId, topicName, client, credentials, endpoint); } + private static MQProducer buildProducer(String instanceId, String topicName, ServiceClient client, + ICredentialProvider credentialProvider, URI endpoint) { + return new MQProducer(instanceId, topicName, client, credentialProvider, endpoint); + } + private static MQTransProducer buildTransactionalProducer(String instanceId, String topicName, String groupId, ServiceClient client, ServiceCredentials credentials, URI endpoint) { return new MQTransProducer(instanceId, topicName, groupId, client, credentials, endpoint); } + private static MQTransProducer buildTransactionalProducer(String instanceId, String topicName, String groupId, ServiceClient client, + ICredentialProvider credentialProvider, URI endpoint) { + return new MQTransProducer(instanceId, topicName, groupId, client, credentialProvider, endpoint); + } + /** * init a MQ client with default client config * @@ -92,7 +110,22 @@ public MQClient(String accountEndpoint, String accessId, String accessKey, Strin * @param config defined client config */ public MQClient(String accountEndpoint, String accessId, String accessKey, String securityToken, ClientConfiguration config) { - this.credentials = new ServiceCredentials(accessId, accessKey, securityToken); + this(accountEndpoint, StaticCredentialProvider.create(Credential.builder() + .accessKeyId(accessId) + .accessKeySecret(accessKey) + .securityToken(securityToken) + .build()), config); + } + + /** + * init a MQ client with defined client config and customized credential provider + * + * @param accountEndpoint mq http endpoint, like: http://xxx.mqreset.cn-hangzhou.aliyuncs.com + * @param credentialProvider credentials provisioning + * @param config defined client config + */ + public MQClient(String accountEndpoint, ICredentialProvider credentialProvider, ClientConfiguration config) { + this.credentialProvider = credentialProvider; this.endpoint = Utils.getHttpURI(accountEndpoint); if (config == null) { this.config = new ClientConfiguration(); @@ -131,7 +164,7 @@ public MQProducer getProducer(String topicName) { if (null == PRODUCER) { synchronized (MQClient.class) { if (null == PRODUCER) { - PRODUCER = buildProducer(null, topicName, this.serviceClient, this.credentials, this.endpoint); + PRODUCER = buildProducer(null, topicName, this.serviceClient, this.credentialProvider, this.endpoint); } } } @@ -149,7 +182,7 @@ public MQProducer getProducer(String instanceId, String topicName) { if (null == PRODUCER) { synchronized (MQClient.class) { if (null == PRODUCER) { - PRODUCER = buildProducer(instanceId, topicName, this.serviceClient, this.credentials, this.endpoint); + PRODUCER = buildProducer(instanceId, topicName, this.serviceClient, this.credentialProvider, this.endpoint); } } } @@ -167,7 +200,7 @@ public MQTransProducer getTransProducer(String topicName, String groupId) { if (null == TRANSACTIONAL_PRODUCER) { synchronized (MQClient.class) { if (null == TRANSACTIONAL_PRODUCER) { - TRANSACTIONAL_PRODUCER = buildTransactionalProducer(null, topicName, groupId, this.serviceClient, this.credentials, this.endpoint); + TRANSACTIONAL_PRODUCER = buildTransactionalProducer(null, topicName, groupId, this.serviceClient, this.credentialProvider, this.endpoint); } } } @@ -186,7 +219,7 @@ public MQTransProducer getTransProducer(String instanceId, String topicName, Str if (null == TRANSACTIONAL_PRODUCER) { synchronized (MQClient.class) { if (null == TRANSACTIONAL_PRODUCER) { - TRANSACTIONAL_PRODUCER = buildTransactionalProducer(instanceId, topicName, groupId, this.serviceClient, this.credentials, this.endpoint); + TRANSACTIONAL_PRODUCER = buildTransactionalProducer(instanceId, topicName, groupId, this.serviceClient, this.credentialProvider, this.endpoint); } } } @@ -205,7 +238,7 @@ public MQConsumer getConsumer(String topicName, String consumer, String messageT if (null == CONSUMER) { synchronized (MQClient.class) { if (null == CONSUMER) { - CONSUMER = buildConsumer(null, topicName, consumer, messageTag, this.serviceClient, this.credentials, this.endpoint); + CONSUMER = buildConsumer(null, topicName, consumer, messageTag, this.serviceClient, this.credentialProvider, this.endpoint); } } } @@ -223,7 +256,7 @@ public MQConsumer getConsumer(String topicName, String consumer) { if (null == CONSUMER) { synchronized (MQClient.class) { if (null == CONSUMER) { - CONSUMER = buildConsumer(null, topicName, consumer, null, this.serviceClient, this.credentials, this.endpoint); + CONSUMER = buildConsumer(null, topicName, consumer, null, this.serviceClient, this.credentialProvider, this.endpoint); } } } @@ -243,7 +276,7 @@ public MQConsumer getConsumer(String instanceId, String topicName, String consum if (null == CONSUMER) { synchronized (MQClient.class) { if (null == CONSUMER) { - CONSUMER = buildConsumer(instanceId, topicName, consumer, messageTag, this.serviceClient, this.credentials, this.endpoint); + CONSUMER = buildConsumer(instanceId, topicName, consumer, messageTag, this.serviceClient, this.credentialProvider, this.endpoint); } } } @@ -254,7 +287,7 @@ public MQConsumer getConsumer(String instanceId, String topicName, String consum public String toString() { final StringBuilder sb = new StringBuilder("MQClient{"); sb.append("endpoint=").append(endpoint); - sb.append(", credentials=").append(credentials); + sb.append(", credentialProvider=").append(credentialProvider); sb.append('}'); return sb.toString(); } diff --git a/src/main/java/com/aliyun/mq/http/MQConsumer.java b/src/main/java/com/aliyun/mq/http/MQConsumer.java index a59692d..49d1bff 100644 --- a/src/main/java/com/aliyun/mq/http/MQConsumer.java +++ b/src/main/java/com/aliyun/mq/http/MQConsumer.java @@ -1,5 +1,9 @@ package com.aliyun.mq.http; +import com.aliyun.auth.credentials.Credential; +import com.aliyun.auth.credentials.provider.ICredentialProvider; +import com.aliyun.auth.credentials.provider.StaticCredentialProvider; +import com.aliyun.mq.http.common.utils.ServiceCredentialsWrapper; import com.aliyun.mq.http.model.AsyncCallback; import com.aliyun.mq.http.model.AsyncResult; import com.aliyun.mq.http.common.ClientException; @@ -36,7 +40,7 @@ public class MQConsumer { /** * object content user auth info */ - private final ServiceCredentials credentials; + private final ICredentialProvider credentialProvider; /** * user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/ */ @@ -57,9 +61,27 @@ public class MQConsumer { */ protected MQConsumer(String instanceId, String topicName, String consumer, String messageTag, ServiceClient client, ServiceCredentials credentials, URI endpoint) { + this(instanceId, topicName, consumer, messageTag, client, StaticCredentialProvider.create(Credential.builder() + .accessKeyId(credentials.getAccessKeyId()) + .accessKeySecret(credentials.getAccessKeySecret()) + .securityToken(credentials.getSecurityToken()) + .build()), endpoint); + } + + /** + * @param instanceId, instance id + * @param topicName, topic name + * @param consumer mq cid + * @param messageTag message tag for filter + * @param client, ServiceClient object + * @param credentialProvider, ICredentialProvider object + * @param endpoint, user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/ + */ + protected MQConsumer(String instanceId, String topicName, String consumer, String messageTag, ServiceClient client, + ICredentialProvider credentialProvider, URI endpoint) { this.instanceId = instanceId; this.serviceClient = client; - this.credentials = credentials; + this.credentialProvider = credentialProvider; this.endpoint = endpoint; String uri = endpoint.toString(); @@ -139,6 +161,7 @@ public List consumeMessage(int num, int pollingSecond) request.setInstanceId(this.instanceId); try { + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint); request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES); return action.executeWithCustomHeaders(request, null); @@ -179,6 +202,7 @@ public List consumeMessageOrderly(int num, int pollingSecond) request.setTrans("order"); try { + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint); request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES); return action.executeWithCustomHeaders(request, null); @@ -211,6 +235,7 @@ public AsyncResult> asyncConsumeMessage(int num, int pollingSecond request.setWaitSeconds(pollingSecond); request.setInstanceId(this.instanceId); + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint); request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES); return action.executeWithCustomHeaders(request, callback, null); @@ -225,6 +250,7 @@ public AsyncResult> asyncConsumeMessage(int num, int pollingSecond * @throws ClientException Exception from client */ public void ackMessage(List receiptHandles) throws ServiceException, ClientException { + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); AckMessageAction action = new AckMessageAction(serviceClient, credentials, endpoint); AckMessageRequest request = new AckMessageRequest(); @@ -248,6 +274,7 @@ public void ackMessage(List receiptHandles) throws ServiceException, Cli */ public AsyncResult asyncAckMessage(List receiptHandles, AsyncCallback callback) throws ServiceException, ClientException { + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); AckMessageAction action = new AckMessageAction(serviceClient, credentials, endpoint); AckMessageRequest request = new AckMessageRequest(); diff --git a/src/main/java/com/aliyun/mq/http/MQProducer.java b/src/main/java/com/aliyun/mq/http/MQProducer.java index 8b884c0..4d420ba 100644 --- a/src/main/java/com/aliyun/mq/http/MQProducer.java +++ b/src/main/java/com/aliyun/mq/http/MQProducer.java @@ -1,5 +1,9 @@ package com.aliyun.mq.http; +import com.aliyun.auth.credentials.Credential; +import com.aliyun.auth.credentials.provider.ICredentialProvider; +import com.aliyun.auth.credentials.provider.StaticCredentialProvider; +import com.aliyun.mq.http.common.utils.ServiceCredentialsWrapper; import com.aliyun.mq.http.model.AsyncCallback; import com.aliyun.mq.http.model.AsyncResult; import com.aliyun.mq.http.model.action.PublishMessageAction; @@ -23,7 +27,7 @@ public class MQProducer { /** * object content user auth info */ - protected ServiceCredentials credentials; + protected ICredentialProvider credentialProvider; /** * user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/ */ @@ -42,9 +46,26 @@ public class MQProducer { */ protected MQProducer(String instanceId, String topicName, ServiceClient client, ServiceCredentials credentials, URI endpoint) { + this(instanceId, topicName, client, + StaticCredentialProvider.create(Credential.builder() + .accessKeyId(credentials.getAccessKeyId()) + .accessKeySecret(credentials.getAccessKeySecret()) + .securityToken(credentials.getSecurityToken()) + .build()), endpoint); + } + + /** + * @param instanceId, instance id + * @param topicName, topic name + * @param client, ServiceClient object + * @param credentialProvider, ICredentialProvider object + * @param endpoint, user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/ + */ + protected MQProducer(String instanceId, String topicName, ServiceClient client, + ICredentialProvider credentialProvider, URI endpoint) { this.instanceId = instanceId; this.serviceClient = client; - this.credentials = credentials; + this.credentialProvider = credentialProvider; this.endpoint = endpoint; String uri = endpoint.toString(); @@ -105,6 +126,7 @@ public TopicMessage publishMessage(TopicMessage msg) throws ServiceException, Cl PublishMessageRequest request = new PublishMessageRequest(); request.setMessage(msg); request.setInstanceId(instanceId); + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); PublishMessageAction action = new PublishMessageAction(serviceClient, credentials, endpoint); request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES); return action.executeWithCustomHeaders(request, null); @@ -128,6 +150,7 @@ public AsyncResult asyncPublishMessage(TopicMessage msg, AsyncCall PublishMessageRequest request = new PublishMessageRequest(); request.setMessage(msg); request.setInstanceId(instanceId); + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); PublishMessageAction action = new PublishMessageAction(serviceClient, credentials, endpoint); request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES); return action.executeWithCustomHeaders(request, callback, null); diff --git a/src/main/java/com/aliyun/mq/http/MQTransProducer.java b/src/main/java/com/aliyun/mq/http/MQTransProducer.java index 28ae54b..5c593b9 100644 --- a/src/main/java/com/aliyun/mq/http/MQTransProducer.java +++ b/src/main/java/com/aliyun/mq/http/MQTransProducer.java @@ -1,10 +1,12 @@ package com.aliyun.mq.http; +import com.aliyun.auth.credentials.provider.ICredentialProvider; import com.aliyun.mq.http.common.ClientException; import com.aliyun.mq.http.common.Constants; import com.aliyun.mq.http.common.ServiceException; import com.aliyun.mq.http.common.auth.ServiceCredentials; import com.aliyun.mq.http.common.http.ServiceClient; +import com.aliyun.mq.http.common.utils.ServiceCredentialsWrapper; import com.aliyun.mq.http.model.AsyncCallback; import com.aliyun.mq.http.model.AsyncResult; import com.aliyun.mq.http.model.Message; @@ -36,6 +38,19 @@ protected MQTransProducer(String instanceId, String topicName, String groupId, S this.groupId = groupId; } + /** + * @param instanceId, instance id + * @param topicName, topic name + * @param client, ServiceClient object + * @param credentialProvider, ICredentialProvider object + * @param endpoint, user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/ + */ + protected MQTransProducer(String instanceId, String topicName, String groupId, ServiceClient client, + ICredentialProvider credentialProvider, URI endpoint) { + super(instanceId, topicName, client, credentialProvider, endpoint); + this.groupId = groupId; + } + /** * consume half message to check transaction status, three choice: {@link #commit(String)} , {@link #rollback(String)} * or do nothing (after 10s will get the message again). @@ -58,6 +73,7 @@ public List consumeHalfMessage(int num, int pollingSecond) throws Servi request.setTrans(Constants.PARAM_TRANSACTION_V_POP); try { + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint); request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES); return action.executeWithCustomHeaders(request, null); @@ -90,6 +106,7 @@ public AsyncResult> asyncConsumeHalfMessage(int num, int pollingSe request.setConsumer(groupId); request.setTrans(Constants.PARAM_TRANSACTION_V_POP); + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint); request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES); return action.executeWithCustomHeaders(request, callback, null); @@ -103,6 +120,7 @@ public AsyncResult> asyncConsumeHalfMessage(int num, int pollingSe * @throws ClientException Exception from client */ public void commit(String handle) throws ServiceException, ClientException { + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); AckMessageAction action = new AckMessageAction(serviceClient, credentials, endpoint); AckMessageRequest request = new AckMessageRequest(); request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES); @@ -122,6 +140,7 @@ public void commit(String handle) throws ServiceException, ClientException { * @throws ClientException Exception from client */ public void rollback(String handle) throws ServiceException, ClientException { + ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials()); AckMessageAction action = new AckMessageAction(serviceClient, credentials, endpoint); AckMessageRequest request = new AckMessageRequest(); request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES); diff --git a/src/main/java/com/aliyun/mq/http/common/utils/ServiceCredentialsWrapper.java b/src/main/java/com/aliyun/mq/http/common/utils/ServiceCredentialsWrapper.java new file mode 100644 index 0000000..f51a7c3 --- /dev/null +++ b/src/main/java/com/aliyun/mq/http/common/utils/ServiceCredentialsWrapper.java @@ -0,0 +1,10 @@ +package com.aliyun.mq.http.common.utils; + +import com.aliyun.auth.credentials.ICredential; +import com.aliyun.mq.http.common.auth.ServiceCredentials; + +public class ServiceCredentialsWrapper { + public static ServiceCredentials WrapServiceCredentials(ICredential credential) { + return new ServiceCredentials(credential.accessKeyId(), credential.accessKeySecret(), credential.securityToken()); + } +}