From 9e25d0552822d927f2a062f596595491d02bb6f5 Mon Sep 17 00:00:00 2001 From: horn Date: Fri, 11 Sep 2020 09:50:37 +0000 Subject: [PATCH 1/5] checkpointing metadata collection --- pygac_fdr/metadata.py | 124 ++++++++++++++++++++++++++++++------------ 1 file changed, 89 insertions(+), 35 deletions(-) diff --git a/pygac_fdr/metadata.py b/pygac_fdr/metadata.py index 0cf5663..23261a5 100644 --- a/pygac_fdr/metadata.py +++ b/pygac_fdr/metadata.py @@ -20,13 +20,17 @@ from datetime import datetime from enum import IntEnum +import os import logging import netCDF4 +import sqlalchemy import numpy as np import pandas as pd -import sqlite3 import xarray as xr from xarray.coding.times import encode_cf_datetime +from sqlalchemy import Column, Integer, Float, String, DateTime +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker LOG = logging.getLogger(__package__) @@ -101,6 +105,31 @@ class QualityFlags(IntEnum): 'fill_value': None} ] +CHECKPOINT_STEP = 100 +CHECKPOINT_DB = "_checkpoint.db" +Base = declarative_base() + +class FileMetadata(Base): + """Object relational model for SQL database.""" + __tablename__ = 'metadata' + + platform = Column(String) + level_1 = Column(Integer) + start_time = Column(DateTime) + end_time = Column(DateTime) + along_track = Column(Integer) + filename = Column(String, primary_key=True) + orbit_number_start = Column(Integer) + orbit_number_end = Column(Integer) + equator_crossing_longitude_1 = Column(Float) + equator_crossing_time_1 = Column(DateTime) + equator_crossing_longitude_2 = Column(Float) + equator_crossing_time_2 = Column(DateTime) + midnight_line = Column(Float) + overlap_free_start = Column(Float) + overlap_free_end = Column(Float) + global_quality_flag = Column(Integer) + class MetadataCollector: """Collect and complement metadata from level 1c files. @@ -123,6 +152,8 @@ def get_metadata(self, filenames): """Collect and complement metadata from the given level 1c files.""" LOG.info('Collecting metadata') df = pd.DataFrame(self._collect_metadata(filenames)) + + df = pd.read_sql_table("metadata", self.db_engine) df.sort_values(by=['start_time', 'end_time'], inplace=True) # Set quality flags @@ -138,47 +169,70 @@ def get_metadata(self, filenames): def save_sql(self, mda, dbfile, if_exists): """Save metadata to sqlite database.""" - con = sqlite3.connect(dbfile) - mda.to_sql(name='metadata', con=con, if_exists=if_exists) - con.commit() - con.close() + engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile)) + mda.to_sql(FileMetadata.__tablename__, engine, if_exists=if_exists) def read_sql(self, dbfile): """Read metadata from sqlite database.""" - with sqlite3.connect(dbfile) as con: - mda = pd.read_sql('select * from metadata', con) - mda = mda.set_index(['platform', 'level_1']) - mda.fillna(value=np.nan, inplace=True) - for col in mda.columns: - if 'time' in col: - mda[col] = mda[col].astype('datetime64[ns]') + engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile)) + mda = pd.read_sql_table(FileMetadata.__tablename__, engine, + index_col=['platform', 'level_1']) return mda + def _extract_fromfile(self, filename): + LOG.debug('Collecting metadata from {}'.format(filename)) + with xr.open_dataset(filename) as ds: + midnight_line = np.float64(self._get_midnight_line(ds['acq_time'])) + eq_cross_lons, eq_cross_times = self._get_equator_crossings(ds) + metadata = FileMetadata( + platform=ds.attrs['platform'].split('>')[-1].strip(), + level_1=ds['acq_time'].values[0], + start_time=ds['acq_time'].values[-1], + end_time=ds.dims['y'], + along_track=os.path.basename(filename), + filename=ds.attrs['orbit_number_start'], + orbit_number_start=ds.attrs['orbit_number_end'], + orbit_number_end=eq_cross_lons[0], + equator_crossing_longitude_1=eq_cross_times[0], + equator_crossing_time_1=eq_cross_times[0], + equator_crossing_longitude_2=eq_cross_lons[1], + equator_crossing_time_2=eq_cross_times[1], + midnight_line=midnight_line, + overlap_free_start=np.nan, + overlap_free_end=np.nan, + global_quality_flag=QualityFlags.OK + ) + return metadata + def _collect_metadata(self, filenames): """Collect metadata from the given level 1c files.""" - records = [] - for filename in filenames: - LOG.debug('Collecting metadata from {}'.format(filename)) - with xr.open_dataset(filename) as ds: - midnight_line = np.float64(self._get_midnight_line(ds['acq_time'])) - eq_cross_lons, eq_cross_times = self._get_equator_crossings(ds) - rec = {'platform': ds.attrs['platform'].split('>')[-1].strip(), - 'start_time': ds['acq_time'].values[0], - 'end_time': ds['acq_time'].values[-1], - 'along_track': ds.dims['y'], - 'filename': filename, - 'orbit_number_start': ds.attrs['orbit_number_start'], - 'orbit_number_end': ds.attrs['orbit_number_end'], - 'equator_crossing_longitude_1': eq_cross_lons[0], - 'equator_crossing_time_1': eq_cross_times[0], - 'equator_crossing_longitude_2': eq_cross_lons[1], - 'equator_crossing_time_2': eq_cross_times[1], - 'midnight_line': midnight_line, - 'overlap_free_start': np.nan, - 'overlap_free_end': np.nan, - 'global_quality_flag': QualityFlags.OK} - records.append(rec) - return records + if os.path.isfile(CHECKPOINT_DB): + LOG.info("Restarting from checkpoint.") + db_url = 'sqlite:///{0}'.format(CHECKPOINT_DB) + engine = sqlalchemy.create_engine(db_url) + Base.metadata.create_all(engine) + # open session + Session = sessionmaker(bind=engine) + session = Session() + # get set of processed files in case of a restart + wanted = set(map(os.path.basename, filenames)) + done = set( + filename for filename, in session.query(FileMetadata.filename) + ) + todo = wanted.difference(done) + for i, filename in enumerate(todo): + metadata = self._extract_fromfile(filename) + session.add(metadata) + if i % CHECKPOINT_STEP == 0: + session.commit() + # clear session cache + session.expire_all() + session.commit() + session.close() + # load data into memory and remove the checkpoint database + metadata = pd.read_sql_table("metadata", self.db_engine) + os.remove(CHECKPOINT_DB) + return pd.read_sql_table("metadata", self.db_engine) def _get_midnight_line(self, acq_time): """Find scanline where the UTC date increases by one day. From 89c050a1a119120cc41771217dd265900f82b8a1 Mon Sep 17 00:00:00 2001 From: horn Date: Fri, 11 Sep 2020 10:05:29 +0000 Subject: [PATCH 2/5] fix collect metadata --- pygac_fdr/metadata.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pygac_fdr/metadata.py b/pygac_fdr/metadata.py index 23261a5..b7d3e37 100644 --- a/pygac_fdr/metadata.py +++ b/pygac_fdr/metadata.py @@ -153,7 +153,6 @@ def get_metadata(self, filenames): LOG.info('Collecting metadata') df = pd.DataFrame(self._collect_metadata(filenames)) - df = pd.read_sql_table("metadata", self.db_engine) df.sort_values(by=['start_time', 'end_time'], inplace=True) # Set quality flags @@ -230,9 +229,9 @@ def _collect_metadata(self, filenames): session.commit() session.close() # load data into memory and remove the checkpoint database - metadata = pd.read_sql_table("metadata", self.db_engine) + metadata = pd.read_sql_table("metadata", engine) os.remove(CHECKPOINT_DB) - return pd.read_sql_table("metadata", self.db_engine) + return metadata def _get_midnight_line(self, acq_time): """Find scanline where the UTC date increases by one day. From 30a27d1089d6a6dd2ddc555a9d66d6a5dfd6aa88 Mon Sep 17 00:00:00 2001 From: horn Date: Fri, 11 Sep 2020 10:21:08 +0000 Subject: [PATCH 3/5] Fix table name bug. --- pygac_fdr/metadata.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/pygac_fdr/metadata.py b/pygac_fdr/metadata.py index b7d3e37..487b9c7 100644 --- a/pygac_fdr/metadata.py +++ b/pygac_fdr/metadata.py @@ -114,7 +114,6 @@ class FileMetadata(Base): __tablename__ = 'metadata' platform = Column(String) - level_1 = Column(Integer) start_time = Column(DateTime) end_time = Column(DateTime) along_track = Column(Integer) @@ -152,7 +151,6 @@ def get_metadata(self, filenames): """Collect and complement metadata from the given level 1c files.""" LOG.info('Collecting metadata') df = pd.DataFrame(self._collect_metadata(filenames)) - df.sort_values(by=['start_time', 'end_time'], inplace=True) # Set quality flags @@ -169,32 +167,31 @@ def get_metadata(self, filenames): def save_sql(self, mda, dbfile, if_exists): """Save metadata to sqlite database.""" engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile)) - mda.to_sql(FileMetadata.__tablename__, engine, if_exists=if_exists) + mda.to_sql(name="metadata", con=engine, if_exists=if_exists) def read_sql(self, dbfile): """Read metadata from sqlite database.""" engine = sqlalchemy.create_engine('sqlite:///{0}'.format(dbfile)) - mda = pd.read_sql_table(FileMetadata.__tablename__, engine, - index_col=['platform', 'level_1']) + mda = pd.read_sql_table("metadata", engine, index_col=['platform', 'level_1']) return mda def _extract_fromfile(self, filename): + """Extract metadata from a single file.""" LOG.debug('Collecting metadata from {}'.format(filename)) with xr.open_dataset(filename) as ds: midnight_line = np.float64(self._get_midnight_line(ds['acq_time'])) eq_cross_lons, eq_cross_times = self._get_equator_crossings(ds) metadata = FileMetadata( platform=ds.attrs['platform'].split('>')[-1].strip(), - level_1=ds['acq_time'].values[0], - start_time=ds['acq_time'].values[-1], - end_time=ds.dims['y'], - along_track=os.path.basename(filename), - filename=ds.attrs['orbit_number_start'], - orbit_number_start=ds.attrs['orbit_number_end'], - orbit_number_end=eq_cross_lons[0], - equator_crossing_longitude_1=eq_cross_times[0], + start_time=ds['acq_time'].values[0], + end_time=ds['acq_time'].values[-1], + along_track=ds.dims['y'], + filename=filename, + orbit_number_start=ds.attrs['orbit_number_start'], + orbit_number_end=ds.attrs['orbit_number_end'], + equator_crossing_longitude_1=eq_cross_lons[0], equator_crossing_time_1=eq_cross_times[0], - equator_crossing_longitude_2=eq_cross_lons[1], + equator_crossing_longitude_2=eq_cross_lons[0], equator_crossing_time_2=eq_cross_times[1], midnight_line=midnight_line, overlap_free_start=np.nan, @@ -214,7 +211,7 @@ def _collect_metadata(self, filenames): Session = sessionmaker(bind=engine) session = Session() # get set of processed files in case of a restart - wanted = set(map(os.path.basename, filenames)) + wanted = set(filenames) done = set( filename for filename, in session.query(FileMetadata.filename) ) @@ -229,7 +226,7 @@ def _collect_metadata(self, filenames): session.commit() session.close() # load data into memory and remove the checkpoint database - metadata = pd.read_sql_table("metadata", engine) + metadata = pd.read_sql_table(FileMetadata.__tablename__, engine) os.remove(CHECKPOINT_DB) return metadata From 469ec846b9fe744346c652ae64cacaef8e31b0a5 Mon Sep 17 00:00:00 2001 From: horn Date: Fri, 11 Sep 2020 10:36:04 +0000 Subject: [PATCH 4/5] fix whitespaces --- pygac_fdr/metadata.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pygac_fdr/metadata.py b/pygac_fdr/metadata.py index 487b9c7..1da5f20 100644 --- a/pygac_fdr/metadata.py +++ b/pygac_fdr/metadata.py @@ -109,10 +109,11 @@ class QualityFlags(IntEnum): CHECKPOINT_DB = "_checkpoint.db" Base = declarative_base() + class FileMetadata(Base): """Object relational model for SQL database.""" __tablename__ = 'metadata' - + platform = Column(String) start_time = Column(DateTime) end_time = Column(DateTime) From 3228e764fdabcca5912ba2706c702ec033eeb196 Mon Sep 17 00:00:00 2001 From: horn Date: Fri, 11 Sep 2020 11:04:46 +0000 Subject: [PATCH 5/5] add sqlalchemy to requirements --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 202901f..015aa1f 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,8 @@ if __name__ == '__main__': requires = ['setuptools_scm', 'numpy', 'xarray >=0.15.1', 'pandas >=1.0.3', 'netCDF4', - 'h5py', 'pygac >=1.3.1', 'satpy >=0.21.0', 'pyyaml', 'trollsift'] + 'h5py', 'pygac >=1.3.1', 'satpy >=0.21.0', 'pyyaml', 'trollsift', + 'sqlalchemy'] README = open('README.md', 'r').read() setup(name='pygac-fdr', description='Python package for creating a Fundamental Data Record (FDR) of AVHRR GAC '