add citi bike data application #7
@@ -1,2 +1 @@
# samples-snowflake
Examples & use-cases around the LocalStack Extension for Snowflake
#
@@ -0,0 +1,5 @@
.DS_Store
package_modules
node_modules
.snowflake_demo_key
snowflake.log
@@ -0,0 +1,42 @@
VENV_BIN = python3 -m venv
VENV_DIR ?= .venv
VENV_ACTIVATE = $(VENV_DIR)/bin/activate
VENV_RUN = . $(VENV_ACTIVATE)

usage:         ## Shows usage for this Makefile
	@cat Makefile | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}'

start:
	DOCKER_FLAGS='-e DNS_NAME_PATTERNS_TO_RESOLVE_UPSTREAM=demo-citibike-data.s3.amazonaws.com -e SF_LOG=trace -e SF_CSV_IMPORT_MAX_ROWS=20000' \
	IMAGE_NAME=localstack/snowflake \
	DEBUG=1 \
	localstack start;

stop:
	localstack stop

install:
	npm install
	echo "Installed the Node.js dependencies"

web-start:     ## Start the application
	npm start
	echo "Open the Web application in your browser: http://localhost:3000"

seed:          ## Seed test data into LocalStack Snowflake
	# Use these seeds for this quickstart: https://quickstarts.snowflake.com/guide/getting_started_with_snowflake
	# snow sql -c local --query "create stage citibike_trips url='s3://snowflake-workshop-lab/citibike-trips/'"
	# snow sql -c local --query 'create or replace table trips(tripduration integer,starttime timestamp,stoptime timestamp,start_station_id integer,start_station_name string,start_station_latitude float,start_station_longitude float,end_station_id integer,end_station_name string,end_station_latitude float,end_station_longitude float,bikeid integer,membership_type string,usertype string,birth_year integer,gender integer);'
	# snow sql -c local --query 'copy into trips from @citibike_trips file_format=csv PATTERN = '"'"'.*csv.*'"'"

	# see: https://quickstarts.snowflake.com/guide/data_app/index.html#3
	snow sql -c local --query "create stage demo_data url='s3://demo-citibike-data'"
	# create 'trips' table and import data from public CSV files
	snow sql -c local --query 'create or replace table trips(tripduration integer,starttime timestamp,stoptime timestamp,start_station_id integer,end_station_id integer,bikeid integer,usertype string,birth_year integer,gender integer);'
	snow sql -c local --query 'copy into trips from @demo_data file_format=(type=csv skip_header=1) PATTERN = '"'"'trips__0_0_0.*csv.*'"'"
	# create 'weather' table and import data from public CSV files
	snow sql -c local --query 'create or replace table weather(STATE TEXT,OBSERVATION_DATE DATE,DAY_OF_YEAR NUMBER,TEMP_MIN_F NUMBER,TEMP_MAX_F NUMBER,TEMP_AVG_F NUMBER,TEMP_MIN_C FLOAT,TEMP_MAX_C FLOAT,TEMP_AVG_C FLOAT,TOT_PRECIP_IN NUMBER,TOT_SNOWFALL_IN NUMBER,TOT_SNOWDEPTH_IN NUMBER,TOT_PRECIP_MM NUMBER,TOT_SNOWFALL_MM NUMBER,TOT_SNOWDEPTH_MM NUMBER);'
	snow sql -c local --query 'copy into weather from @demo_data file_format=(type=csv skip_header=1) PATTERN = '"'"'weather__0_2_0.*csv.*'"'"

.PHONY: usage venv install deploy seed
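Taken together, the targets above cover the whole workflow. Assuming GNU make and the prerequisites listed in the README below, a typical run would be:

```bash
make start      # launch the LocalStack Snowflake emulator
make install    # install the Node.js dependencies
make seed       # create the trips/weather tables and load the demo CSV data
make web-start  # serve the web app on http://localhost:3000
```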
@@ -0,0 +1,82 @@
# Running Data Application workloads in LocalStack Snowflake

Note: This sample application has been copied and adapted from its original version here: https://github.com/Snowflake-Labs/sfguide-data-apps-demo

## Overview

The demo features a single-page web application (powered by Node.js) that displays several charts. The chart data comes from Citibike data stored in Snowflake. The web server backend connects to the LocalStack Snowflake emulator running locally via the Node.js Snowflake SDK.

## Prerequisites

- [`localstack` CLI](https://docs.localstack.cloud/getting-started/installation/#localstack-cli) with the [`LOCALSTACK_AUTH_TOKEN`](https://docs.localstack.cloud/getting-started/auth-token/) environment variable set
- [LocalStack Snowflake emulator](https://snowflake.localstack.cloud/getting-started/installation/)
- [Snowflake CLI](https://github.com/snowflakedb/snowflake-cli) with a `local` profile configured (more details below)
- Node.js & `npm` installed

## Instructions

### Start LocalStack

Start the LocalStack Snowflake emulator using the following command:

```bash
DOCKER_FLAGS='-e DNS_NAME_PATTERNS_TO_RESOLVE_UPSTREAM=demo-citibike-data.s3.amazonaws.com -e SF_LOG=trace -e SF_CSV_IMPORT_MAX_ROWS=20000' \
IMAGE_NAME=localstack/snowflake \
DEBUG=1 \
localstack start
```

In the above command, we set the `DNS_NAME_PATTERNS_TO_RESOLVE_UPSTREAM` environment variable to resolve the `demo-citibike-data.s3.amazonaws.com` domain against the real S3 service, so that the Citibike data can be downloaded. We also set `SF_CSV_IMPORT_MAX_ROWS` to `20000` to limit the number of rows imported from the CSV files (for testing purposes).
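If you want to confirm that the emulator came up before continuing, one option (assuming a reasonably recent `localstack` CLI) is to check the container status:

```bash
# print the status of the LocalStack container and its services
localstack status services
```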

### Install the dependencies

Run the following command to install the dependencies:

```bash
npm install
```

### Configure Snowflake CLI

To seed the Citibike data into Snowflake, you need to configure the Snowflake CLI with a `local` profile. You can use the following command to create a new profile:

> Review comment: nit: I've noticed that we're using the profile name (no strong preference, but maybe we could unify the docs around
```bash
snow connection add \
  --connection-name local \
  --user test \
  --password test \
  --account test \
  --host snowflake.localhost.localstack.cloud
```

To test the connection, you can run the following command:

```bash
snow connection test --connection local
```

### Seed the Citibike data

Run the following commands to seed the Citibike data into Snowflake:

```bash
snow sql -c local --query "create stage demo_data url='s3://demo-citibike-data'"
snow sql -c local --query 'create or replace table trips(tripduration integer,starttime timestamp,stoptime timestamp,start_station_id integer,end_station_id integer,bikeid integer,usertype string,birth_year integer,gender integer);'
snow sql -c local --query 'copy into trips from @demo_data file_format=(type=csv skip_header=1) PATTERN = '"'"'trips__0_0_0.*csv.*'"'"
snow sql -c local --query 'create or replace table weather(STATE TEXT,OBSERVATION_DATE DATE,DAY_OF_YEAR NUMBER,TEMP_MIN_F NUMBER,TEMP_MAX_F NUMBER,TEMP_AVG_F NUMBER,TEMP_MIN_C FLOAT,TEMP_MAX_C FLOAT,TEMP_AVG_C FLOAT,TOT_PRECIP_IN NUMBER,TOT_SNOWFALL_IN NUMBER,TOT_SNOWDEPTH_IN NUMBER,TOT_PRECIP_MM NUMBER,TOT_SNOWFALL_MM NUMBER,TOT_SNOWDEPTH_MM NUMBER);'
snow sql -c local --query 'copy into weather from @demo_data file_format=(type=csv skip_header=1) PATTERN = '"'"'weather__0_2_0.*csv.*'"'"
```
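As a quick sanity check (these queries are not part of the sample itself), you can count the imported rows:

```bash
snow sql -c local --query 'select count(*) from trips;'
snow sql -c local --query 'select count(*) from weather;'
```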

### Start the web server

Start the web server using the following command:

```bash
npm start
```

You can now access the web application at http://localhost:3000.
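The charts are backed by three JSON endpoints defined in the Express app shown further down in this diff. As a quick command-line check, you can call them directly; the `random=true` flag is handled by the randomizer middleware:

```bash
curl "http://localhost:3000/trips/monthly?random=true"
curl "http://localhost:3000/trips/day_of_week?random=true"
curl "http://localhost:3000/trips/temperature?random=true"
```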

## License

This sample application is published under the Apache 2.0 License (see [`LICENSE`](../LICENSE) file).
@@ -0,0 +1,41 @@ | ||
const http = require('http'); | ||
const express = require('express'); | ||
const path = require('path'); | ||
const morgan = require('morgan'); | ||
const timeout = require('connect-timeout') | ||
const {PORT = 3000} = process.env | ||
|
||
// create express app, set up json parsing and logging | ||
const app = express(); | ||
app.use(timeout('60s')); | ||
app.use(express.json()); | ||
app.use(morgan('dev')) | ||
|
||
// static assets directory | ||
app.use(express.static(path.join(__dirname, 'static'))); | ||
|
||
// custom middleware for parsing querystring params | ||
const {parseDates, | ||
randomizer} = require('./middleware/parse_dates'); | ||
app.use(parseDates); | ||
app.use(randomizer); | ||
|
||
// api routes | ||
const {getTripsByMonth, | ||
getTripsByDayOfWeek, | ||
getTripsByTemperature} = require('./routes/api/trips'); | ||
|
||
// router (homepage) | ||
app.get('/', (req,res) => { | ||
res.sendFile(path.join(__dirname+'/views/index.html')); | ||
}); | ||
|
||
// router (api calls) | ||
app.get('/trips/monthly', getTripsByMonth); | ||
app.get('/trips/day_of_week', getTripsByDayOfWeek); | ||
app.get('/trips/temperature', getTripsByTemperature); | ||
|
||
// create server | ||
const server = http.createServer(app); | ||
server.listen(PORT); | ||
console.debug('...server listening on port ' + PORT); |
@@ -0,0 +1,21 @@
/*
 * Configuration File Setup
 * 1. Create a copy of this file named "config.js" in same directory
 * 2. Edit values in <>s in "config.js" corresponding to your snowflake demo
 *    instance
 */
var config = {
    // snowflake_account: 'test',
    snowflake_user: 'test',
    // note: this is only a generated test key, can be shared publicly
    snowflake_private_key: `-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQC4Rca7RU+O5oTE87fquFk0Ik7VYQJTBcQ84nauz//5OyH+ILXFsRHBaokMPOCONRAZrGbTZ7/sJ6OH/WYtKpELJKEXL5r35trHGmgGZDw6x6ht+CKXO+yt9HP8cQ2BVwdF/vRwhgTmAG4PLZwY3MP3DJ+oEpEgvWDp97n4hwmfsQIDAQABAoGAQFxOnxYERZsKAGhHcnnU3jqlYi2xzCGVetZ2MXSAqSIYK1RtaJEB7JYzK80PeVvbNbxbZbc091yY52SADYJyieOm2GjeIt7FwMM6EX6u/gINxEIfkTR/5+6m6xlaV/IWTlsHUgKTq+R+/ahqUsfuitCUycC92BwEib6YKW0aiAkCQQDqUCg/aIcdtPlFSv8U+8LFfgyrZfhdiLbmOi3oJWl93gHgKhNnpnshodXPVx7aP4zBwADBMwtP3cQJ2+b3wLIDAkEAyVPzy5jdqYjxiv/aUcYhnLUf1I5qhvbqvzxFIEasGPlN8nvAY2ZJI0F/OstTrf8pXlDUxknrtppp00XsGg4zOwJBAN9GtrOFdYt3UjkXd+6U4Uq0DHqfVoY6qp7EPc6DJ/0KuprTPV59o8OupUFIcVvjRsuxnIZ9j3/xgMcsRvE7K+UCQGQAgGOMDeaNMDVz+tSNjtqRGTtydjWN5nKRFGEA2bEZ/H7Ku3hkMUYC3ZitsGYIDVtc2SOZSi0MrC4WWD1k+ksCQQCFHz01BskO44wsrzDg7G6q8/ok+an4ayr1n3q0IrpV8JrANkm6VF9LltFvfnt2zDFe6+Ceb52Poqs4Cm5LJYmX | ||
-----END RSA PRIVATE KEY-----`,
    snowflake_database: 'test',
    snowflake_schema: 'test',
    snowflake_warehouse: 'test',
    snowflake_url: 'http://snowflake.localhost.localstack.cloud:4566',
    snowflake_host: 'snowflake.localhost.localstack.cloud',
    pool_max: 200
}
module.exports = config;
@@ -0,0 +1,91 @@
const snowflake = require('snowflake-sdk')
const genericPool = require("generic-pool");
const config = require('../config')
const crypto = require('crypto')

const factory = {
    create: () => {
        return new Promise((resolve, reject) => {
            // Create Connection
            const connection = snowflake.createConnection({
                timeout: 30 * 1000,
                account: config.snowflake_account,
                username: config.snowflake_user,
                authenticator: 'SNOWFLAKE_JWT',
                privateKey: crypto.createPrivateKey({
                    key: config.snowflake_private_key.trim(),
                    format: 'pem'
                }).export({
                    format: 'pem',
                    type: 'pkcs8'
                }),
                database: config.snowflake_database,
                warehouse: config.snowflake_warehouse,
                accessUrl: config.snowflake_url,
                host: config.snowflake_host,
                clientSessionKeepAlive: true
            });
            // Try to connect to Snowflake, and check whether the connection was successful.
            connection.connect((err, conn) => {
                if (err) {
                    console.error('Unable to connect: ' + err.message);
                    reject(new Error(err.message));
                } else {
                    console.log('Successfully connected to Snowflake, ID:', conn.getId());
                    resolve(conn);
                }
            });
        });
    },
    destroy: (connection) => {
        return new Promise((resolve, reject) => {
            connection.destroy((err, conn) => {
                if (err) {
                    console.error('Unable to disconnect: ' + err.message);
                } else {
                    console.log('Disconnected connection with id: ' + conn.getId());
                }
                resolve(); // Always resolve for destroy
            });
        });
    },
    validate: (connection) => {
        return new Promise((resolve, reject) => {
            resolve(connection.isUp());
        });
    }
};

const opts = {
    max: config.pool_max, // Maximum size of the pool
    min: 2, // Minimum size of the pool
    testOnBorrow: false, // Validate connection before acquiring it
    acquireTimeoutMillis: 60000, // Timeout to acquire connection
    evictionRunIntervalMillis: 900000, // Check every 15 min for idle connections
    numTestsPerEvictionRun: 4, // Check only 4 connections every 15 min
    idleTimeoutMillis: 10800000 // Evict only if connection is idle for 3 hrs
};

const myPool = genericPool.createPool(factory, opts);

const query = (sqlText, bindParams = []) => {
    return new Promise((resolve, reject) => {
        // Acquire connection from pool
        myPool.acquire().then(connection => {
            // Execute the query
            connection.execute({
                sqlText: sqlText,
                binds: bindParams,
                complete: (err, stmt, rows) => {
                    // console.log(`Conn: ${connection.getId()} fetched ${rows && rows.length} rows`);
                    // Return result
                    err ? reject(new Error(err.message)) : resolve(rows);
                    // Return connection back to pool
                    myPool.release(connection);
                }
            });
        }).catch(err => reject(new Error(err.message)));
    });
}

module.exports = { query };
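The pool module exports a single `query(sqlText, bindParams)` helper that resolves with the result rows. As a quick smoke test after seeding, something like the following could be used; note that the module's file name is not visible in this diff, so the `./connect` path below is only a placeholder:

```bash
# './connect' is a placeholder path - point it at wherever this pool module lives in the repo
node -e "require('./connect').query('select count(*) from trips').then(rows => console.log(rows)).catch(err => console.error(err.message))"
```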
@@ -0,0 +1,55 @@
const moment = require('moment')

function randomValueBetween(min, max) {
    return Math.random() * (max - min) + min;
}

// utility fn to get two random days in successive order
function randomizeDates() {
    // min and max dates
    const minDate = new Date('2019-05-01');
    const maxDate = new Date('2020-05-01');
    // randomize two dates
    var randomDate = randomValueBetween(minDate.getTime(), maxDate.getTime());
    var randomDate2 = randomValueBetween(minDate.getTime(), maxDate.getTime());
    // format dates
    const randomDateFormatted = moment(new Date(randomDate)).format('L');
    const randomDateFormatted2 = moment(new Date(randomDate2)).format('L');
    // return dates with earlier date first
    if (randomDate > randomDate2) {
        return [randomDateFormatted2, randomDateFormatted];
    }
    return [randomDateFormatted, randomDateFormatted2];
}

module.exports = {
    // pull out query string params for start and end of date ranges
    parseDates: (req, res, next) => {
        if ("start" in req.query && "end" in req.query) {
            req.start_range = req.query.start;
            req.end_range = req.query.end;
        } else if ("start" in req.query && "end" in req.query === false) {
            console.debug("Using only start for date range not implemented.");
            res.status(400).end();
            return;
        } else if ("end" in req.query && "start" in req.query === false) {
            console.debug("Using only end for date range not implemented.");
            res.status(400).end();
            return;
        } else {
            req.start_range = null;
            req.end_range = null;
        }
        next();
    },
    randomizer: (req, res, next) => {
        if ("random" in req.query && req.query.random === 'true') {
            // randomize dates between 2019-05-01 and 2020-05-01
            const randomDates = randomizeDates();
            req.start_range = randomDates[0];
            req.end_range = randomDates[1];
        }
        next();
    }
}
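For reference, `parseDates` only accepts `start` and `end` together (a lone `start` or `end` returns HTTP 400), and the randomizer formats dates with moment's `'L'` pattern (`MM/DD/YYYY` in the default locale). A couple of illustrative requests against the running app follow; the exact date format the downstream SQL expects is not visible in this diff, since the `routes/api/trips` module is not part of this PR view:

```bash
# both parameters supplied - passed through to the route handlers as req.start_range / req.end_range
curl "http://localhost:3000/trips/monthly?start=05/01/2019&end=05/01/2020"

# only one parameter supplied - parseDates responds with HTTP 400
curl -i "http://localhost:3000/trips/monthly?start=05/01/2019"
```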
Review comment: I think a short top-level README could help, actually - maybe we could add something in here in a follow-up iteration. (but not critical, shouldn't block the merge..)