Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading GOES ABI and GLM from AWS #217

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions siphon/aws_goes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""
USAGE:

from datetime import datetime
from glmtools.feeds.aws import GOESArchiveDownloader, GOESProduct, save_s3_product
startdate = datetime(2018,3,27,23,59,0)
enddate = datetime(2018,3,28,0,1,0)

outpath = '/data/GLM-L2-LCFA_G16_s20180328/'

arc = GOESArchiveDownloader()
ABI_prods = arc.get_range(startdate, enddate, GOESProduct(typ='ABI',
channel=14, sector='conus'))
GLM_prods = arc.get_range(startdate, enddate, GOESProduct(typ='GLM'))

# for s3obj in ABI_prods:
# save_s3_product(s3obj, outpath)
# for s3obj in GLM_prods:
# save_s3_product(s3obj, outpath)
"""


import itertools
from tempfile import NamedTemporaryFile
import os
from datetime import datetime, timedelta

Check warning on line 26 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L23-L26

Added lines #L23 - L26 were not covered by tests

import boto3
import botocore
from botocore.client import Config
from netCDF4 import Dataset

Check warning on line 31 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L28-L31

Added lines #L28 - L31 were not covered by tests


def gen_day_chunks(start, end):

Check warning on line 34 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L34

Added line #L34 was not covered by tests
"""
Given end > start, yield the start, then the day boundaries in between,
and finally the end.
"""
one_day = timedelta(days=1)
remainder = end-start
if remainder.total_seconds() <= 0:
raise ValueError("end time must be larger than start time")
last = start
while remainder.total_seconds() > 0:
yield last
start_next_day = datetime(last.year, last.month, last.day) + one_day
last = start_next_day
remainder = end - last
yield end

Check warning on line 49 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L39-L49

Added lines #L39 - L49 were not covered by tests


def gen_hour_chunks(start, end):

Check warning on line 52 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L52

Added line #L52 was not covered by tests
"""
Given end > start, yield the start, then the hour boundaries in between,
and finally the end.
"""
one_hour = timedelta(days=0, hours=1)
remainder = end-start
if remainder.total_seconds() <= 0:
raise ValueError("end time must be larger than start time")
last = start
while remainder.total_seconds() > 0:
yield last
start_next = datetime(last.year, last.month, last.day, last.hour) + one_hour
last = start_next
remainder = end - last
yield end

Check warning on line 67 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L57-L67

Added lines #L57 - L67 were not covered by tests


def pairwise(iterable):

Check warning on line 70 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L70

Added line #L70 was not covered by tests
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)

Check warning on line 74 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L72-L74

Added lines #L72 - L74 were not covered by tests


class GOESArchiveDownloader(object):
def __init__(self, bucket='noaa-goes16'):
s3 = boto3.resource('s3', config=Config(

Check warning on line 79 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L77-L79

Added lines #L77 - L79 were not covered by tests
signature_version=botocore.UNSIGNED,
user_agent_extra='Resource'))
self._bucket = s3.Bucket(bucket)

Check warning on line 82 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L82

Added line #L82 was not covered by tests

def _get_iter(self, start, product):

Check warning on line 84 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L84

Added line #L84 was not covered by tests
# Set up the 'path' part, which is only the basic instrument category
prod_prefix = product.prefix(start)

Check warning on line 86 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L86

Added line #L86 was not covered by tests
# And the full path including the first part of the filename
start_marker = product.with_start_time(start)
print(prod_prefix, start_marker)
return self._bucket.objects.filter(Marker=start_marker, Prefix=prod_prefix)

Check warning on line 90 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L88-L90

Added lines #L88 - L90 were not covered by tests

def get_next(self, time, product):
return next(iter(self._get_iter(time, product)))

Check warning on line 93 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L92-L93

Added lines #L92 - L93 were not covered by tests

def get_range_in_hour_chunks(self, start, end, product):

Check warning on line 95 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L95

Added line #L95 was not covered by tests
""" We have to go in hourly chunks because the prefix structure
will only filter properly if we do one hour at a time. Otherwise,
the comparison of obj.key <= end_key across hour or day boundaries
will fail, especially for ABI products, where there is a channel to
select.
"""
path, prod_mode, nc_basename = product.key_components()
print("prod_mode is", prod_mode)
end_key = product.with_start_time(end)

Check warning on line 104 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L102-L104

Added lines #L102 - L104 were not covered by tests

# Get a list of files that have the proper prefix up to the hour
return list(itertools.takewhile(lambda obj: (obj.key <= end_key),

Check warning on line 107 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L107

Added line #L107 was not covered by tests
self._get_iter(start, product)))

def get_range(self, start, end, product):
in_range = []
for t0, t1 in pairwise(gen_day_chunks(start, end)):
this_range = self.get_range_in_hour_chunks(t0, t1, product)
in_range.extend(this_range)
return in_range

Check warning on line 115 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L110-L115

Added lines #L110 - L115 were not covered by tests


class GOESProduct(object):
def __init__(self, **kwargs):
self.sector = 'conus'
self.satellite = 'goes16'
self.typ = 'ABI'
self.channel = 1
self.mode = 3
self.keypath = '{prod_id}'
self.__dict__.update(kwargs)

Check warning on line 126 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L118-L126

Added lines #L118 - L126 were not covered by tests

def key_components(self):
env = 'OR'
sat = {'goes16': 'G16', 'goes17': 'G17'}[self.satellite]

Check warning on line 130 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L128-L130

Added lines #L128 - L130 were not covered by tests

if self.typ == 'ABI':
sectors = {'conus': 'C', 'meso1': 'M1', 'meso2': 'M2', 'full': 'F'}
sector = sectors[self.sector]
prod_id = 'ABI-L1b-Rad{sector}'.format(sector=sector)
prod_code = '-M{mode}C{channel:02d}'.format(sector=sector,

Check warning on line 136 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L132-L136

Added lines #L132 - L136 were not covered by tests
mode=self.mode,
channel=self.channel)
prod_mode = prod_id + prod_code
elif self.typ == 'GLM':
prod_id = 'GLM-L2-LCFA'
prod_mode = prod_id

Check warning on line 142 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L139-L142

Added lines #L139 - L142 were not covered by tests
else:
raise ValueError('Unhandled data type: {}'.format(self.typ))
path = prod_id
nc_basename = '{env}_{prodid}_{sat}'.format(env=env, prodid=prod_mode,

Check warning on line 146 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L144-L146

Added lines #L144 - L146 were not covered by tests
sat=sat)
return path, prod_mode, nc_basename

Check warning on line 148 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L148

Added line #L148 was not covered by tests

def prefix(self, time):
path, prod_mode, nc_basename = self.key_components()
return path + '/{time:%Y/%j/%H}/'.format(time=time) + nc_basename

Check warning on line 152 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L150-L152

Added lines #L150 - L152 were not covered by tests

def __str__(self):
path, prod_mode, nc_basename = self.key_components()
return path

Check warning on line 156 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L154-L156

Added lines #L154 - L156 were not covered by tests

__repr__ = __str__

Check warning on line 158 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L158

Added line #L158 was not covered by tests

def with_start_time(self, time):
path, prod_mode, nc_basename = self.key_components()
base = self.prefix(time)
return (base + '_s{time:%Y%j%H%M%S}').format(time=time)

Check warning on line 163 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L160-L163

Added lines #L160 - L163 were not covered by tests


def netcdf_from_s3(s3obj):

Check warning on line 166 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L166

Added line #L166 was not covered by tests
""" Download the data and open (in memory) with netCDF """
with NamedTemporaryFile(suffix='.nc') as temp:

Check warning on line 168 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L168

Added line #L168 was not covered by tests
# Create a temporary netCDF file to work around bug in netCDF C 4.4.1.1
# We shouldn't actually need any file on disk.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an existing bug report for this? Is this fixed now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fixed now.

nc_temp = Dataset(temp.name, 'w').close()
return Dataset(temp.name, memory=s3obj.get()['Body'].read())

Check warning on line 172 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L171-L172

Added lines #L171 - L172 were not covered by tests


def save_s3_product(s3obj, path):

Check warning on line 175 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L175

Added line #L175 was not covered by tests
""" Save an S3 object to path. The filename is extracted from the S3 object
it will overwrite a previous file with the same name.
"""
obj = s3obj
filename = obj.key.split('/')[-1]
outfile = os.path.join(path, filename)
print(filename)
with open(outfile, 'wb') as f:
data = obj.get()['Body'].read()
f.write(data)

Check warning on line 185 in siphon/aws_goes.py

View check run for this annotation

Codecov / codecov/patch

siphon/aws_goes.py#L179-L185

Added lines #L179 - L185 were not covered by tests