Skip to content

Commit

Permalink
v3.0 with utility query support
Browse files Browse the repository at this point in the history
  • Loading branch information
Gary Arora committed Jan 22, 2019
1 parent 2024a8f commit a558602
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 75 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Athena-Express: Simplifying SQL queries on Amazon Athena
# New Document# Athena-Express: Simplifying SQL queries on Amazon Athena

[![NPM](https://nodei.co/npm/athena-express.png?compact=true)](https://nodei.co/npm/athena-express/)

Expand Down Expand Up @@ -298,6 +298,19 @@ exports.handler = async (event, context, callback) => {

<img src="https://image.ibb.co/fpARNA/carbon-2.png" alt="Athena-Express result" width="400">

## More Examples
###### UTILITY queries - Added in v3.0
```javascript
const results = await athenaExpress.query("SHOW TABLES");
console.log(results);
//Output:
{ Items:
[ { row: 'default' },
{ row: 'sampledb' } ] }
```


## Contributors

[Gary Arora](https://twitter.com/AroraGary)
Expand Down
200 changes: 127 additions & 73 deletions lib/athenaExpress.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ const COST_PER_MB = 0.000004768, //Based on $5/TB
BYTES_IN_MB = 1048576,
COST_FOR_10MB = COST_PER_MB * 10;

const readline = require("readline");

module.exports = class AthenaExpress {
constructor(init) {
validateConstructor(init);
this.config = {
athena: new init.aws.Athena({ apiVersion: "2017-05-18" }),
s3:
s3: new init.aws.S3({ apiVersion: "2006-03-01" }),
s3Bucket:
init.s3 ||
`s3://athena-express-${init.aws.config.credentials.accessKeyId
.substring(0, 10)
Expand All @@ -22,42 +25,43 @@ module.exports = class AthenaExpress {
}

async query(query) {
const config = this.config;
let results = {};

if (!this.config)
if (!config)
throw new TypeError("Config object not present in the constructor");

if (!query) throw new TypeError("SQL query is missing");

try {
const queryExecutionId = await startQueryExecution(
query,
this.config
);
const stats = await checkIfExecutionCompleted(
queryExecutionId,
this.config
);
const queryResults = await getQueryResults(
const queryExecutionId = await startQueryExecution(query, config);
const queryStatus = await checkIfExecutionCompleted(
queryExecutionId,
this.config
config
);

results.Items = this.config.formatJson
? cleanUpResults(queryResults)
: queryResults;
const s3Output =
queryStatus.QueryExecution.ResultConfiguration
.OutputLocation,
statementType = queryStatus.QueryExecution.StatementType;

results.Items = await getQueryResultsFromS3({
s3Output,
statementType,
config
});

if (this.config.getStats) {
if (config.getStats) {
const dataInMb = Math.round(
stats.QueryExecution.Statistics.DataScannedInBytes /
queryStatus.QueryExecution.Statistics.DataScannedInBytes /
BYTES_IN_MB
);

results.DataScannedInMB = dataInMb;
results.QueryCostInUSD =
dataInMb > 10 ? dataInMb * COST_PER_MB : COST_FOR_10MB;
results.EngineExecutionTimeInMillis =
stats.QueryExecution.Statistics.EngineExecutionTimeInMillis;
queryStatus.QueryExecution.Statistics.EngineExecutionTimeInMillis;
results.Count = results.Items.length;
}

Expand All @@ -68,25 +72,27 @@ module.exports = class AthenaExpress {
}
};

async function startQueryExecution(query, config) {
function startQueryExecution(query, config) {
const QueryString = query.sql || query;

const params = {
QueryString: query.sql || query,
QueryString,
ResultConfiguration: {
OutputLocation: config.s3
OutputLocation: config.s3Bucket
},
QueryExecutionContext: {
Database: query.db || config.db
}
};
return new Promise(function(resolve, reject) {
let startQueryExecutionRecursively = async function() {
const startQueryExecutionRecursively = async function() {
try {
let data = await config.athena
.startQueryExecution(params)
.promise();
resolve(data.QueryExecutionId);
} catch (err) {
commonAthenaErrors(err)
isCommonAthenaError(err.code)
? setTimeout(() => {
startQueryExecutionRecursively();
}, 2000)
Expand All @@ -97,10 +103,10 @@ async function startQueryExecution(query, config) {
});
}

async function checkIfExecutionCompleted(QueryExecutionId, config) {
function checkIfExecutionCompleted(QueryExecutionId, config) {
let retry = config.retry;
return new Promise(function(resolve, reject) {
let keepCheckingRecursively = async function() {
const keepCheckingRecursively = async function() {
try {
let data = await config.athena
.getQueryExecution({ QueryExecutionId })
Expand All @@ -116,7 +122,7 @@ async function checkIfExecutionCompleted(QueryExecutionId, config) {
}, retry);
}
} catch (err) {
if (commonAthenaErrors(err)) {
if (isCommonAthenaError(err.code)) {
retry = 2000;
setTimeout(() => {
keepCheckingRecursively();
Expand All @@ -128,50 +134,21 @@ async function checkIfExecutionCompleted(QueryExecutionId, config) {
});
}

async function getQueryResults(QueryExecutionId, config) {
return new Promise(function(resolve, reject) {
let gettingQueryResultsRecursively = async function() {
try {
let queryResults = await config.athena
.getQueryResults({ QueryExecutionId })
.promise();
resolve(queryResults.ResultSet.Rows);
} catch (err) {
commonAthenaErrors(err)
? setTimeout(() => {
gettingQueryResultsRecursively();
}, 2000)
: reject(err);
}
};
gettingQueryResultsRecursively();
});
}

function cleanUpResults(results) {
if (!results.length) return results;

let rowIterator = 1,
columnIterator = 0,
cleanedUpObject = {},
cleanedUpResults = [];

const rowCount = results.length,
fieldNames = results[0].Data,
columnCount = fieldNames.length;

for (; rowIterator < rowCount; rowIterator++) {
for (; columnIterator < columnCount; columnIterator++) {
cleanedUpObject[
Object.values(fieldNames[columnIterator])[0]
] = Object.values(results[rowIterator].Data[columnIterator])[0];
}
cleanedUpResults.push(cleanedUpObject);
cleanedUpObject = {};
columnIterator = 0;
function getQueryResultsFromS3(params) {
const s3Params = {
Bucket: params.s3Output.split("/")[2],
Key: params.s3Output.split("/")[3]
},
input = params.config.s3.getObject(s3Params).createReadStream(),
lineReader = readline.createInterface({ input });

if (params.config.formatJson) {
return params.statementType === "DML"
? cleanUpDML(lineReader)
: cleanUpNonDML(lineReader);
} else {
return getRawResultsFromS3(lineReader);
}

return cleanedUpResults;
}

function validateConstructor(init) {
Expand All @@ -188,10 +165,87 @@ function validateConstructor(init) {
}
}

function commonAthenaErrors(err) {
return err.code === "TooManyRequestsException" ||
err.code === "ThrottlingException" ||
err.code === "NetworkingError"
function isCommonAthenaError(err) {
return err === "TooManyRequestsException" ||
err === "ThrottlingException" ||
err === "NetworkingError" ||
err === "UnknownEndpoint"
? true
: false;
}

function isCommonS3Error(err) {
return err === "NetworkingError" ||
err === "StreamContentLengthMismatch" ||
err === "NoSuchKey"
? true
: false;
}

function getRawResultsFromS3(lineReader) {
let rawJson = [];
return new Promise(function(resolve, reject) {
lineReader
.on("line", line => {
rawJson.push(line.trim());
})
.on("close", function() {
resolve(rawJson);
});
z;
});
}

function cleanUpDML(lineReader) {
let headerList = [],
isFirstRecord = true,
cleanJson = [],
noOfColumns = 0,
singleJsonRow = {};

return new Promise(function(resolve, reject) {
lineReader
.on("line", line => {
line = line.substring(1, line.length - 1).split('","');

if (isFirstRecord) {
headerList = line;
isFirstRecord = false;
} else {
singleJsonRow = {};
noOfColumns = line.length;
for (let i = 0; i < noOfColumns; i++) {
singleJsonRow[[headerList[i]]] = line[i];
}
cleanJson.push(singleJsonRow);
}
})
.on("close", function() {
resolve(cleanJson);
});
});
}

function cleanUpNonDML(lineReader) {
let cleanJson = [];
return new Promise(function(resolve, reject) {
lineReader
.on("line", line => {
switch (true) {
case line.indexOf("\t") > 0:
line = line.split("\t");
cleanJson.push({
[line[0].trim()]: line[1].trim()
});
break;
default:
cleanJson.push({
row: line.trim()
});
}
})
.on("close", function() {
resolve(cleanJson);
});
});
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "athena-express",
"version": "2.0.6",
"version": "3.0.0",
"description": "Athena-Express makes it easier to execute SQL queries on Amazon Athena by consolidating & astracting several methods in the AWS SDK",
"main": "./lib/index.js",
"scripts": {
Expand Down

0 comments on commit a558602

Please sign in to comment.