-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Checkpoint while collecting metadata #31
base: main
Are you sure you want to change the base?
Changes from all commits
9e25d05
a954e12
89c050a
30a27d1
469ec84
3228e76
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,13 +20,17 @@ | |
|
||
from datetime import datetime | ||
from enum import IntEnum | ||
import os | ||
import logging | ||
import netCDF4 | ||
import sqlalchemy | ||
import numpy as np | ||
import pandas as pd | ||
import sqlite3 | ||
import xarray as xr | ||
from xarray.coding.times import encode_cf_datetime | ||
from sqlalchemy import Column, Integer, Float, String, DateTime | ||
from sqlalchemy.ext.declarative import declarative_base | ||
from sqlalchemy.orm import sessionmaker | ||
|
||
|
||
LOG = logging.getLogger(__package__) | ||
|
@@ -101,6 +105,31 @@ class QualityFlags(IntEnum): | |
'fill_value': None} | ||
] | ||
|
||
CHECKPOINT_STEP = 100 | ||
CHECKPOINT_DB = "_checkpoint.db" | ||
Base = declarative_base() | ||
|
||
|
||
class FileMetadata(Base): | ||
"""Object relational model for SQL database.""" | ||
__tablename__ = 'metadata' | ||
|
||
platform = Column(String) | ||
start_time = Column(DateTime) | ||
end_time = Column(DateTime) | ||
along_track = Column(Integer) | ||
filename = Column(String, primary_key=True) | ||
orbit_number_start = Column(Integer) | ||
orbit_number_end = Column(Integer) | ||
equator_crossing_longitude_1 = Column(Float) | ||
equator_crossing_time_1 = Column(DateTime) | ||
equator_crossing_longitude_2 = Column(Float) | ||
equator_crossing_time_2 = Column(DateTime) | ||
midnight_line = Column(Float) | ||
overlap_free_start = Column(Float) | ||
overlap_free_end = Column(Float) | ||
global_quality_flag = Column(Integer) | ||
|
||
|
||
class MetadataCollector: | ||
"""Collect and complement metadata from level 1c files. | ||
|
@@ -138,47 +167,69 @@ def get_metadata(self, filenames): | |
|
||
def save_sql(self, mda, dbfile, if_exists): | ||
"""Save metadata to sqlite database.""" | ||
con = sqlite3.connect(dbfile) | ||
mda.to_sql(name='metadata', con=con, if_exists=if_exists) | ||
con.commit() | ||
con.close() | ||
engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile)) | ||
mda.to_sql(name="metadata", con=engine, if_exists=if_exists) | ||
|
||
def read_sql(self, dbfile): | ||
"""Read metadata from sqlite database.""" | ||
with sqlite3.connect(dbfile) as con: | ||
mda = pd.read_sql('select * from metadata', con) | ||
mda = mda.set_index(['platform', 'level_1']) | ||
mda.fillna(value=np.nan, inplace=True) | ||
for col in mda.columns: | ||
if 'time' in col: | ||
mda[col] = mda[col].astype('datetime64[ns]') | ||
engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile)) | ||
mda = pd.read_sql_table("metadata", engine, index_col=['platform', 'level_1']) | ||
return mda | ||
|
||
def _extract_fromfile(self, filename): | ||
"""Extract metadata from a single file.""" | ||
LOG.debug('Collecting metadata from {}'.format(filename)) | ||
with xr.open_dataset(filename) as ds: | ||
midnight_line = np.float64(self._get_midnight_line(ds['acq_time'])) | ||
eq_cross_lons, eq_cross_times = self._get_equator_crossings(ds) | ||
metadata = FileMetadata( | ||
platform=ds.attrs['platform'].split('>')[-1].strip(), | ||
start_time=ds['acq_time'].values[0], | ||
end_time=ds['acq_time'].values[-1], | ||
along_track=ds.dims['y'], | ||
filename=filename, | ||
orbit_number_start=ds.attrs['orbit_number_start'], | ||
orbit_number_end=ds.attrs['orbit_number_end'], | ||
equator_crossing_longitude_1=eq_cross_lons[0], | ||
equator_crossing_time_1=eq_cross_times[0], | ||
equator_crossing_longitude_2=eq_cross_lons[0], | ||
equator_crossing_time_2=eq_cross_times[1], | ||
midnight_line=midnight_line, | ||
overlap_free_start=np.nan, | ||
overlap_free_end=np.nan, | ||
global_quality_flag=QualityFlags.OK | ||
) | ||
return metadata | ||
|
||
def _collect_metadata(self, filenames): | ||
"""Collect metadata from the given level 1c files.""" | ||
records = [] | ||
for filename in filenames: | ||
LOG.debug('Collecting metadata from {}'.format(filename)) | ||
with xr.open_dataset(filename) as ds: | ||
midnight_line = np.float64(self._get_midnight_line(ds['acq_time'])) | ||
eq_cross_lons, eq_cross_times = self._get_equator_crossings(ds) | ||
rec = {'platform': ds.attrs['platform'].split('>')[-1].strip(), | ||
'start_time': ds['acq_time'].values[0], | ||
'end_time': ds['acq_time'].values[-1], | ||
'along_track': ds.dims['y'], | ||
'filename': filename, | ||
'orbit_number_start': ds.attrs['orbit_number_start'], | ||
'orbit_number_end': ds.attrs['orbit_number_end'], | ||
'equator_crossing_longitude_1': eq_cross_lons[0], | ||
'equator_crossing_time_1': eq_cross_times[0], | ||
'equator_crossing_longitude_2': eq_cross_lons[1], | ||
'equator_crossing_time_2': eq_cross_times[1], | ||
'midnight_line': midnight_line, | ||
'overlap_free_start': np.nan, | ||
'overlap_free_end': np.nan, | ||
'global_quality_flag': QualityFlags.OK} | ||
records.append(rec) | ||
return records | ||
if os.path.isfile(CHECKPOINT_DB): | ||
LOG.info("Restarting from checkpoint.") | ||
db_url = 'sqlite:///{0}'.format(CHECKPOINT_DB) | ||
engine = sqlalchemy.create_engine(db_url) | ||
Base.metadata.create_all(engine) | ||
# open session | ||
Session = sessionmaker(bind=engine) | ||
session = Session() | ||
# get set of processed files in case of a restart | ||
wanted = set(filenames) | ||
done = set( | ||
filename for filename, in session.query(FileMetadata.filename) | ||
[Review thread attached to this line]
Reviewer: Is that … [question truncated in extraction; from the reply it concerns the trailing comma in `for filename, in session.query(...)`]
Author: Unfortunately yes, because the query returns tuples (of one item) and therefore I need to unpack them. I have not yet seen a better way to get a plain list directly from a one-column query.
Reviewer: OK, I see. Another idea: `import itertools as it` and then
`done = set(it.chain(*session.query(FileMetadata.filename)))`, which is another way to flatten the list of one-element tuples.
Reviewer: Or another way without any import: `done = set(next(zip(*session.query(FileMetadata.filename))))` |
||
) | ||
todo = wanted.difference(done) | ||
for i, filename in enumerate(todo): | ||
metadata = self._extract_fromfile(filename) | ||
session.add(metadata) | ||
if i % CHECKPOINT_STEP == 0: | ||
session.commit() | ||
# clear session cache | ||
session.expire_all() | ||
session.commit() | ||
session.close() | ||
# load data into memory and remove the checkpoint database | ||
metadata = pd.read_sql_table(FileMetadata.__tablename__, engine) | ||
os.remove(CHECKPOINT_DB) | ||
return metadata | ||
|
||
def _get_midnight_line(self, acq_time): | ||
"""Find scanline where the UTC date increases by one day. | ||
|
[Review thread]
Reviewer: Does the new solution convert time columns in the dataframe to
datetime64 and replace NULL items with NaN? I don't remember why, but I
think this was needed for the overlap computation to work correctly.
Reviewer: That's a nice use case for the system tests, actually.
Author: I was a bit surprised by these lines and did a test and it worked,
meaning the types were conserved when writing and re-reading into the
DataFrame.
Reviewer: Magic! Curious how the system tests will behave.