diff --git a/pom.xml b/pom.xml
index ff04ec9..db2f496 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,11 @@
gson
2.8.2
+
+ com.aliyun
+ aliyun-java-auth
+ 0.2.16-beta
+
diff --git a/src/main/java/com/aliyun/mq/http/MQClient.java b/src/main/java/com/aliyun/mq/http/MQClient.java
index e42a0d6..72d2ae8 100755
--- a/src/main/java/com/aliyun/mq/http/MQClient.java
+++ b/src/main/java/com/aliyun/mq/http/MQClient.java
@@ -1,5 +1,8 @@
package com.aliyun.mq.http;
+import com.aliyun.auth.credentials.Credential;
+import com.aliyun.auth.credentials.provider.ICredentialProvider;
+import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
import com.aliyun.mq.http.common.ClientException;
import com.aliyun.mq.http.common.http.ClientConfiguration;
import com.aliyun.mq.http.common.utils.Utils;
@@ -25,7 +28,7 @@ public class MQClient {
/**
* user info
*/
- private ServiceCredentials credentials;
+ private ICredentialProvider credentialProvider;
private ClientConfiguration config;
private static volatile MQProducer PRODUCER;
@@ -37,16 +40,31 @@ private static MQConsumer buildConsumer(String instanceId, String topicName, Str
return new MQConsumer(instanceId, topicName, consumer, messageTag, client, credentials, endpoint);
}
+ private static MQConsumer buildConsumer(String instanceId, String topicName, String consumer, String messageTag, ServiceClient client,
+ ICredentialProvider credentialProvider, URI endpoint) {
+ return new MQConsumer(instanceId, topicName, consumer, messageTag, client, credentialProvider, endpoint);
+ }
+
private static MQProducer buildProducer(String instanceId, String topicName, ServiceClient client,
ServiceCredentials credentials, URI endpoint) {
return new MQProducer(instanceId, topicName, client, credentials, endpoint);
}
+ private static MQProducer buildProducer(String instanceId, String topicName, ServiceClient client,
+ ICredentialProvider credentialProvider, URI endpoint) {
+ return new MQProducer(instanceId, topicName, client, credentialProvider, endpoint);
+ }
+
private static MQTransProducer buildTransactionalProducer(String instanceId, String topicName, String groupId, ServiceClient client,
ServiceCredentials credentials, URI endpoint) {
return new MQTransProducer(instanceId, topicName, groupId, client, credentials, endpoint);
}
+ private static MQTransProducer buildTransactionalProducer(String instanceId, String topicName, String groupId, ServiceClient client,
+ ICredentialProvider credentialProvider, URI endpoint) {
+ return new MQTransProducer(instanceId, topicName, groupId, client, credentialProvider, endpoint);
+ }
+
/**
* init a MQ client with default client config
*
@@ -92,7 +110,22 @@ public MQClient(String accountEndpoint, String accessId, String accessKey, Strin
* @param config defined client config
*/
public MQClient(String accountEndpoint, String accessId, String accessKey, String securityToken, ClientConfiguration config) {
- this.credentials = new ServiceCredentials(accessId, accessKey, securityToken);
+ this(accountEndpoint, StaticCredentialProvider.create(Credential.builder()
+ .accessKeyId(accessId)
+ .accessKeySecret(accessKey)
+ .securityToken(securityToken)
+ .build()), config);
+ }
+
+ /**
+ * init a MQ client with defined client config and customized credential provider
+ *
+ * @param accountEndpoint mq http endpoint, like: http://xxx.mqreset.cn-hangzhou.aliyuncs.com
+ * @param credentialProvider credentials provisioning
+ * @param config defined client config
+ */
+ public MQClient(String accountEndpoint, ICredentialProvider credentialProvider, ClientConfiguration config) {
+ this.credentialProvider = credentialProvider;
this.endpoint = Utils.getHttpURI(accountEndpoint);
if (config == null) {
this.config = new ClientConfiguration();
@@ -131,7 +164,7 @@ public MQProducer getProducer(String topicName) {
if (null == PRODUCER) {
synchronized (MQClient.class) {
if (null == PRODUCER) {
- PRODUCER = buildProducer(null, topicName, this.serviceClient, this.credentials, this.endpoint);
+ PRODUCER = buildProducer(null, topicName, this.serviceClient, this.credentialProvider, this.endpoint);
}
}
}
@@ -149,7 +182,7 @@ public MQProducer getProducer(String instanceId, String topicName) {
if (null == PRODUCER) {
synchronized (MQClient.class) {
if (null == PRODUCER) {
- PRODUCER = buildProducer(instanceId, topicName, this.serviceClient, this.credentials, this.endpoint);
+ PRODUCER = buildProducer(instanceId, topicName, this.serviceClient, this.credentialProvider, this.endpoint);
}
}
}
@@ -167,7 +200,7 @@ public MQTransProducer getTransProducer(String topicName, String groupId) {
if (null == TRANSACTIONAL_PRODUCER) {
synchronized (MQClient.class) {
if (null == TRANSACTIONAL_PRODUCER) {
- TRANSACTIONAL_PRODUCER = buildTransactionalProducer(null, topicName, groupId, this.serviceClient, this.credentials, this.endpoint);
+ TRANSACTIONAL_PRODUCER = buildTransactionalProducer(null, topicName, groupId, this.serviceClient, this.credentialProvider, this.endpoint);
}
}
}
@@ -186,7 +219,7 @@ public MQTransProducer getTransProducer(String instanceId, String topicName, Str
if (null == TRANSACTIONAL_PRODUCER) {
synchronized (MQClient.class) {
if (null == TRANSACTIONAL_PRODUCER) {
- TRANSACTIONAL_PRODUCER = buildTransactionalProducer(instanceId, topicName, groupId, this.serviceClient, this.credentials, this.endpoint);
+ TRANSACTIONAL_PRODUCER = buildTransactionalProducer(instanceId, topicName, groupId, this.serviceClient, this.credentialProvider, this.endpoint);
}
}
}
@@ -205,7 +238,7 @@ public MQConsumer getConsumer(String topicName, String consumer, String messageT
if (null == CONSUMER) {
synchronized (MQClient.class) {
if (null == CONSUMER) {
- CONSUMER = buildConsumer(null, topicName, consumer, messageTag, this.serviceClient, this.credentials, this.endpoint);
+ CONSUMER = buildConsumer(null, topicName, consumer, messageTag, this.serviceClient, this.credentialProvider, this.endpoint);
}
}
}
@@ -223,7 +256,7 @@ public MQConsumer getConsumer(String topicName, String consumer) {
if (null == CONSUMER) {
synchronized (MQClient.class) {
if (null == CONSUMER) {
- CONSUMER = buildConsumer(null, topicName, consumer, null, this.serviceClient, this.credentials, this.endpoint);
+ CONSUMER = buildConsumer(null, topicName, consumer, null, this.serviceClient, this.credentialProvider, this.endpoint);
}
}
}
@@ -243,7 +276,7 @@ public MQConsumer getConsumer(String instanceId, String topicName, String consum
if (null == CONSUMER) {
synchronized (MQClient.class) {
if (null == CONSUMER) {
- CONSUMER = buildConsumer(instanceId, topicName, consumer, messageTag, this.serviceClient, this.credentials, this.endpoint);
+ CONSUMER = buildConsumer(instanceId, topicName, consumer, messageTag, this.serviceClient, this.credentialProvider, this.endpoint);
}
}
}
@@ -254,7 +287,7 @@ public MQConsumer getConsumer(String instanceId, String topicName, String consum
public String toString() {
final StringBuilder sb = new StringBuilder("MQClient{");
sb.append("endpoint=").append(endpoint);
- sb.append(", credentials=").append(credentials);
+ sb.append(", credentialProvider=").append(credentialProvider);
sb.append('}');
return sb.toString();
}
diff --git a/src/main/java/com/aliyun/mq/http/MQConsumer.java b/src/main/java/com/aliyun/mq/http/MQConsumer.java
index a59692d..49d1bff 100644
--- a/src/main/java/com/aliyun/mq/http/MQConsumer.java
+++ b/src/main/java/com/aliyun/mq/http/MQConsumer.java
@@ -1,5 +1,9 @@
package com.aliyun.mq.http;
+import com.aliyun.auth.credentials.Credential;
+import com.aliyun.auth.credentials.provider.ICredentialProvider;
+import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
+import com.aliyun.mq.http.common.utils.ServiceCredentialsWrapper;
import com.aliyun.mq.http.model.AsyncCallback;
import com.aliyun.mq.http.model.AsyncResult;
import com.aliyun.mq.http.common.ClientException;
@@ -36,7 +40,7 @@ public class MQConsumer {
/**
* object content user auth info
*/
- private final ServiceCredentials credentials;
+ private final ICredentialProvider credentialProvider;
/**
* user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/
*/
@@ -57,9 +61,27 @@ public class MQConsumer {
*/
protected MQConsumer(String instanceId, String topicName, String consumer, String messageTag, ServiceClient client,
ServiceCredentials credentials, URI endpoint) {
+ this(instanceId, topicName, consumer, messageTag, client, StaticCredentialProvider.create(Credential.builder()
+ .accessKeyId(credentials.getAccessKeyId())
+ .accessKeySecret(credentials.getAccessKeySecret())
+ .securityToken(credentials.getSecurityToken())
+ .build()), endpoint);
+ }
+
+ /**
+ * @param instanceId, instance id
+ * @param topicName, topic name
+ * @param consumer mq cid
+ * @param messageTag message tag for filter
+ * @param client, ServiceClient object
+ * @param credentialProvider, ICredentialProvider object
+ * @param endpoint, user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/
+ */
+ protected MQConsumer(String instanceId, String topicName, String consumer, String messageTag, ServiceClient client,
+ ICredentialProvider credentialProvider, URI endpoint) {
this.instanceId = instanceId;
this.serviceClient = client;
- this.credentials = credentials;
+ this.credentialProvider = credentialProvider;
this.endpoint = endpoint;
String uri = endpoint.toString();
@@ -139,6 +161,7 @@ public List consumeMessage(int num, int pollingSecond)
request.setInstanceId(this.instanceId);
try {
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint);
request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES);
return action.executeWithCustomHeaders(request, null);
@@ -179,6 +202,7 @@ public List consumeMessageOrderly(int num, int pollingSecond)
request.setTrans("order");
try {
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint);
request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES);
return action.executeWithCustomHeaders(request, null);
@@ -211,6 +235,7 @@ public AsyncResult> asyncConsumeMessage(int num, int pollingSecond
request.setWaitSeconds(pollingSecond);
request.setInstanceId(this.instanceId);
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint);
request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES);
return action.executeWithCustomHeaders(request, callback, null);
@@ -225,6 +250,7 @@ public AsyncResult> asyncConsumeMessage(int num, int pollingSecond
* @throws ClientException Exception from client
*/
public void ackMessage(List receiptHandles) throws ServiceException, ClientException {
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
AckMessageAction action = new AckMessageAction(serviceClient, credentials, endpoint);
AckMessageRequest request = new AckMessageRequest();
@@ -248,6 +274,7 @@ public void ackMessage(List receiptHandles) throws ServiceException, Cli
*/
public AsyncResult asyncAckMessage(List receiptHandles, AsyncCallback callback)
throws ServiceException, ClientException {
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
AckMessageAction action = new AckMessageAction(serviceClient, credentials, endpoint);
AckMessageRequest request = new AckMessageRequest();
diff --git a/src/main/java/com/aliyun/mq/http/MQProducer.java b/src/main/java/com/aliyun/mq/http/MQProducer.java
index 8b884c0..4d420ba 100644
--- a/src/main/java/com/aliyun/mq/http/MQProducer.java
+++ b/src/main/java/com/aliyun/mq/http/MQProducer.java
@@ -1,5 +1,9 @@
package com.aliyun.mq.http;
+import com.aliyun.auth.credentials.Credential;
+import com.aliyun.auth.credentials.provider.ICredentialProvider;
+import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
+import com.aliyun.mq.http.common.utils.ServiceCredentialsWrapper;
import com.aliyun.mq.http.model.AsyncCallback;
import com.aliyun.mq.http.model.AsyncResult;
import com.aliyun.mq.http.model.action.PublishMessageAction;
@@ -23,7 +27,7 @@ public class MQProducer {
/**
* object content user auth info
*/
- protected ServiceCredentials credentials;
+ protected ICredentialProvider credentialProvider;
/**
* user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/
*/
@@ -42,9 +46,26 @@ public class MQProducer {
*/
protected MQProducer(String instanceId, String topicName, ServiceClient client,
ServiceCredentials credentials, URI endpoint) {
+ this(instanceId, topicName, client,
+ StaticCredentialProvider.create(Credential.builder()
+ .accessKeyId(credentials.getAccessKeyId())
+ .accessKeySecret(credentials.getAccessKeySecret())
+ .securityToken(credentials.getSecurityToken())
+ .build()), endpoint);
+ }
+
+ /**
+ * @param instanceId, instance id
+ * @param topicName, topic name
+ * @param client, ServiceClient object
+ * @param credentialProvider, ICredentialProvider object
+ * @param endpoint, user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/
+ */
+ protected MQProducer(String instanceId, String topicName, ServiceClient client,
+ ICredentialProvider credentialProvider, URI endpoint) {
this.instanceId = instanceId;
this.serviceClient = client;
- this.credentials = credentials;
+ this.credentialProvider = credentialProvider;
this.endpoint = endpoint;
String uri = endpoint.toString();
@@ -105,6 +126,7 @@ public TopicMessage publishMessage(TopicMessage msg) throws ServiceException, Cl
PublishMessageRequest request = new PublishMessageRequest();
request.setMessage(msg);
request.setInstanceId(instanceId);
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
PublishMessageAction action = new PublishMessageAction(serviceClient, credentials, endpoint);
request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES);
return action.executeWithCustomHeaders(request, null);
@@ -128,6 +150,7 @@ public AsyncResult asyncPublishMessage(TopicMessage msg, AsyncCall
PublishMessageRequest request = new PublishMessageRequest();
request.setMessage(msg);
request.setInstanceId(instanceId);
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
PublishMessageAction action = new PublishMessageAction(serviceClient, credentials, endpoint);
request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES);
return action.executeWithCustomHeaders(request, callback, null);
diff --git a/src/main/java/com/aliyun/mq/http/MQTransProducer.java b/src/main/java/com/aliyun/mq/http/MQTransProducer.java
index 28ae54b..5c593b9 100644
--- a/src/main/java/com/aliyun/mq/http/MQTransProducer.java
+++ b/src/main/java/com/aliyun/mq/http/MQTransProducer.java
@@ -1,10 +1,12 @@
package com.aliyun.mq.http;
+import com.aliyun.auth.credentials.provider.ICredentialProvider;
import com.aliyun.mq.http.common.ClientException;
import com.aliyun.mq.http.common.Constants;
import com.aliyun.mq.http.common.ServiceException;
import com.aliyun.mq.http.common.auth.ServiceCredentials;
import com.aliyun.mq.http.common.http.ServiceClient;
+import com.aliyun.mq.http.common.utils.ServiceCredentialsWrapper;
import com.aliyun.mq.http.model.AsyncCallback;
import com.aliyun.mq.http.model.AsyncResult;
import com.aliyun.mq.http.model.Message;
@@ -36,6 +38,19 @@ protected MQTransProducer(String instanceId, String topicName, String groupId, S
this.groupId = groupId;
}
+ /**
+ * @param instanceId, instance id
+ * @param topicName, topic name
+ * @param client, ServiceClient object
+ * @param credentialProvider, ICredentialProvider object
+ * @param endpoint, user mq http endpoint, ie: http://uid.mqrest.region.aliyuncs.com/
+ */
+ protected MQTransProducer(String instanceId, String topicName, String groupId, ServiceClient client,
+ ICredentialProvider credentialProvider, URI endpoint) {
+ super(instanceId, topicName, client, credentialProvider, endpoint);
+ this.groupId = groupId;
+ }
+
/**
* consume half message to check transaction status, three choice: {@link #commit(String)} , {@link #rollback(String)}
* or do nothing (after 10s will get the message again).
@@ -58,6 +73,7 @@ public List consumeHalfMessage(int num, int pollingSecond) throws Servi
request.setTrans(Constants.PARAM_TRANSACTION_V_POP);
try {
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint);
request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES);
return action.executeWithCustomHeaders(request, null);
@@ -90,6 +106,7 @@ public AsyncResult> asyncConsumeHalfMessage(int num, int pollingSe
request.setConsumer(groupId);
request.setTrans(Constants.PARAM_TRANSACTION_V_POP);
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
ConsumeMessageAction action = new ConsumeMessageAction(serviceClient, credentials, endpoint);
request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES);
return action.executeWithCustomHeaders(request, callback, null);
@@ -103,6 +120,7 @@ public AsyncResult> asyncConsumeHalfMessage(int num, int pollingSe
* @throws ClientException Exception from client
*/
public void commit(String handle) throws ServiceException, ClientException {
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
AckMessageAction action = new AckMessageAction(serviceClient, credentials, endpoint);
AckMessageRequest request = new AckMessageRequest();
request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES);
@@ -122,6 +140,7 @@ public void commit(String handle) throws ServiceException, ClientException {
* @throws ClientException Exception from client
*/
public void rollback(String handle) throws ServiceException, ClientException {
+ ServiceCredentials credentials = ServiceCredentialsWrapper.WrapServiceCredentials(credentialProvider.getCredentials());
AckMessageAction action = new AckMessageAction(serviceClient, credentials, endpoint);
AckMessageRequest request = new AckMessageRequest();
request.setRequestPath(topicURL + "/" + Constants.LOCATION_MESSAGES);
diff --git a/src/main/java/com/aliyun/mq/http/common/utils/ServiceCredentialsWrapper.java b/src/main/java/com/aliyun/mq/http/common/utils/ServiceCredentialsWrapper.java
new file mode 100644
index 0000000..f51a7c3
--- /dev/null
+++ b/src/main/java/com/aliyun/mq/http/common/utils/ServiceCredentialsWrapper.java
@@ -0,0 +1,10 @@
+package com.aliyun.mq.http.common.utils;
+
+import com.aliyun.auth.credentials.ICredential;
+import com.aliyun.mq.http.common.auth.ServiceCredentials;
+
+public class ServiceCredentialsWrapper {
+ public static ServiceCredentials WrapServiceCredentials(ICredential credential) {
+ return new ServiceCredentials(credential.accessKeyId(), credential.accessKeySecret(), credential.securityToken());
+ }
+}