Skip to content

Commit

Permalink
Extend type support for dates and timestamps
Browse files Browse the repository at this point in the history
Optionally (defaults to false) cast date, timestamp, and timestamp with
time zone data types to JS Date objects. The query method doesn't expose
the data type, forcing developers who want to operate with Date objects to
track which columns are dates and perform the cast themselves.

Dates and timestamps are cast to JS Date objects using the Luxon library.
The code uses Luxon's `DateTime.fromSQL` method to parse SQL date formats,
consistent with Presto/Trino documentation.

Because the code uses Luxon, it requires full ICU support to work.
Full ICU is available by default starting with Node v13, but needs to be
configured for earlier versions of Node.
  • Loading branch information
Serafin Sedano authored and ssedano committed Aug 18, 2021
1 parent 084133a commit 8633c7e
Show file tree
Hide file tree
Showing 6 changed files with 1,260 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ const athenaExpress = new AthenaExpress(athenaExpressConfig);
|skipResults | boolean | `false` | For a unique requirement where a user may only want to execute the query in Athena and store the results in S3 but NOT fetch those results in that moment. <br />Perhaps to be retrieved later or simply stored in S3 for auditing/logging purposes. <br />To retrieve the results, you can simply pass the `QueryExecutionId` into athena-express as such: `athenaExpress.query("ab493e66-138f-4b78-a187-51f43fd5f0eb")` |
|waitForResults | boolean | `true` | When low latency is the objective, you can skip waiting for a query to be completed in Athena. Returns `QueryExecutionId`, which you can pass into athena-express later as such: `athenaExpress.query("ab493e66-138f-4b78-a187-51f43fd5f0eb")` <br /> Not to be confused with `skipResults`, which actually waits for the query to be completed before returning `QueryExecutionId` and other stats. `waitForResults` is meant for fire-and-forget kind of operations. <br /> |
|catalog | string | `null` | The catalog to which the query results belong |
|useUtcDates | boolean | `false` | Cast Date and Timestamp types to JavaScript Date objects with the timezone set to UTC. Set this option if your code runs on machines configured with UTC as their timezone, or if you want to work with Date objects in the UTC timezone. Useful when your Date and Timestamp columns represent UTC dates and times and you want to work with Date objects instead of String objects. |



Expand Down
1 change: 1 addition & 0 deletions lib/athenaExpress.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ module.exports = class AthenaExpress {
QueryExecutionId: null,
pagination: Number(init.pagination) || 0,
NextToken: init.nextToken || null,
useUtcDates: init.useUtcDates !== false,
};
}

Expand Down
39 changes: 34 additions & 5 deletions lib/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//helpers.js

const readline = require("readline"),
csv = require("csvtojson");
csv = require("csvtojson"),
{ DateTime } = require("luxon");

let s3Metadata = null;

Expand Down Expand Up @@ -119,7 +120,7 @@ async function getQueryResultsFromS3(params) {
const input = params.config.s3.getObject(s3Params).createReadStream();
if (params.config.formatJson) {
return {
items: await cleanUpDML(input, params.config.ignoreEmpty),
items: await cleanUpDML(input, params.config),
};
} else {
return { items: await getRawResultsFromS3(input) };
Expand Down Expand Up @@ -182,8 +183,9 @@ function getDataTypes() {
});
}

async function cleanUpDML(input, ignoreEmpty) {
async function cleanUpDML(input, config) {
let cleanJson = [];
const ignoreEmpty = config.ignoreEmpty;
const dataTypes = await getDataTypes();
return new Promise(function (resolve) {
input.pipe(
Expand All @@ -194,7 +196,8 @@ async function cleanUpDML(input, ignoreEmpty) {
cleanJson.push(
addDataType(
JSON.parse(data.toString("utf8")),
dataTypes
dataTypes,
config.useUtcDates
)
);
})
Expand All @@ -205,7 +208,7 @@ async function cleanUpDML(input, ignoreEmpty) {
});
}

function addDataType(input, dataTypes) {
function addDataType(input, dataTypes, useUtcDates) {
let updatedObjectWithDataType = {};

for (const key in input) {
Expand All @@ -226,6 +229,18 @@ function addDataType(input, dataTypes) {
case "double":
updatedObjectWithDataType[key] = Number(input[key]);
break;
case "date":
case "timestamp":
case "timestamp with time zone":
// Avoid excessive object creation by reusing the cast
var inputDate;
if (useUtcDates &&
(inputDate = DateTime.fromSQL(input[key], { zone: 'UTC' })) && !inputDate.invalid) {
updatedObjectWithDataType[key] = inputDate.toJSDate();
} else {
updatedObjectWithDataType[key] = input[key];
}
break;
default:
updatedObjectWithDataType[key] = input[key];
}
Expand Down Expand Up @@ -266,6 +281,9 @@ function validateConstructor(init) {
if (!init)
throw new TypeError("Config object not present in the constructor");

if (init.useUtcDates && !icuSupport(process))
throw new TypeError("Error in configuration: useUtcDates setting requires full ICU support");

try {
let aws = init.s3 ? init.s3 : init.aws.config.credentials.accessKeyId;
let athena = new init.aws.Athena({
Expand All @@ -287,6 +305,17 @@ function isCommonAthenaError(err) {
: false;
}

/**
 * Reports whether the given process-like object appears to have ICU support.
 *
 * A Node major version of 13 or higher is accepted outright. For anything
 * else (older versions, or a version string that cannot be parsed) the
 * result is whether `proc.versions` has its own `icu` entry.
 *
 * @param {object} proc - Process-like object; expected to carry
 *   `versions.node` and, optionally, `versions.icu`.
 * @returns {boolean} `true` when ICU support is assumed to be present,
 *   `false` otherwise (including when `proc` or its Node version is missing).
 */
function icuSupport(proc) {
  const versions = proc && proc.versions;
  if (!versions || !versions.node) {
    return false;
  }

  // exec() yields null for an unexpected version format; guard against it
  // instead of dereferencing the result blindly.
  const match = /^(\d+)\./.exec(String(versions.node));
  if (match && Number.parseInt(match[1], 10) >= 13) {
    return true;
  }

  // NOTE(review): an `icu` entry may also be present on builds that ship only
  // a reduced ICU data set - confirm this is a sufficient check for full ICU.
  // Called via Object.prototype so objects without a prototype also work.
  return Object.prototype.hasOwnProperty.call(versions, "icu");
}

const lowerCaseKeys = (obj) =>
Object.keys(obj).reduce((acc, key) => {
acc[key.toLowerCase()] = obj[key];
Expand Down
Loading

0 comments on commit 8633c7e

Please sign in to comment.