Checkpoint while collecting metadata #31

Open · wants to merge 6 commits into base: main
121 changes: 86 additions & 35 deletions pygac_fdr/metadata.py
@@ -20,13 +20,17 @@

from datetime import datetime
from enum import IntEnum
import os
import logging
import netCDF4
import sqlalchemy
import numpy as np
import pandas as pd
import sqlite3
import xarray as xr
from xarray.coding.times import encode_cf_datetime
from sqlalchemy import Column, Integer, Float, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


LOG = logging.getLogger(__package__)
@@ -101,6 +105,31 @@ class QualityFlags(IntEnum):
'fill_value': None}
]

CHECKPOINT_STEP = 100
CHECKPOINT_DB = "_checkpoint.db"
Base = declarative_base()


class FileMetadata(Base):
"""Object relational model for SQL database."""
__tablename__ = 'metadata'

platform = Column(String)
start_time = Column(DateTime)
end_time = Column(DateTime)
along_track = Column(Integer)
filename = Column(String, primary_key=True)
orbit_number_start = Column(Integer)
orbit_number_end = Column(Integer)
equator_crossing_longitude_1 = Column(Float)
equator_crossing_time_1 = Column(DateTime)
equator_crossing_longitude_2 = Column(Float)
equator_crossing_time_2 = Column(DateTime)
midnight_line = Column(Float)
overlap_free_start = Column(Float)
overlap_free_end = Column(Float)
global_quality_flag = Column(Integer)


class MetadataCollector:
"""Collect and complement metadata from level 1c files.
@@ -138,47 +167,69 @@ def get_metadata(self, filenames):

    def save_sql(self, mda, dbfile, if_exists):
        """Save metadata to sqlite database."""
        con = sqlite3.connect(dbfile)
        mda.to_sql(name='metadata', con=con, if_exists=if_exists)
        con.commit()
        con.close()
        engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile))
        mda.to_sql(name="metadata", con=engine, if_exists=if_exists)

    def read_sql(self, dbfile):
        """Read metadata from sqlite database."""
        with sqlite3.connect(dbfile) as con:
            mda = pd.read_sql('select * from metadata', con)
        mda = mda.set_index(['platform', 'level_1'])
        mda.fillna(value=np.nan, inplace=True)
        for col in mda.columns:
            if 'time' in col:
                mda[col] = mda[col].astype('datetime64[ns]')
Comment on lines -148 to -154

Member: Does the new solution convert time columns in the dataframe to datetime64 and replace NULL items with NaN? I don't remember why, but I think this was needed for the overlap computation to work correctly.

Member: That's a nice use case for the system tests actually.

Author: I was a bit surprised by these lines and did a test and it worked, meaning the types were conserved when writing and re-reading into the DataFrame.

Member: Magic! Curious how the system tests will behave.
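A minimal round-trip check of the kind the author describes could look like this (a sketch only; the NOAA-15 values, the roundtrip_test.db path and the column subset are invented for illustration):

import numpy as np
import pandas as pd
import sqlalchemy

# Tiny frame mimicking the metadata table: a datetime column and a missing float.
mda = pd.DataFrame({
    'platform': ['NOAA-15', 'NOAA-15'],
    'start_time': pd.to_datetime(['2009-07-01 12:00', '2009-07-01 13:40']),
    'midnight_line': [np.nan, 4711.0],
})
mda['level_1'] = np.arange(len(mda))
mda = mda.set_index(['platform', 'level_1'])

# Write via SQLAlchemy and read back with read_sql_table, as in this PR.
engine = sqlalchemy.create_engine('sqlite:///roundtrip_test.db')
mda.to_sql(name='metadata', con=engine, if_exists='replace')
back = pd.read_sql_table('metadata', engine, index_col=['platform', 'level_1'])

assert back['start_time'].dtype == 'datetime64[ns]'  # datetimes survive the round trip
assert back['midnight_line'].isna().any()             # NULL comes back as NaN

If read_sql_table did not restore the dtypes, the old fillna/astype conversion would still be needed before the overlap computation.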

        engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile))
        mda = pd.read_sql_table("metadata", engine, index_col=['platform', 'level_1'])
        return mda

    def _extract_fromfile(self, filename):
        """Extract metadata from a single file."""
        LOG.debug('Collecting metadata from {}'.format(filename))
        with xr.open_dataset(filename) as ds:
            midnight_line = np.float64(self._get_midnight_line(ds['acq_time']))
            eq_cross_lons, eq_cross_times = self._get_equator_crossings(ds)
            metadata = FileMetadata(
                platform=ds.attrs['platform'].split('>')[-1].strip(),
                start_time=ds['acq_time'].values[0],
                end_time=ds['acq_time'].values[-1],
                along_track=ds.dims['y'],
                filename=filename,
                orbit_number_start=ds.attrs['orbit_number_start'],
                orbit_number_end=ds.attrs['orbit_number_end'],
                equator_crossing_longitude_1=eq_cross_lons[0],
                equator_crossing_time_1=eq_cross_times[0],
                equator_crossing_longitude_2=eq_cross_lons[1],
                equator_crossing_time_2=eq_cross_times[1],
                midnight_line=midnight_line,
                overlap_free_start=np.nan,
                overlap_free_end=np.nan,
                global_quality_flag=QualityFlags.OK
            )
        return metadata

    def _collect_metadata(self, filenames):
        """Collect metadata from the given level 1c files."""
        records = []
        for filename in filenames:
            LOG.debug('Collecting metadata from {}'.format(filename))
            with xr.open_dataset(filename) as ds:
                midnight_line = np.float64(self._get_midnight_line(ds['acq_time']))
                eq_cross_lons, eq_cross_times = self._get_equator_crossings(ds)
                rec = {'platform': ds.attrs['platform'].split('>')[-1].strip(),
                       'start_time': ds['acq_time'].values[0],
                       'end_time': ds['acq_time'].values[-1],
                       'along_track': ds.dims['y'],
                       'filename': filename,
                       'orbit_number_start': ds.attrs['orbit_number_start'],
                       'orbit_number_end': ds.attrs['orbit_number_end'],
                       'equator_crossing_longitude_1': eq_cross_lons[0],
                       'equator_crossing_time_1': eq_cross_times[0],
                       'equator_crossing_longitude_2': eq_cross_lons[1],
                       'equator_crossing_time_2': eq_cross_times[1],
                       'midnight_line': midnight_line,
                       'overlap_free_start': np.nan,
                       'overlap_free_end': np.nan,
                       'global_quality_flag': QualityFlags.OK}
                records.append(rec)
        return records
        if os.path.isfile(CHECKPOINT_DB):
            LOG.info("Restarting from checkpoint.")
        db_url = 'sqlite:///{0}'.format(CHECKPOINT_DB)
        engine = sqlalchemy.create_engine(db_url)
        Base.metadata.create_all(engine)
        # open session
        Session = sessionmaker(bind=engine)
        session = Session()
        # get set of processed files in case of a restart
        wanted = set(filenames)
        done = set(
            filename for filename, in session.query(FileMetadata.filename)
        )

Member: Is that , intended?

Author: Unfortunately yes, because the query returns tuples (of one item) and therefore I need to unpack them.

Author: Have not yet seen a better way to get directly a list for a 1 column query.

Member: Ok I see. What about for filename, _ in ... ?

Author: Another idea:

import itertools as it
done = set(it.chain(*session.query(FileMetadata.filename)))

Which is another way to flatten the list of one element tuples.

Author: Or another way without any import:

done = set(next(zip(*session.query(FileMetadata.filename))))
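For reference, the three flattening options from this thread, side by side on plain one-element tuples instead of a real query (file names are made up):

import itertools as it

rows = [('file_a.nc',), ('file_b.nc',)]  # shape of a one-column SQLAlchemy query result

# 1. Trailing comma in the loop target unpacks each one-element tuple.
done_a = set(filename for filename, in rows)

# 2. itertools.chain flattens the sequence of tuples.
done_b = set(it.chain(*rows))

# 3. zip(*rows) transposes; the first (and only) column holds the filenames.
done_c = set(next(zip(*rows)))

assert done_a == done_b == done_c == {'file_a.nc', 'file_b.nc'}

Note that the zip variant raises StopIteration if the query returns no rows, while the other two simply give an empty set.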
        todo = wanted.difference(done)
        for i, filename in enumerate(todo):
            metadata = self._extract_fromfile(filename)
            session.add(metadata)
            if i % CHECKPOINT_STEP == 0:
                session.commit()
                # clear session cache
                session.expire_all()
        session.commit()
        session.close()
        # load data into memory and remove the checkpoint database
        metadata = pd.read_sql_table(FileMetadata.__tablename__, engine)
        os.remove(CHECKPOINT_DB)
        return metadata

    def _get_midnight_line(self, acq_time):
        """Find scanline where the UTC date increases by one day.
3 changes: 2 additions & 1 deletion setup.py
@@ -29,7 +29,8 @@

if __name__ == '__main__':
    requires = ['setuptools_scm', 'numpy', 'xarray >=0.15.1', 'pandas >=1.0.3', 'netCDF4',
                'h5py', 'pygac >=1.3.1', 'satpy >=0.21.0', 'pyyaml', 'trollsift']
                'h5py', 'pygac >=1.3.1', 'satpy >=0.21.0', 'pyyaml', 'trollsift',
                'sqlalchemy']
    README = open('README.md', 'r').read()
    setup(name='pygac-fdr',
          description='Python package for creating a Fundamental Data Record (FDR) of AVHRR GAC '