diff --git a/examples/daily/aggregate_regional.py b/examples/daily/aggregate_regional.py index af6080f..e842311 100644 --- a/examples/daily/aggregate_regional.py +++ b/examples/daily/aggregate_regional.py @@ -8,32 +8,34 @@ The code is licensed under the MIT license. """ -from datetime import datetime -import matplotlib.pyplot as plt -from meteostat import Stations, Daily +if __name__ == '__main__': -# Configuration -Daily.max_threads = 5 + from datetime import datetime + import matplotlib.pyplot as plt + from meteostat import Stations, Daily -# Time period -start = datetime(1980, 1, 1) -end = datetime(2019, 12, 31) + # Configuration + Daily.cores = 12 -# Get random weather stations in the US -stations = Stations() -stations = stations.region('US') -stations = stations.inventory('daily', (start, end)) -stations = stations.fetch(limit=20, sample=True) + # Time period + start = datetime(1980, 1, 1) + end = datetime(2019, 12, 31) -# Get daily data -data = Daily(stations, start, end) + # Get random weather stations in the US + stations = Stations() + stations = stations.region('US') + stations = stations.inventory('daily', (start, end)) + stations = stations.fetch(limit=50, sample=True) -# Normalize & aggregate -data = data.normalize().aggregate('1Y', spatial=True).fetch() + # Get daily data + data = Daily(stations, start, end) -# Chart title -TITLE = 'Average US Annual Temperature from 1980 to 2019' + # Normalize & aggregate + data = data.normalize().aggregate('1Y', spatial=True).fetch() -# Plot chart -data.plot(y=['tavg'], title=TITLE) -plt.show() + # Chart title + TITLE = 'Average US Annual Temperature from 1980 to 2019' + + # Plot chart + data.plot(y=['tavg'], title=TITLE) + plt.show() diff --git a/examples/hourly/performance.py b/examples/hourly/performance.py new file mode 100644 index 0000000..a04191d --- /dev/null +++ b/examples/hourly/performance.py @@ -0,0 +1,34 @@ +""" +Example: Hourly point data performance + +Meteorological data provided by Meteostat 
(https://dev.meteostat.net) +under the terms of the Creative Commons Attribution-NonCommercial +4.0 International Public License. + +The code is licensed under the MIT license. +""" + +if __name__ == '__main__': + + from timeit import default_timer as timer + + # Get start time + s = timer() + + # Run script + from datetime import datetime + from meteostat import Hourly + + Hourly.cores = 12 + + start = datetime(1960, 1, 1) + end = datetime(2021, 1, 1, 23, 59) + + data = Hourly('10637', start, end, timezone='Europe/Berlin') + data = data.fetch() + + # Get end time + e = timer() + + # Print performance + print(e - s) diff --git a/meteostat/__init__.py b/meteostat/__init__.py index 27f4a17..566681d 100644 --- a/meteostat/__init__.py +++ b/meteostat/__init__.py @@ -12,8 +12,10 @@ """ __appname__ = 'meteostat' -__version__ = '1.4.6' +__version__ = '1.5.0' +from .interface.base import Base +from .interface.timeseries import Timeseries from .interface.stations import Stations from .interface.point import Point from .interface.hourly import Hourly diff --git a/meteostat/core/cache.py b/meteostat/core/cache.py index f0cf59a..3b843ff 100644 --- a/meteostat/core/cache.py +++ b/meteostat/core/cache.py @@ -42,7 +42,10 @@ def file_in_cache( # Make sure the cache directory exists if not os.path.exists(directory): - os.makedirs(directory) + try: + os.makedirs(directory) + except FileExistsError: + pass # Return the file path if it exists if os.path.isfile(path) and time.time() - \ diff --git a/meteostat/core/config.py b/meteostat/core/config.py deleted file mode 100644 index 5c0b8cb..0000000 --- a/meteostat/core/config.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Core Class - Configuration - -Meteorological data provided by Meteostat (https://dev.meteostat.net) -under the terms of the Creative Commons Attribution-NonCommercial -4.0 International Public License. - -The code is licensed under the MIT license. 
-""" - -import os - -# Base URL of the Meteostat bulk data interface -endpoint: str = 'https://bulk.meteostat.net/v2/' - -# Location of the cache directory -cache_dir: str = os.path.expanduser( - '~') + os.sep + '.meteostat' + os.sep + 'cache' - -# Maximum age of a cached file in seconds -max_age: int = 24 * 60 * 60 - -# Maximum number of threads used for downloading files -max_threads: int = 1 diff --git a/meteostat/core/loader.py b/meteostat/core/loader.py index 9a179c0..efb1441 100644 --- a/meteostat/core/loader.py +++ b/meteostat/core/loader.py @@ -9,6 +9,7 @@ """ from urllib.error import HTTPError +from multiprocessing import Pool from multiprocessing.pool import ThreadPool from typing import Callable import pandas as pd @@ -18,27 +19,49 @@ def processing_handler( datasets: list, load: Callable[[dict], None], - max_threads: int + cores: int, + threads: int ) -> None: """ - Load multiple datasets simultaneously + Load multiple datasets (simultaneously) and return the concatenated result """ - # Single-thread processing - if max_threads < 2: + # Data output + output = [] - for dataset in datasets: - load(*dataset) + # Multi-core processing + if cores > 1 and len(datasets) > 1: + + # Create process pool + with Pool(cores) as pool: + + # Process datasets in pool + output = pool.starmap(load, datasets) + + # Wait for Pool to finish + pool.close() + pool.join() # Multi-thread processing + elif threads > 1 and len(datasets) > 1: + + # Create thread pool + with ThreadPool(threads) as pool: + + # Process datasets in pool + output = pool.starmap(load, datasets) + + # Wait for Pool to finish + pool.close() + pool.join() + + # Single-thread processing else: - pool = ThreadPool(max_threads) - pool.starmap(load, datasets) + for dataset in datasets: + output.append(load(*dataset)) - # Wait for Pool to finish - pool.close() - pool.join() + return pd.concat(output) def load_handler( diff --git a/meteostat/interface/base.py b/meteostat/interface/base.py index d258175..35e6461 100644 ---
a/meteostat/interface/base.py +++ b/meteostat/interface/base.py @@ -8,6 +8,8 @@ The code is licensed under the MIT license. """ +import os + class Base: @@ -15,5 +17,18 @@ class Base: Base class that provides features which are used across the package """ - # Import configuration - from meteostat.core.config import endpoint, cache_dir, max_age, max_threads + # Base URL of the Meteostat bulk data interface + endpoint: str = 'https://bulk.meteostat.net/v2/' + + # Location of the cache directory + cache_dir: str = os.path.expanduser( + '~') + os.sep + '.meteostat' + os.sep + 'cache' + + # Maximum age of a cached file in seconds + max_age: int = 24 * 60 * 60 + + # Number of cores used for processing files + cores: int = 1 + + # Number of threads used for processing files + threads: int = 1 diff --git a/meteostat/interface/daily.py b/meteostat/interface/daily.py index feebbbe..4e21b0b 100644 --- a/meteostat/interface/daily.py +++ b/meteostat/interface/daily.py @@ -16,11 +16,11 @@ from meteostat.core.loader import processing_handler, load_handler from meteostat.utilities.validations import validate_series from meteostat.utilities.aggregations import degree_mean, weighted_average -from meteostat.interface.base import Base +from meteostat.interface.timeseries import Timeseries from meteostat.interface.point import Point -class Daily(Base): +class Daily(Timeseries): """ Retrieve daily weather observations for one or multiple weather stations or @@ -30,21 +30,6 @@ class Daily(Base): # The cache subdirectory cache_subdir: str = 'daily' - # The list of weather Stations - _stations: pd.Index = None - - # The start date - _start: datetime = None - - # The end date - _end: datetime = None - - # Include model data? 
- _model: bool = True - - # The data frame - _data: pd.DataFrame = pd.DataFrame() - # Default frequency _freq: str = '1D' @@ -143,14 +128,11 @@ def _load( # Get time index time = df.index.get_level_values('time') - # Filter & append - self._data = self._data.append( - df.loc[(time >= self._start) & (time <= self._end)]) + # Filter & return + return df.loc[(time >= self._start) & (time <= self._end)] - else: - - # Append - self._data = self._data.append(df) + # Return + return df def _get_data(self) -> None: """ @@ -168,12 +150,11 @@ def _get_data(self) -> None: )) # Data Processing - processing_handler(datasets, self._load, self.max_threads) + return processing_handler( + datasets, self._load, self.cores, self.threads) - else: - - # Empty DataFrame - self._data = pd.DataFrame(columns=[*self._types]) + # Empty DataFrame + return pd.DataFrame(columns=[*self._types]) def _resolve_point( self, @@ -263,7 +244,7 @@ def __init__( self._model = model # Get data for all weather stations - self._get_data() + self._data = self._get_data() # Interpolate data if isinstance(loc, Point): @@ -279,14 +260,3 @@ def expected_rows(self) -> int: """ return (self._end - self._start).days + 1 - - # Import methods - from meteostat.series.normalize import normalize - from meteostat.series.interpolate import interpolate - from meteostat.series.aggregate import aggregate - from meteostat.series.convert import convert - from meteostat.series.coverage import coverage - from meteostat.series.count import count - from meteostat.series.fetch import fetch - from meteostat.series.stations import stations - from meteostat.core.cache import clear_cache diff --git a/meteostat/interface/hourly.py b/meteostat/interface/hourly.py index 11f56b6..6f918df 100644 --- a/meteostat/interface/hourly.py +++ b/meteostat/interface/hourly.py @@ -18,11 +18,11 @@ from meteostat.core.loader import processing_handler, load_handler from meteostat.utilities.validations import validate_series from 
meteostat.utilities.aggregations import degree_mean, weighted_average -from meteostat.interface.base import Base +from meteostat.interface.timeseries import Timeseries from meteostat.interface.point import Point -class Hourly(Base): +class Hourly(Timeseries): """ Retrieve hourly weather observations for one or multiple weather stations or @@ -35,24 +35,9 @@ class Hourly(Base): # Specify if the library should use chunks or full dumps chunked: bool = True - # The list of weather Stations - _stations: pd.Index = None - - # The start date - _start: datetime = None - - # The end date - _end: datetime = None - # The time zone _timezone: str = None - # Include model data? - _model: bool = True - - # The data frame - _data: pd.DataFrame = pd.DataFrame() - # Default frequency _freq: str = '1H' @@ -200,14 +185,11 @@ def _load( # Get time index time = df.index.get_level_values('time') - # Filter & append - self._data = self._data.append( - df.loc[(time >= self._start) & (time <= self._end)]) + # Filter & return + return df.loc[(time >= self._start) & (time <= self._end)] - else: - - # Append - self._data = self._data.append(df) + # Return + return df def _get_data(self) -> None: """ @@ -237,12 +219,10 @@ def _get_data(self) -> None: )) # Data Processing - processing_handler(datasets, self._load, self.max_threads) - - else: + return processing_handler( + datasets, self._load, self.cores, self.threads) - # Empty DataFrame - self._data = pd.DataFrame(columns=[*self._types]) + return pd.DataFrame(columns=[*self._types]) def _resolve_point( self, @@ -260,8 +240,8 @@ def _resolve_point( if method == 'nearest': - self._data = self._data.groupby( - pd.Grouper(level='time', freq=self._freq)).agg('first') + self._data = self._data.groupby(pd.Grouper( + level='time', freq=self._freq)).agg('first') else: @@ -328,7 +308,7 @@ def __init__( self._model = model # Get data for all weather stations - self._get_data() + self._data = self._get_data() # Interpolate data if isinstance(loc, Point): 
@@ -344,14 +324,3 @@ def expected_rows(self) -> int: """ return floor((self._end - self._start).total_seconds() / 3600) + 1 - - # Import methods - from meteostat.series.normalize import normalize - from meteostat.series.interpolate import interpolate - from meteostat.series.aggregate import aggregate - from meteostat.series.convert import convert - from meteostat.series.coverage import coverage - from meteostat.series.count import count - from meteostat.series.fetch import fetch - from meteostat.series.stations import stations - from meteostat.core.cache import clear_cache diff --git a/meteostat/interface/monthly.py b/meteostat/interface/monthly.py index 47593ce..57774d4 100644 --- a/meteostat/interface/monthly.py +++ b/meteostat/interface/monthly.py @@ -16,11 +16,11 @@ from meteostat.core.loader import processing_handler, load_handler from meteostat.utilities.validations import validate_series from meteostat.utilities.aggregations import degree_mean, weighted_average -from meteostat.interface.base import Base +from meteostat.interface.timeseries import Timeseries from meteostat.interface.point import Point -class Monthly(Base): +class Monthly(Timeseries): """ Retrieve monthly weather data for one or multiple weather stations or @@ -30,21 +30,6 @@ class Monthly(Base): # The cache subdirectory cache_subdir: str = 'monthly' - # The list of weather Stations - _stations: pd.Index = None - - # The start date - _start: datetime = None - - # The end date - _end: datetime = None - - # Include model data? 
- _model: bool = True - - # The data frame - _data: pd.DataFrame = pd.DataFrame() - # Default frequency _freq: str = '1MS' @@ -144,14 +129,11 @@ def _load( # Get time index time = df.index.get_level_values('time') - # Filter & append - self._data = self._data.append( - df.loc[(time >= self._start) & (time <= self._end)]) + # Filter & return + return df.loc[(time >= self._start) & (time <= self._end)] - else: - - # Append - self._data = self._data.append(df) + # Return + return df def _get_data(self) -> None: """ @@ -169,12 +151,10 @@ def _get_data(self) -> None: )) # Data Processing - processing_handler(datasets, self._load, self.max_threads) + return processing_handler(datasets, self._load, self.cores, self.threads) - else: - - # Empty DataFrame - self._data = pd.DataFrame(columns=[*self._types]) + # Empty DataFrame + return pd.DataFrame(columns=[*self._types]) def _resolve_point( self, @@ -265,7 +245,7 @@ def __init__( self._model = model # Get data for all weather stations - self._get_data() + self._data = self._get_data() # Interpolate data if isinstance(loc, Point): @@ -282,14 +262,3 @@ def expected_rows(self) -> int: return ((self._end.year - self._start.year) * 12 + self._end.month - self._start.month) + 1 - - # Import methods - from meteostat.series.normalize import normalize - from meteostat.series.interpolate import interpolate - from meteostat.series.aggregate import aggregate - from meteostat.series.convert import convert - from meteostat.series.coverage import coverage - from meteostat.series.count import count - from meteostat.series.fetch import fetch - from meteostat.series.stations import stations - from meteostat.core.cache import clear_cache diff --git a/meteostat/interface/normals.py b/meteostat/interface/normals.py index 4a345e1..a00cc89 100644 --- a/meteostat/interface/normals.py +++ b/meteostat/interface/normals.py @@ -117,17 +117,10 @@ def _load( # Get time index end = df.index.get_level_values('end') - # Filter & append - self._data = 
self._data.append( - df.loc[end == self._end]) + # Filter & return + return df.loc[end == self._end] - else: - - # Append - if self._data.index.size == 0: - self._data = df - else: - self._data = self._data.append(df) + return df def _get_data(self) -> None: """ @@ -145,12 +138,11 @@ def _get_data(self) -> None: )) # Data Processing - processing_handler(datasets, self._load, self.max_threads) - - else: + return processing_handler( + datasets, self._load, self.cores, self.threads) - # Empty DataFrame - self._data = pd.DataFrame(columns=[*self._types]) + # Empty DataFrame + return pd.DataFrame(columns=[*self._types]) def _resolve_point( self, @@ -247,7 +239,7 @@ def __init__( self._end = end # Get data for all weather stations - self._get_data() + self._data = self._get_data() # Interpolate data if isinstance(loc, Point): diff --git a/meteostat/interface/point.py b/meteostat/interface/point.py index 6e50a6a..3004db4 100644 --- a/meteostat/interface/point.py +++ b/meteostat/interface/point.py @@ -40,6 +40,9 @@ class Point: # Altitude Weight weight_alt: float = 0.4 + # The list of weather stations + stations: pd.Index = None + # The latitude _lat: float = None @@ -106,6 +109,9 @@ def get_stations(self, freq: str = None, start: datetime = None, # Sort by score (descending) stations = stations.sort_values('score', ascending=False) + # Capture result + self.stations = stations.index[:self.max_count] + return stations.head(self.max_count) @property diff --git a/meteostat/interface/stations.py b/meteostat/interface/stations.py index e65d6c3..849da8b 100644 --- a/meteostat/interface/stations.py +++ b/meteostat/interface/stations.py @@ -8,10 +8,10 @@ The code is licensed under the MIT license. 
""" -from math import cos, sqrt, radians from copy import copy from datetime import datetime, timedelta from typing import Union +import numpy as np import pandas as pd from meteostat.core.cache import get_file_path, file_in_cache from meteostat.core.loader import load_handler @@ -128,19 +128,27 @@ def nearby( temp = copy(self) # Calculate distance between weather station and geo point - def distance(station, point) -> float: - # Earth radius in m + def distance(lat1, lon1, lat2, lon2): + # Earth radius in meters radius = 6371000 - x = (radians(point[1]) - radians(station['longitude'])) * \ - cos(0.5 * (radians(point[0]) + radians(station['latitude']))) - y = (radians(point[0]) - radians(station['latitude'])) + # Degress to radian + lat1, lon1, lat2, lon2 = map(np.deg2rad, [lat1, lon1, lat2, lon2]) - return radius * sqrt(x * x + y * y) + # Deltas + dlat = lat2 - lat1 + dlon = lon2 - lon1 - # Get distance for each stationsd - temp._data['distance'] = temp._data.apply( - lambda station: distance(station, [lat, lon]), axis=1) + # Calculate distance + arch = np.sin(dlat / 2)**2 + np.cos(lat1) * \ + np.cos(lat2) * np.sin(dlon / 2)**2 + arch_sin = 2 * np.arcsin(np.sqrt(arch)) + + return radius * arch_sin + + # Get distance for each station + temp._data['distance'] = distance( + lat, lon, temp._data['latitude'], temp._data['longitude']) # Filter by radius if radius is not None: diff --git a/meteostat/interface/timeseries.py b/meteostat/interface/timeseries.py new file mode 100644 index 0000000..35f36c8 --- /dev/null +++ b/meteostat/interface/timeseries.py @@ -0,0 +1,49 @@ +""" +Timeseries Class + +Meteorological data provided by Meteostat (https://dev.meteostat.net) +under the terms of the Creative Commons Attribution-NonCommercial +4.0 International Public License. + +The code is licensed under the MIT license. 
+""" + +from datetime import datetime +import pandas as pd +from meteostat.interface.base import Base + + +class Timeseries(Base): + + """ + Timeseries class which provides features which are used across all time series classes + """ + + # The list of weather Stations + _stations: pd.Index = None + + # The list of origin weather Stations + _origin_stations: pd.Index = None + + # The start date + _start: datetime = None + + # The end date + _end: datetime = None + + # Include model data? + _model: bool = True + + # The data frame + _data: pd.DataFrame = pd.DataFrame() + + # Import methods + from meteostat.series.normalize import normalize + from meteostat.series.interpolate import interpolate + from meteostat.series.aggregate import aggregate + from meteostat.series.convert import convert + from meteostat.series.coverage import coverage + from meteostat.series.count import count + from meteostat.series.fetch import fetch + from meteostat.series.stations import stations + from meteostat.core.cache import clear_cache diff --git a/meteostat/series/normalize.py b/meteostat/series/normalize.py index 0a2eec1..506c477 100644 --- a/meteostat/series/normalize.py +++ b/meteostat/series/normalize.py @@ -26,46 +26,48 @@ def normalize(self): # Create temporal instance temp = copy(self) - # Create result DataFrame - result = pd.DataFrame(columns=temp._columns[temp._first_met_col:]) - - # Handle tz-aware date ranges - if hasattr(temp, '_timezone') and temp._timezone is not None: - timezone = pytz.timezone(temp._timezone) - start = temp._start.astimezone(timezone) - end = temp._end.astimezone(timezone) - else: - start = temp._start - end = temp._end - - # Go through list of weather stations - for station in temp._stations: - # Create data frame - df = pd.DataFrame(columns=temp._columns[temp._first_met_col:]) - # Add time series - df['time'] = pd.date_range( - start, - end, - freq=self._freq, - tz=temp._timezone if hasattr(temp, '_timezone') else None) - # Add station ID - 
df['station'] = station - # Add columns - for column in temp._columns[temp._first_met_col:]: - # Add column to DataFrame - df[column] = NaN - - result = pd.concat([result, df], axis=0) - - # Set index - result = result.set_index(['station', 'time']) - - # Merge data - temp._data = pd.concat([temp._data, result], axis=0).groupby( - ['station', 'time'], as_index=True).first() - - # None -> NaN - temp._data = temp._data.fillna(NaN) + if temp._start and temp._end and temp.coverage() < 1: + + # Create result DataFrame + result = pd.DataFrame(columns=temp._columns[temp._first_met_col:]) + + # Handle tz-aware date ranges + if hasattr(temp, '_timezone') and temp._timezone is not None: + timezone = pytz.timezone(temp._timezone) + start = temp._start.astimezone(timezone) + end = temp._end.astimezone(timezone) + else: + start = temp._start + end = temp._end + + # Go through list of weather stations + for station in temp._stations: + # Create data frame + df = pd.DataFrame(columns=temp._columns[temp._first_met_col:]) + # Add time series + df['time'] = pd.date_range( + start, + end, + freq=self._freq, + tz=temp._timezone if hasattr(temp, '_timezone') else None) + # Add station ID + df['station'] = station + # Add columns + for column in temp._columns[temp._first_met_col:]: + # Add column to DataFrame + df[column] = NaN + + result = pd.concat([result, df], axis=0) + + # Set index + result = result.set_index(['station', 'time']) + + # Merge data + temp._data = pd.concat([temp._data, result], axis=0).groupby( + ['station', 'time'], as_index=True).first() + + # None -> NaN + temp._data = temp._data.fillna(NaN) # Return class instance return temp diff --git a/setup.py b/setup.py index 63e0b49..cfa9d9b 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ # Setup setup( name='meteostat', - version='1.4.6', + version='1.5.0', author='Meteostat', author_email='info@meteostat.net', description='Access and analyze historical weather and climate data with Python.',