Johannes Heisig 2024-07-17
In this notebook we show how to access, download, and pre-process European air quality data. The EEA recently switched to storing its measurements in the Parquet file format, which allows better compression, faster downloads, and more efficient downstream analysis.
```r
# load functions for download and preprocessing
source("R/functions.R")
```
EEA measurement station data are organized into single files for each combination of station, pollutant, and dataset. Pollutants are represented by numeric codes (e.g., 7 = ozone); a full list is available in the EEA Air Quality Vocabulary, and a small lookup sketch follows the dataset list below. There are three types of datasets, according to the time of recording:
- 1: up-to-date, unverified data from 2023 until now (E2a)
- 2: verified data from 2013 to 2022 (E1a)
- 3: historical Airbase data from 2002 to 2012
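For reference, the pollutant codes used later in this notebook map to names as follows. This is a convenience sketch; `pollutant_codes` is not part of the sourced functions:

```r
# EEA Air Quality Vocabulary codes for the pollutants used in this notebook
pollutant_codes = c("SO2"   = 1,
                    "PM10"  = 5,
                    "O3"    = 7,
                    "NO2"   = 8,
                    "PM2.5" = 6001)

# look up the name of code 7
names(pollutant_codes)[pollutant_codes == 7]  # "O3"
```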
To retrieve station measurement data via the EEA Air Quality Download Service we follow a two-step process. First, `station_data_urls()` sends an API request to collect the URLs of all files relevant to the user according to filter preferences (countries, pollutants, datasets). Then `download_station_data()` writes the files to a local directory. The download can be run in parallel by setting `cores > 1`, which speeds up the task considerably.
Let’s download verified (E1a) and up-to-date (E2a) air quality data for two countries and five pollutants of interest:
- Countries: Ireland, Luxembourg
- Pollutants: SO2, NO2, O3, PM10, PM2.5
- Datasets: E1a, E2a
```r
countries  = c("IE", "LU")
pollutants = c(1, 7, 8, 5, 6001)
datasets   = c(1, 2)
dl_dir     = "tests/download"

pq_files = station_data_urls(country = countries,
                             pollutant = pollutants,
                             dataset = datasets) |>
  download_station_data(dir = dl_dir, cores = 6)
```
```r
length(pq_files)
```

```
[1] 750
```
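Assuming `download_station_data()` returns the local paths of the downloaded files (as the count above suggests), we can glance at a few file names:

```r
# peek at the first few downloaded files (assumes pq_files holds local paths)
head(basename(pq_files), 3)
```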
Using Apache Arrow, we can read the headers of all Parquet files in our download directory to get an idea of the data structure.
```r
library(arrow)
(data = open_dataset(dl_dir))
```

```
FileSystemDataset with 750 Parquet files
12 columns
Samplingpoint: string
Pollutant: int32
Start: timestamp[ns] not null
End: timestamp[ns] not null
Value: decimal128(38, 18)
Unit: string
AggType: string
Validity: int32 not null
Verification: int32 not null
ResultTime: timestamp[ns] not null
DataCapture: decimal128(38, 18)
FkObservationLog: string
```
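Arrow evaluates dplyr verbs lazily on the dataset, so we can run aggregations without loading all 750 files into memory. As a quick sanity check, here is a sketch (using only columns from the schema above) that counts observations per pollutant code:

```r
library(dplyr)

data |>
  count(Pollutant) |>  # aggregation is pushed down to Arrow
  collect()            # only the small result table is pulled into R
```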
After downloading we have the raw Parquet files on disk. They carry redundant information, and their structure (a single time series per station and pollutant) needs to be improved for further analysis. Here we filter and combine the data so it becomes analysis-ready. First, we discard observations that do not fulfill our requirements regarding validity and verification. We then add some station metadata (like the station code) and rearrange the tables to have a single time series per station, with a column for each measured pollutant (SO2, NO2, O3, PM10, PM2.5).
Quality filter arguments:

- `keep_validity`: which validity classes should be included? (currently 1)
- `keep_verification`: which verification classes should be included? (currently 1, 2 & 3)
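Conceptually, this quality filter is a row filter on the `Validity` and `Verification` columns of the raw data. A minimal dplyr sketch, not the actual internals of `preprocess_station_data()`:

```r
library(dplyr)

# keep only rows whose validity/verification class is on the allow-list;
# the expression stays lazy until collect() is called
valid_data = data |>
  filter(Validity %in% c(1),
         Verification %in% c(1, 2, 3))
```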
```r
prep_dir = "tests/01_hourly"
station_meta = read_parquet("AQ_stations/EEA_stations_meta_SamplingPoint.parquet")

preprocess_station_data(dir = dl_dir,
                        out_dir = prep_dir,
                        station_meta = station_meta,
                        keep_validity = 1,
                        keep_verification = c(1, 2, 3))
```

```
IE - LU -
```
`preprocess_station_data()` works on all stations from one country at a time, so we get one Parquet file per country as output. Below you can see the new structure of our data and the list of countries we processed.
```r
prep_data = open_dataset(prep_dir)
list.files(prep_dir)
```

```
[1] "IE_hourly.parquet" "LU_hourly.parquet"
```