Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of AWS authentication using the Assume Role feature - … #20

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 117 additions & 2 deletions component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,109 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>${aws.java.sqs.sdk.version.range}</version>
</dependency>
<!-- Additional dependency for AWS Role Delegation -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
</dependency>

<!-- These are OSGi bundles that might be useful to avoid embedding them in dependant jars
The only ones that are required for this project are
- httpclient-osgi-4.5.12.jar
- httpcore-osgi-4.4.13.jar

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-osgi</artifactId>
<version>${aws-java-sdk-osgi.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.ion</groupId>
<artifactId>ion-java</artifactId>
<version>1.5.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient-osgi</artifactId>
<version>${apache.httpclient.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-osgi</artifactId>
<version>${apache.httpclient.core.version}</version>
<scope>provided</scope>
</dependency>
-->

<!--3rd party dependencies for sqs sdk-->
<dependency>
Expand Down Expand Up @@ -209,11 +310,12 @@
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Name>${project.artifactId}</Bundle-Name>
<!-- We need a recent version of org.apache.http.* so we add the version range -->
<Private-Package>
com.amazonaws.*,
com.fasterxml.jackson.core.*,
com.fasterxml.jackson.dataformat.cbor.*,
org.apache.http.*,
org.apache.http.*;version="[4.4.12,4.5.0)",
org.apache.commons.codec.*,
org.joda.time.*,
org.joda.convert.*,
Expand All @@ -226,7 +328,20 @@
<Export-Package>
io.siddhi.extension.io.sqs.*;version="${project.version}",
</Export-Package>
<!-- We need to import the latest org.apache.http.* packages -->
<Import-Package>
org.apache.http.auth.*;version="${apache.httpclient.version.range}",
org.apache.http.cookie.*;version="${apache.httpclient.version.range}",
org.apache.http.conn.*;version="${apache.httpclient.version.range}",
org.apache.http.client.*;version="${apache.httpclient.version.range}",
org.apache.http.entity.mime.*;version="${apache.httpclient.version.range}",
org.apache.http.entity.mime.content.*;version="${apache.httpclient.version.range}",
org.apache.http.impl.auth.*;version="${apache.httpclient.version.range}",
org.apache.http.impl.cookie.*;version="${apache.httpclient.version.range}",
org.apache.http.impl.conn.*;version="${apache.httpclient.version.range}",
org.apache.http.impl.execchain.*;version="${apache.httpclient.version.range}",
org.apache.http.impl.client.*;version="${apache.httpclient.version.range}",
org.apache.http.osgi.services.*;version="${apache.httpclient.version.range}",
com.amazonaws.*;version="${aws.java.sqs.sdk.version.range}",
io.siddhi.annotation.*;version="${siddhi.version.range}",
io.siddhi.core.*;version="${siddhi.version.range}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@

package io.siddhi.extension.io.sqs.api;

import java.util.Optional;

import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.util.transport.OptionHolder;
Expand All @@ -39,16 +48,32 @@
public class SQSBuilder {
private AmazonSQS amazonSQS;
private SQSConfig sqsConfig;

/** If there is the Role Arn as a system property, use it */
private static final String ENV_ROLE_ARN = "AWS_SQS_ROLE_ARN";
/** If there is the Role Session Name as a system property, use it */
private static final String ENV_ROLE_SESSION_NAME = "AWS_SQS_ROLE_SESSION_NAME";


public SQSBuilder(SQSConfig sqsConfig) {
this.sqsConfig = sqsConfig;
BasicAWSCredentials credentials = new BasicAWSCredentials(sqsConfig.getAccessKey(), sqsConfig.getSecretKey());

try {
this.amazonSQS = AmazonSQSClientBuilder.standard()
if (!sqsConfig.isDelegationEnabled()) {
//Standard auth
this.amazonSQS = AmazonSQSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.withRegion(sqsConfig.getRegion())
.build();
} else {
//Assume Role auth
EndpointConfiguration endpointConfiguration = new EndpointConfiguration("sqs."
+ sqsConfig.getRegion() + ".amazonaws.com", sqsConfig.getRegion());
this.amazonSQS = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpointConfiguration)
.withCredentials(this.getSTSAssumeRoleSessionCredentialsProvider()).build();
}
} catch (SdkClientException e) {
throw new SiddhiAppRuntimeException(
"Failed to create SQS client due to invalid configuration. " + e.getMessage(), e);
Expand All @@ -71,4 +96,59 @@ public SQSMessagePublisher buildSinkPublisher(OptionHolder optionHolder, boolean

return null;
}

/**
* Return credentials to be used for exchanging long term credentials with temporary ones
* @return
*/
private AWSCredentialsProvider getSTSAssumeRoleSessionCredentialsProvider() {
return new STSAssumeRoleSessionCredentialsProvider.Builder(
this.envRoleArn(),
this.envRoleSessionName()
).withStsClient(getAWSSecurityTokenService()).build();
}

/**
* Return an instance of STS configured with long term credentials
* @return
*/
private AWSSecurityTokenService getAWSSecurityTokenService() {

AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(envCredentials());

return AWSSecurityTokenServiceClientBuilder.standard()
.withRegion(this.sqsConfig.getRegion())
.withCredentials(credentialsProvider)
.build();
}

/**
* Retrieve the configured short-term credentials.
* TODO Offer the possibility of reading them from environment system properties,
* just like Role Arn and Role Session Name below
* @return Short Term Credentials
*/
private AWSCredentials envCredentials() {
return new BasicAWSCredentials(sqsConfig.getAccessKey(), sqsConfig.getSecretKey());
}

/**
* Retrieve the Role Arn as a system environment property.
* If not defined, use the configured one
* @return AWS Role Arn
*/
private String envRoleArn() {
Optional<String> roleArn = Optional.ofNullable(System.getenv(ENV_ROLE_ARN));
return roleArn.orElse(sqsConfig.getRoleArn());
}

/**
* Retrieve the Role Session Name as a system environment property.
* If not defined, use the configured one
* @return AWS Role Arn
*/
private String envRoleSessionName() {
Optional<String> roleSessionName = Optional.ofNullable(System.getenv(ENV_ROLE_SESSION_NAME));
return roleSessionName.orElse(sqsConfig.getRoleSessionName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
import io.siddhi.query.api.exception.SiddhiAppValidationException;

/**
* SQS Sink Extension
* SQS_ar (Assume Role) Sink Extension
*/

@Extension(
name = "sqs",
name = "sqs_ar",
namespace = "sink",
description = "SQS sink allows users to connect and publish messages to an AWS SQS Queue. It has the" +
" ability to only publish Text messages",
Expand Down Expand Up @@ -72,6 +72,27 @@
description = "Amazon Web Service Region",
type = DataType.STRING
),
@Parameter(
name = SQSConstants.ROLE_ARN_NAME,
description = "Amazon Web Service Role ARN for role delegation",
type = DataType.STRING,
optional = true,
defaultValue = "none"
),
@Parameter(
name = SQSConstants.ROLE_SESSION_NAME,
description = "Amazon Web Service Role Session Name for role delegation",
type = DataType.STRING,
optional = true,
defaultValue = "none"
),
@Parameter(
name = SQSConstants.USE_DELEGATION_NAME,
description = "Enable Role Delegation",
type = DataType.BOOL,
optional = true,
defaultValue = "" + SQSConstants.DEFAULT_USE_DELEGATION
),
@Parameter(
name = SQSConstants.MESSAGE_GROUP_ID_NAME,
description = "ID of the group that the message belong to(only applicable for FIFO Queues)",
Expand Down Expand Up @@ -100,10 +121,13 @@
},
examples = {
@Example(
syntax = "@sink(type='sqs'," +
syntax = "@sink(type='sqs_ar'," +
"queue='https://amazon.sqs.queue.url'," +
"access.key='aws.access.key'," +
"secret.key='aws.secret.key'," +
"use.delegation='true'," +
"role.arn='arn:aws:iam::123456789012:role/some-role-name'," +
"role.session.name='some-session-name'," +
"region='us-east-1'," +
"delay.interval='5'," +
"message.group.id='group-1',@map(type='xml') )" +
Expand All @@ -116,10 +140,13 @@
"queue using provided configurations and send the message to the queue.\n"
),
@Example(
syntax = "@sink(type='sqs'," +
syntax = "@sink(type='sqs_ar'," +
"queue='https://amazon.sqs.queue.fifo'," +
"access.key='aws.access.key'," +
"secret.key='aws.secret.key'," +
"use.delegation='true'," +
"role.arn='arn:aws:iam::123456789012:role/some-role-name'," +
"role.session.name='some-session-name'," +
"region='us-east-1'," +
"delay.interval='5'," +
"deduplication.id='{{deduplicationID}}'," +
Expand Down Expand Up @@ -204,6 +231,17 @@ protected StateFactory init(StreamDefinition streamDefinition, OptionHolder opti
throw new SiddhiAppValidationException("Access key and Secret key are mandatory parameters for" +
" the SQS client");
}

// START Customisation to support delegation (a.k.a. "Assume Role")
if (this.sinkConfig.getRoleArn() == null || sinkConfig.getRoleArn().isEmpty()) {
this.sinkConfig.setRoleArn(configReader.readConfig(SQSConstants.ROLE_ARN_NAME, null));
}

if (this.sinkConfig.getRoleSessionName() == null || sinkConfig.getRoleSessionName().isEmpty()) {
this.sinkConfig.setRoleSessionName(configReader.readConfig(SQSConstants.ROLE_SESSION_NAME, null));
}
// END Customisation to support delegation (a.k.a. "Assume Role")

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ public SQSSinkConfig(OptionHolder optionHolder) {
case SQSConstants.REGION_NAME:
super.setRegion(optionHolder.validateAndGetStaticValue(key));
break;
// START Customisation to support delegation (a.k.a. "Assume Role")
case SQSConstants.USE_DELEGATION_NAME:
super.setUseDelegation(getBooleanValue(optionHolder, key));
break;
case SQSConstants.ROLE_ARN_NAME:
super.setRoleArn(optionHolder.validateAndGetStaticValue(key));
break;
case SQSConstants.ROLE_SESSION_NAME:
super.setRoleSessionName(optionHolder.validateAndGetStaticValue(key));
break;
// END Customisation to support delegation (a.k.a. "Assume Role")
case SQSConstants.DELAY_INTERVAL_NAME:
this.setDelayIntervalTime(getIntegerOptionValue(optionHolder, key));
break;
Expand Down
Loading