Begin Migration to Python 3 #64

Open · wants to merge 2 commits into master
4 changes: 2 additions & 2 deletions config.json
@@ -1,7 +1,7 @@
{
"_comment": "This doesn't appear to get used in ETL at all? TODO: remove if unnecessary.",
"rds_uri": "postgres://$CYBERGREEN_STATS_RDS_NAME:$CYBERGREEN_STATS_RDS_PASSWORD@$CYBERGREEN_RAW_SCAN_RDS_NAME.crovisjepxcd.eu-west-1.rds.amazonaws.com:5432/$CYBERGREEN_STATS_RDS_NAME",
"redshift_uri": "postgres://$CYBERGREEN_REDSHIFT_USER:$CYBERGREEN_REDSHIFT_PASSWORD@$CYBERGREEN_REDSHIFT_CLUSTER_NAME.cqxchced59ta.eu-west-1.redshift.amazonaws.com:5439/$CYBERGREEN_BUILD_ENV",
"rds_uri": "postgresql://$CYBERGREEN_STATS_RDS_NAME:$CYBERGREEN_STATS_RDS_PASSWORD@$CYBERGREEN_RAW_SCAN_RDS_NAME.crovisjepxcd.eu-west-1.rds.amazonaws.com:5432/$CYBERGREEN_STATS_RDS_NAME",
"redshift_uri": "postgresql://$CYBERGREEN_REDSHIFT_USER:$CYBERGREEN_REDSHIFT_PASSWORD@$CYBERGREEN_REDSHIFT_CLUSTER_NAME.cqxchced59ta.eu-west-1.redshift.amazonaws.com:5439/$CYBERGREEN_BUILD_ENV",
"role_arn": "arn:aws:iam::635396214416:role/RedshiftCopyUnload",
"source_path": "$CYBERGREEN_SOURCE_ROOT",
"dest_path": "$CYBERGREEN_DEST_ROOT",
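
Context for the URI change above: SQLAlchemy 1.4+ dropped the postgres:// dialect alias, so engines built from these config values need postgresql:// (or postgresql+psycopg2://). A small sketch of how the value might be expanded and consumed; the expand_env helper is illustrative, not part of this repo:

```python
# Sketch: expand the $CYBERGREEN_* placeholders and build an engine.
# expand_env is illustrative only; it assumes the variables exist in os.environ.
import json
import os
from string import Template

from sqlalchemy import create_engine, text


def expand_env(value):
    return Template(value).substitute(os.environ)


with open("config.json") as fp:
    config = json.load(fp)

# "postgres://" no longer resolves to a dialect on SQLAlchemy 1.4+, hence the rename.
engine = create_engine(expand_env(config["rds_uri"]))

with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())
```
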
2 changes: 1 addition & 1 deletion load_asn_ref_data.sh
100644 → 100755
@@ -1,4 +1,4 @@
# Sets up environment to load ASN ref data
source ../env

./load_asn_ref_data.py
python ./load_asn_ref_data.py
161 changes: 99 additions & 62 deletions main.py
@@ -1,8 +1,9 @@
from __future__ import print_function

from datapackage import push_datapackage
from datapackage import push_datapackage, Package, Resource
from psycopg2.extensions import AsIs
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text, MetaData, Table, Integer, Column, TIMESTAMP, String, BigInteger, Text, \
Float, Boolean
from os.path import dirname, join
from string import Template
from textwrap import dedent
@@ -17,6 +18,9 @@
import csv
import os

from sqlalchemy.dialects import postgresql
from sqlalchemy.sql.ddl import CreateTable

#utils
def rpath(*args):
return join(dirname(__file__), *args)
@@ -84,10 +88,12 @@ def run(self):
shutil.rmtree(self.tmpdir)


def drop_tables(self, cursor, tables):
def drop_tables(self, conn, tables):
cursor = conn.connect()
for tablename in tables:
statement = text("DROP TABLE IF EXISTS :table CASCADE")
cursor.execute(
"DROP TABLE IF EXISTS %(table)s CASCADE",
statement,
{"table": AsIs(tablename)}
)

@@ -127,35 +133,55 @@ def upload_manifest(self):

def create_tables(self):
conn = self.connRedshift.connect()
tablenames = [
'dim_risk', 'logentry', 'count'
]
self.drop_tables(conn, tablenames)
create_logentry = dedent('''
CREATE TABLE logentry(
date TIMESTAMP, ip VARCHAR(32), risk INT,
asn BIGINT, country VARCHAR(2)
)
''')
create_risk = dedent('''
CREATE TABLE dim_risk(
id INT, slug VARCHAR(32), title VARCHAR(32),
is_archived BOOLEAN,
taxonomy VARCHAR(16), measurement_units VARCHAR(32),
amplification_factor FLOAT, description TEXT
)
''')
create_count = dedent('''
CREATE TABLE count(
date TIMESTAMP, risk INT, country VARCHAR(2),
asn BIGINT, count INT, count_amplified FLOAT
)
''')
conn.execute(create_risk)
conn.execute(create_logentry)
conn.execute(create_count)
conn.close()
logging.info('Redshift tables created')

transaction = conn.begin()

metadata = MetaData()
logentry_table = Table('logentry', metadata,
Column('id', Integer),
Column('date', TIMESTAMP),
Column('ip', String(32)),
Column('risk', Integer),
Column('asn', BigInteger),
Column('country', String(2)))
statement = CreateTable(logentry_table, if_not_exists=True)
print(statement.compile(dialect=postgresql.dialect()))

conn.execute(statement)
transaction.commit()

transaction = conn.begin()
metadata = MetaData()
risk_table = Table('dim_risk', metadata,
Column('id', Integer),
Column('slug', String(32)),
Column('title', String(32)),
Column('is_archived', Boolean),
Column('taxonomy', String(16)),
Column('measurement_units', String(32)),
Column('amplification_factor', Float),
Column('description', Text))
statement = CreateTable(risk_table, if_not_exists=True)
print(statement.compile(dialect=postgresql.dialect()))

conn.execute(statement)
transaction.commit()

transaction = conn.begin()
metadata = MetaData()
count_table = Table('count', metadata,
Column('date', TIMESTAMP),
Column('risk', Integer),
Column('country', String(2)),
Column('asn', BigInteger),
Column('count', Integer),
Column('count_amplified', Float))
statement = CreateTable(count_table, if_not_exists=True)
print(statement.compile(dialect=postgresql.dialect()))

conn.execute(statement)
transaction.commit()
print('Redshift tables created')


def load_data(self):
@@ -182,14 +208,18 @@ def load_ref_data(self):
if inv.get('name') == 'risk':
url = inv.get('url')
dp = datapackage.DataPackage(url)
risks = dp.resources[0].data
query = dedent('''
INSERT INTO dim_risk
VALUES (%(id)s, %(slug)s, %(title)s, %(is_archived)s, %(taxonomy)s, %(measurement_units)s, %(amplification_factor)s, %(description)s)''')
risks = dp.resources[0].read(keyed=True)
table_name = 'dim_risk'
columns = ', '.join(['id', 'slug', 'title',
'is_archived', 'taxonomy', 'measurement_units',
'amplification_factor', 'description'])

statement = text(f'INSERT INTO {table_name} ({columns}) VALUES (:id, :slug, :title, :is_archived, :taxonomy, :measurement_units, :amplification_factor, :description)')

for risk in risks:
# description is too long and not needed here
risk['description']=''
conn.execute(query,risk)
conn.execute(statement,risk)
conn.close()


@@ -210,9 +240,9 @@ def aggregate(self):
FROM(
SELECT DISTINCT (ip), date_trunc('day', date) AS date, risk, asn, country FROM logentry
) AS foo
GROUP BY date, asn, risk, country HAVING count(*) > %(threshold)s ORDER BY date DESC, country ASC, asn ASC, risk ASC)
GROUP BY date, asn, risk, country HAVING count(*) > :threshold ORDER BY date DESC, country ASC, asn ASC, risk ASC)
''')
conn.execute(query, {'threshold': self.country_count_threshold})
conn.execute(text(query), {'threshold': self.country_count_threshold})
conn.close()


@@ -224,7 +254,7 @@ def update_amplified_count(self):
SET count_amplified = count*amplification_factor
FROM dim_risk WHERE risk=id
''')
conn.execute(query)
conn.execute(text(query))
conn.close()
logging.info('Aggregation Finished!')

@@ -289,8 +319,11 @@ def run(self):


def drop_tables(self, tables):
conn = self.connRDS.connect()
transaction = conn.begin()
for tablename in tables:
self.connRDS.execute("DROP TABLE IF EXISTS %(table)s CASCADE",{"table": AsIs(tablename)})
conn.execute(text("DROP TABLE IF EXISTS :table CASCADE"),{"table": AsIs(tablename)})
transaction.commit()


def download_and_load(self):
@@ -304,7 +337,7 @@ def download_and_load(self):
logging.info('Loading into RDS ...')
# TODO: replace shelling out to psql
copy_command = dedent('''
psql {uri} -c "\COPY fact_count FROM {tmp}/count.csv WITH delimiter as ',' null '' csv;"
psql {uri} -c "\\COPY fact_count FROM {tmp}/count.csv WITH delimiter as ',' null '' csv;"
''')
os.system(copy_command.format(tmp=self.tmpdir,uri=self.config.get('rds_uri')))

@@ -313,29 +346,33 @@ def load_ref_data_rds(self):
logging.info('Loading reference_data to RDS ...')
conn = self.connRDS.connect()
# creating dim_asn table here with other ref data
conn.execute('DROP TABLE IF EXISTS data__asn___asn CASCADE')
create_asn = 'CREATE TABLE data__asn___asn(number BIGINT, title TEXT, country TEXT)'
transaction = conn.begin()
conn.execute(text('DROP TABLE IF EXISTS data__asn___asn'))
create_asn = text('CREATE TABLE data__asn___asn(number BIGINT, title TEXT, country TEXT)')
conn.execute(create_asn)
transaction.commit()

for url in self.ref_data_urls:
print(url)
# Loading of asn with push_datapackage takes more than 2 hours
# So have to download locally and save (takes ~5 seconds)
if 'asn' not in url:
push_datapackage(descriptor=url, backend='sql', engine=conn)
print(f'Skipping {url}')
# push_datapackage(descriptor=url, backend='sql', engine=conn)
else:
dp = datapackage.DataPackage(url)
# local path will be returned if no remote one is found (for tests)
if dp.resources[0].remote_data_path:
r = requests.get(dp.resources[0].remote_data_path)
with open(join(self.tmpdir, 'asn.csv'),"wb") as fp:
fp.write(r.content)
else:
shutil.copy(dp.resources[0].local_data_path,join(self.tmpdir, 'asn.csv'))
# TODO: replace shelling out
copy_command = dedent('''
psql {uri} -c "\COPY data__asn___asn FROM {tmp}/asn.csv WITH delimiter as ',' csv header;"
''')
os.system(copy_command.format(tmp=self.tmpdir,uri=self.config.get('rds_uri')))
if len(dp.resources) > 0:
data = dp.resources[0].read()
import csv
with open(join(self.tmpdir, 'asn.csv'),"w") as fp:
fw = csv.writer(fp)
fw.writerows(data)
copy_command = dedent('''
psql {uri} -c "\\COPY data__asn___asn FROM {tmp}/asn.csv WITH delimiter as ',' csv header;"
''')
print(f'Executing psql command {copy_command.format(tmp=self.tmpdir, uri=self.config.get("rds_uri"))}')
os.system(copy_command.format(tmp=self.tmpdir, uri=self.config.get('rds_uri')))
conn.close()


@@ -367,11 +404,11 @@ def create_tables(self):
count_amplified FLOAT
)''')

conn.execute(create_risk)
conn.execute(create_country)
conn.execute(create_asn)
conn.execute(create_time)
conn.execute(create_count)
conn.execute(text(create_risk))
conn.execute(text(create_country))
conn.execute(text(create_asn))
conn.execute(text(create_time))
conn.execute(text(create_count))
self.create_or_update_cubes(conn, create_cube)
conn.close()

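
A portability note on the new drop_tables: passing psycopg2's AsIs as a bound parameter for :table only works because psycopg2 happens to adapt AsIs at the driver level; SQLAlchemy itself cannot bind identifiers, so the pattern quietly ties the ETL to that one driver. A driver-neutral sketch, assuming the table names always come from the hard-coded internal lists and never from user input:

```python
# Sketch only: drop tables without relying on psycopg2-specific AsIs adaptation.
# Interpolating the name is acceptable here because it comes from fixed internal
# lists ('dim_risk', 'logentry', 'count'), never from user input.
from sqlalchemy import text


def drop_tables(engine, tables):
    # engine.begin() opens a connection and commits (or rolls back) for us,
    # replacing the manual conn.begin() / transaction.commit() pairs.
    with engine.begin() as conn:
        for tablename in tables:
            conn.execute(text(f'DROP TABLE IF EXISTS {tablename} CASCADE'))


# Usage, assuming self.connRedshift is the Engine built from redshift_uri:
# drop_tables(self.connRedshift, ['dim_risk', 'logentry', 'count'])
```
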
39 changes: 39 additions & 0 deletions models.py
@@ -0,0 +1,39 @@
from sqlalchemy import Column, String, Integer, BigInteger, Boolean, Float, Text, TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class LogEntry(Base):
__tablename__ = 'logentry'

id = Column(Integer)
date = Column(TIMESTAMP)
ip = Column(String(32))
risk = Column(Integer)
asn = Column(BigInteger)
country = Column(String(2))


class Risk(Base):
__tablename__ = "dim_risk"

id = Column(Integer)
slug = Column(String(32))
title = Column(String(32))
is_archived = Column(Boolean)
taxonomy = Column(String(16))
measurement_units = Column(String(32))
amplification_factor = Column(Float)
description = Column(Text)


class Count(Base):
__tablename__ = "count"

date = Column(TIMESTAMP)
risk = Column(Integer)
country = Column(String(2))
asn = Column(BigInteger)
count = Column(Integer)
count_amplified = Column(Float)
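
Note on models.py: SQLAlchemy's declarative mapping requires every mapped class to declare a primary key, so importing this module as written would raise an ArgumentError ("could not assemble any primary key columns") at import time. A minimal sketch of one way to satisfy that; which columns actually form the keys is an assumption, not something this PR specifies:

```python
# Sketch: declarative models must carry a primary key. The key choices below
# (surrogate id for logentry, composite key for count) are assumptions.
from sqlalchemy import Column, String, Integer, BigInteger, Float, TIMESTAMP
from sqlalchemy.orm import declarative_base  # 2.0-style import location

Base = declarative_base()


class LogEntry(Base):
    __tablename__ = 'logentry'

    id = Column(Integer, primary_key=True)
    date = Column(TIMESTAMP)
    ip = Column(String(32))
    risk = Column(Integer)
    asn = Column(BigInteger)
    country = Column(String(2))


class Count(Base):
    __tablename__ = 'count'

    # 'count' has no id column, so the grouping columns act as a composite key.
    date = Column(TIMESTAMP, primary_key=True)
    risk = Column(Integer, primary_key=True)
    country = Column(String(2), primary_key=True)
    asn = Column(BigInteger, primary_key=True)
    count = Column(Integer)
    count_amplified = Column(Float)
```

Risk/dim_risk would get the same treatment. Once keys are in place, Base.metadata.create_all(engine) could also replace the hand-built CreateTable blocks in create_tables, keeping the schema definitions in one module.
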
14 changes: 8 additions & 6 deletions requirements.txt
@@ -1,6 +1,8 @@
datapackage==0.8.4
jsontableschema-sql==0.8.0
SQLAlchemy==1.2.6
psycopg2==2.7.3.2
boto3==1.4.7
rfc3986==0.4.1
datapackage==1.15.2
tableschema-sql==2.0.1
SQLAlchemy==2.0.19
psycopg2==2.9.7
boto3==1.28.22
rfc3986==2.0.0
sqlalchemy-redshift==0.8.14
redshift_connector==2.0.913
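
These pins cross the SQLAlchemy 1.x to 2.0 boundary, which is why main.py now wraps raw SQL in text() and manages transactions explicitly: 2.0 removed plain-string execution and library-level implicit autocommit. A minimal sketch of the 2.0 execution style the new code is converging on (the URI is a placeholder):

```python
# Sketch of SQLAlchemy 2.0-style execution; the URI below is a placeholder.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:password@localhost:5432/cybergreen")

# engine.begin() commits on success and rolls back on error; plain strings are
# no longer executable, so every statement goes through text() with bound params.
with engine.begin() as conn:
    rows = conn.execute(
        text("SELECT count(*) FROM logentry WHERE risk = :risk"),
        {"risk": 1},
    )
    print(rows.scalar())
```
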
Empty file added tests/__init__.py