Access data stored in Amazon DynamoDB with Apache Hadoop, Apache Hive, and Apache Spark
This is a fork of awslabs/emr-dynamodb-connector that adds features specific to ScyllaDB Alternator and to the needs of the ScyllaDB Migrator.
- Add `DynamoDBConstants.CUSTOM_CLIENT_BUILDER_TRANSFORMER`, which allows users to pass, in the Hadoop job configuration, the name of a class that implements `DynamoDbClientBuilderTransformer` to customize the underlying DynamoDB client (a configuration sketch follows this list).
- Automatically filter out expired items of tables where TTL is enabled. To use it, set `DynamoDBConstants.TTL_ATTRIBUTE_NAME` in the job configuration to the TTL attribute name of the table.
- Change the strategy that splits Hadoop jobs into tasks (in class `DynamoDBInputFormat`) as follows:
  - Break down the data into chunks of 100 MB instead of 1 GB;
  - Set the number of task mappers to the number of scan segments by default;
  - Do not constrain the number of task mappers based on the estimated memory of the worker nodes.
- Add support for excluding a set of Scan segments.
- Load the AWS session token from the credentials provided by Spark.
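A minimal sketch, in Scala for a `spark-shell` session, of how the first two features above might be wired into a Hadoop job configuration. It assumes the `DynamoDBConstants` fields are plain string configuration keys; `com.example.AlternatorBuilderTransformer` and the `expiration` attribute name are hypothetical placeholders:

```scala
import org.apache.hadoop.dynamodb.DynamoDBConstants
import org.apache.hadoop.mapred.JobConf

// sc is the SparkContext provided by spark-shell
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.input.tableName", "myDynamoDBTable")

// Name of a class implementing DynamoDbClientBuilderTransformer, used to
// customize the underlying DynamoDB client (hypothetical class name):
jobConf.set(DynamoDBConstants.CUSTOM_CLIENT_BUILDER_TRANSFORMER,
  "com.example.AlternatorBuilderTransformer")

// Filter out already-expired items; "expiration" is a placeholder for the
// table's actual TTL attribute name:
jobConf.set(DynamoDBConstants.TTL_ATTRIBUTE_NAME, "expiration")
```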
The complete changelog can be viewed here: master...scylla-5.x.
You can use this connector to access data in Amazon DynamoDB using Apache Hadoop, Apache Hive, and Apache Spark in Amazon EMR. You can process data directly in DynamoDB using these frameworks, or join data in DynamoDB with data in Amazon S3, Amazon RDS, or other storage layers that can be accessed by Amazon EMR.
- Using Apache Hive in Amazon EMR with Amazon DynamoDB
- Accessing data in Amazon DynamoDB with Apache Spark
Currently, the connector supports the following data types:
| Hive type | Default DynamoDB type | Alternate DynamoDB type(s) |
|---|---|---|
| string | string (S) | |
| bigint or double | number (N) | |
| binary | binary (B) | |
| boolean | boolean (BOOL) | |
| array | list (L) | number set (NS), string set (SS), binary set (BS) |
| map<string,string> | item (ITEM) | map (M) |
| map<string,?> | map (M) | |
| struct | map (M) | |
The connector can serialize null values as DynamoDB null type (NULL).
For more information, see Hive Commands Examples for Exporting, Importing, and Querying Data in DynamoDB in the Amazon DynamoDB Developer Guide.
Implementations of the Apache Hadoop InputFormat and OutputFormat interfaces are included, which allow DynamoDB AttributeValues to be directly ingested by MapReduce jobs. For an example of how to use these classes, see Set Up a Hive Table to Run Hive Commands in the Amazon EMR Release Guide, as well as their usage in the Import/Export tool classes DynamoDBExport.java and DynamoDBImport.java.
This simple tool, which makes use of the InputFormat and OutputFormat implementations, provides an easy way to import data to and export data from DynamoDB.
Currently, the project builds against Hive 1.0.0, 1.2.1, and 2.3.0. Set these by using the `hive1.version`, `hive1.2.version`, and `hive2.version` properties in the root Maven `pom.xml`, respectively.

You need Java 8 and Maven. After cloning, run `mvn clean install`.
Syntax to create a table using the DynamoDBStorageHandler class:
```sql
CREATE EXTERNAL TABLE hive_tablename (
  hive_column1_name column1_datatype,
  hive_column2_name column2_datatype
)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES (
  "dynamodb.table.name" = "dynamodb_tablename",
  "dynamodb.column.mapping" =
    "hive_column1_name:dynamodb_attribute1_name,hive_column2_name:dynamodb_attribute2_name",
  "dynamodb.type.mapping" =
    "hive_column1_name:dynamodb_attribute1_type_abbreviation",
  "dynamodb.null.serialization" = "true"
);
```
`dynamodb.type.mapping` and `dynamodb.null.serialization` are optional parameters.
A Hive query will automatically use the most suitable secondary index, if one exists, based on the search condition. For an index to be eligible, it must have the following properties:

- All of its index keys appear in the Hive query's search condition;
- It contains all the DynamoDB attributes mentioned in `dynamodb.column.mapping`. (If you have to map more columns than index attributes in your Hive table, but still want to use an index when running queries that only select the attributes within that index, consider creating another Hive table and narrowing down its mappings to only include the index attributes. Use that table for reading the index attributes to reduce table scans.)
Using the DynamoDBInputFormat and DynamoDBOutputFormat classes with `spark-shell`:
```
$ spark-shell --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar
...
import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable

var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.input.tableName", "myDynamoDBTable")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

var orders = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
orders.count()
```
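The snippet above only reads from DynamoDB. As a follow-up sketch, the same pair RDD could be written back out through the DynamoDBOutputFormat that `jobConf` already selects; the `dynamodb.output.tableName` key and the target table name here are assumptions rather than something this README documents:

```scala
// Write the (Text, DynamoDBItemWritable) pairs back out using the
// OutputFormat configured via "mapred.output.format.class" above.
jobConf.set("dynamodb.output.tableName", "myOutputDynamoDBTable")
orders.saveAsHadoopDataset(jobConf)
```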
```
java -cp target/emr-dynamodb-tools-4.2.0-SNAPSHOT.jar org.apache.hadoop.dynamodb.tools.DynamoDBExport /where/output/should/go my-dynamo-table-name
java -cp target/emr-dynamodb-tools-4.2.0-SNAPSHOT.jar org.apache.hadoop.dynamodb.tools.DynamoDBImport /where/input/data/is my-dynamo-table-name
```

```
export <path> <table-name> [<read-ratio>] [<total-segment-count>]
  read-ratio: maximum percent of the specified DynamoDB table's read capacity to use for the export
  total-segment-count: number of desired MapReduce splits to use for the export

import <path> <table-name> [<write-ratio>]
  write-ratio: maximum percent of the specified DynamoDB table's write capacity to use for the import
```
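For example, an export that caps read consumption and fixes the segment count might look like the following; the values 0.5 (assuming the ratio is given as a fraction of capacity rather than a whole-number percentage) and 16 are purely illustrative:

```
java -cp target/emr-dynamodb-tools-4.2.0-SNAPSHOT.jar org.apache.hadoop.dynamodb.tools.DynamoDBExport /where/output/should/go my-dynamo-table-name 0.5 16
```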
To depend on the specific components in your projects, add one (or both) of the following to your `pom.xml`:
```xml
<dependency>
  <groupId>com.scylladb.alternator</groupId>
  <artifactId>emr-dynamodb-hadoop</artifactId>
  <version>5.5.0</version>
</dependency>

<dependency>
  <groupId>com.scylladb.alternator</groupId>
  <artifactId>emr-dynamodb-hive</artifactId>
  <version>5.5.0</version>
</dependency>
```
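If your project builds with sbt rather than Maven (as the ScyllaDB Migrator does), the equivalent coordinates would presumably be the following; this README only documents the Maven form, so treat this as a sketch:

```scala
// build.sbt
libraryDependencies += "com.scylladb.alternator" % "emr-dynamodb-hadoop" % "5.5.0"
libraryDependencies += "com.scylladb.alternator" % "emr-dynamodb-hive" % "5.5.0"
```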
- If you find a bug or would like to see an improvement, open an issue. Check first to make sure there isn't one already open. We'll do our best to respond to issues and review pull requests.
- Want to fix it yourself? Open a pull request! If adding new functionality, include new, passing unit tests, as well as documentation. Also include a snippet in your pull request showing that all current unit tests pass. Tests are run by default when invoking any Maven goal that results in the `package` goal being executed (`mvn clean install` will run them and produce output showing as much).
- Follow the Google Java Style Guide. Style is enforced at build time using the Apache Maven Checkstyle Plugin.
Our branch `scylla-5.x` is based off commit 07391dd0937bdbb20b86ec79444798df11b0711f. It contains backwards-compatible changes only. Breaking changes will have to go to a new branch, `scylla-6.x`.

We may occasionally merge the upstream changes into our fork.
- Create a new GitHub Release. Its associated tag should be the project's current version as set in the root `pom.xml` file, without the `-SNAPSHOT` suffix.
- Write or generate the release notes to document the bug fixes, new features, or breaking changes.
- Publish the release.
- A GitHub workflow will be automatically triggered. It will publish the artifacts to Sonatype.
- Manually bump the version in all the `pom.xml` files by running a command like `mvn versions:set -DnewVersion=5.6.7-SNAPSHOT`. The new version should be higher than the version you just released, and it should have the `-SNAPSHOT` suffix.
- Commit the new version.