Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend type support for dates and timestamps #67

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ const athenaExpress = new AthenaExpress(athenaExpressConfig);
|skipResults | boolean | `false` | For a unique requirement where a user may only want to execute the query in Athena and store the results in S3 but NOT fetch those results in that moment. <br />Perhaps to be retrieved later or simply stored in S3 for auditing/logging purposes. <br />To retrieve the results, you can simply pass the `QueryExecutionId` into athena-express as such: `athenaExpress.query("ab493e66-138f-4b78-a187-51f43fd5f0eb")` |
|waitForResults | boolean | `true` | When low latency is the objective, you can skip waiting for a query to be completed in Athena. Returns `QueryExecutionId`, which you can pass into athena-express later as such: `athenaExpress.query("ab493e66-138f-4b78-a187-51f43fd5f0eb")` <br /> Not to be confused with `skipResults`, which actually waits for the query to be completed before returning `QueryExecutionId` and other stats. `waitForResults` is meant for fire-and-forget kind of operations. <br /> |
|catalog | string | `null` | The catalog to which the query results belong |
|useUtcDates | boolean | `false` | Casts Date and Timestamp values to JavaScript `Date` objects in the UTC timezone. Set this option if your code runs on machines configured with UTC as their timezone, or if you want to work with `Date` objects in UTC. Useful when your Date and Timestamp columns represent UTC dates and times and you want to work with `Date` objects instead of strings. |



Expand Down
1 change: 1 addition & 0 deletions lib/athenaExpress.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ module.exports = class AthenaExpress {
QueryExecutionId: null,
pagination: Number(init.pagination) || 0,
NextToken: init.nextToken || null,
useUtcDates: init.useUtcDates !== false,
};
}

Expand Down
39 changes: 34 additions & 5 deletions lib/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//helpers.js

const readline = require("readline"),
csv = require("csvtojson");
csv = require("csvtojson"),
{ DateTime } = require("luxon");

let s3Metadata = null;

Expand Down Expand Up @@ -119,7 +120,7 @@ async function getQueryResultsFromS3(params) {
const input = params.config.s3.getObject(s3Params).createReadStream();
if (params.config.formatJson) {
return {
items: await cleanUpDML(input, params.config.ignoreEmpty),
items: await cleanUpDML(input, params.config),
};
} else {
return { items: await getRawResultsFromS3(input) };
Expand Down Expand Up @@ -182,8 +183,9 @@ function getDataTypes() {
});
}

async function cleanUpDML(input, ignoreEmpty) {
async function cleanUpDML(input, config) {
let cleanJson = [];
const ignoreEmpty = config.ignoreEmpty;
const dataTypes = await getDataTypes();
return new Promise(function (resolve) {
input.pipe(
Expand All @@ -194,7 +196,8 @@ async function cleanUpDML(input, ignoreEmpty) {
cleanJson.push(
addDataType(
JSON.parse(data.toString("utf8")),
dataTypes
dataTypes,
config.useUtcDates
)
);
})
Expand All @@ -205,7 +208,7 @@ async function cleanUpDML(input, ignoreEmpty) {
});
}

function addDataType(input, dataTypes) {
function addDataType(input, dataTypes, useUtcDates) {
let updatedObjectWithDataType = {};

for (const key in input) {
Expand All @@ -226,6 +229,18 @@ function addDataType(input, dataTypes) {
case "double":
updatedObjectWithDataType[key] = Number(input[key]);
break;
case "date":
case "timestamp":
case "timestamp with time zone":
// Avoid excessive object creation by reusing the cast
var inputDate;
if (useUtcDates &&
(inputDate = DateTime.fromSQL(input[key], { zone: 'UTC' })) && !inputDate.invalid) {
updatedObjectWithDataType[key] = inputDate.toJSDate();
} else {
updatedObjectWithDataType[key] = input[key];
}
break;
default:
updatedObjectWithDataType[key] = input[key];
}
Expand Down Expand Up @@ -266,6 +281,9 @@ function validateConstructor(init) {
if (!init)
throw new TypeError("Config object not present in the constructor");

if (init.useUtcDates && !icuSupport(process))
throw new TypeError("Error in configuration: useUtcDates setting requires full ICU support");

try {
let aws = init.s3 ? init.s3 : init.aws.config.credentials.accessKeyId;
let athena = new init.aws.Athena({
Expand All @@ -287,6 +305,17 @@ function isCommonAthenaError(err) {
: false;
}

/**
 * Reports whether the given process description indicates ICU support.
 *
 * A Node.js major version of 13 or higher is accepted outright; for older
 * (or unparseable) versions the result depends on whether the `versions`
 * object carries its own `icu` entry.
 *
 * @param {object} proc - A `process`-like object exposing `versions.node`
 *   and, optionally, `versions.icu`.
 * @returns {boolean} `true` when ICU support is assumed or detected,
 *   `false` otherwise (including when `proc` carries no version info).
 */
function icuSupport(proc) {
  const versions = proc && proc.versions;
  const nodeVersion = versions && versions['node'];
  if (!nodeVersion) {
    // No usable version information: report "unsupported" explicitly
    // instead of falling through with an implicit undefined.
    return false;
  }

  // Guard the regex result: a version string without a leading "N." would
  // otherwise blow up on `null[1]`.
  const match = /^(\d+)\./.exec(String(nodeVersion));
  const majorVersion = match ? Number.parseInt(match[1], 10) : NaN;
  if (majorVersion >= 13) {
    return true;
  }

  // Call through Object.prototype so prototype-less version maps also work.
  return Object.prototype.hasOwnProperty.call(versions, 'icu');
}

const lowerCaseKeys = (obj) =>
Object.keys(obj).reduce((acc, key) => {
acc[key.toLowerCase()] = obj[key];
Expand Down
Loading