Improved build_global_exposure #9252

Merged: 8 commits, Nov 27, 2023
openquake/baselib/hdf5.py: 12 changes (7 additions & 5 deletions)
@@ -870,7 +870,7 @@ def check(val):
     return check


-def _read_csv(fileobj, compositedt):
+def _read_csv(fileobj, compositedt, usecols=None):
     dic = {}
     conv = {}
     for name in compositedt.names:
@@ -882,7 +882,8 @@ def _read_csv(fileobj, compositedt):
         else:
             dic[name] = dt
     df = pandas.read_csv(fileobj, names=compositedt.names, converters=conv,
-                         dtype=dic, keep_default_na=False, na_filter=False)
+                         dtype=dic, usecols=usecols,
+                         keep_default_na=False, na_filter=False)
     return df
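As context for the change above, a minimal sketch of what forwarding usecols to pandas.read_csv buys, using a hypothetical in-memory CSV (not part of the PR):

import io
import pandas

# hypothetical three-column file; only the first two columns are wanted
csv = io.StringIO('ABW,12.5,ignored\nALB,3.25,ignored\n')
df = pandas.read_csv(csv, names=['ID_0', 'COST', 'EXTRA'],
                     usecols=['ID_0', 'COST'],  # EXTRA is never parsed
                     keep_default_na=False, na_filter=False)
print(df.columns.tolist())  # ['ID_0', 'COST']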


@@ -925,21 +926,22 @@ def read_common_header(fnames, sep=','):
             common = set(first.strip().split(sep))
         else:
             common &= set(first.strip().split(sep))
-    return sorted(common)
+    return common


 # NB: it would be nice to use numpy.loadtxt(
 #     f, build_dt(dtypedict, header), delimiter=sep, ndmin=1, comments=None)
 # however numpy does not support quoting, and "foo,bar" would be split :-(
 def read_csv(fname, dtypedict={None: float}, renamedict={}, sep=',',
-             index=None, errors=None):
+             index=None, errors=None, usecols=None):
     """
     :param fname: a CSV file with a header and float fields
     :param dtypedict: a dictionary fieldname -> dtype, None -> default
     :param renamedict: aliases for the fields to rename
     :param sep: separator (default comma)
     :param index: if not None, returns a pandas DataFrame
     :param errors: passed to the underlying open function (default None)
+    :param usecols: columns to read
     :returns: an ArrayWrapper, unless there is an index
     """
     attrs = {}
@@ -953,7 +955,7 @@ def read_csv(fname, dtypedict={None: float}, renamedict={}, sep=',',
         header = first.strip().split(sep)
         dt = build_dt(dtypedict, header, fname)
         try:
-            df = _read_csv(f, dt)
+            df = _read_csv(f, dt, usecols)
         except Exception as exc:
             err = find_error(fname, errors, dt)
             if err:
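Taken together, the two changes above enable a new call pattern: read_common_header returns a set, so callers can subtract unwanted fields with set arithmetic and pass the result to read_csv as usecols. A hedged sketch with hypothetical file names, mirroring the usage in utils/build_global_exposure below:

from openquake.baselib import hdf5

fnames = ['Exposure_ABW.csv', 'Exposure_ALB.csv']  # hypothetical
IGNORE = {'NAME_0', 'NAME_1', 'SETTLEMENT'}

# drop the ignored fields, sort once at the end, load only the survivors
common = sorted(hdf5.read_common_header(fnames) - IGNORE)
aw = hdf5.read_csv(fnames[0], {None: str}, usecols=common)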
utils/build_global_exposure: 56 changes (47 additions & 9 deletions)
@@ -18,8 +18,10 @@
 # along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

 import os
+import logging
 import numpy
-from openquake.baselib import general, hdf5, sap
+import h5py
+from openquake.baselib import general, hdf5, sap, performance
 from openquake.baselib.parallel import Starmap
 from openquake.hazardlib.geo.utils import geohash3
 from openquake.commonlib.datastore import build_dstore_log
@@ -34,7 +36,10 @@ COST_STRUCTURAL_USD LATITUDE LONGITUDE OCCUPANTS_PER_ASSET
 OCCUPANTS_PER_ASSET_AVERAGE OCCUPANTS_PER_ASSET_DAY
 OCCUPANTS_PER_ASSET_NIGHT OCCUPANTS_PER_ASSET_TRANSIT
 TOTAL_AREA_SQM TOTAL_REPL_COST_USD'''.split()}
+CONV['ASSET_ID'] = (numpy.string_, 24)
 CONV[None] = str
+TAGS = {'TAXONOMY': [], 'ID_0': [], 'ID_1': [], 'OCCUPANCY': []}
+IGNORE = set('NAME_0 NAME_1 SETTLEMENT'.split())


 def add_geohash3(array):
@@ -49,11 +54,22 @@ def add_geohash3(array):
     return out


+def fix(arr):
+    # prepend the country to ASSET_ID and ID_1
+    ID0 = arr['ID_0']
+    ID1 = arr['ID_1']
+    arr['ASSET_ID'] = numpy.char.add(numpy.array(ID0, 'S3'), arr['ASSET_ID'])
+    for i, (id0, id1) in enumerate(zip(ID0, ID1)):
+        if not id1.startswith(id0):
+            ID1[i] = '%s-%s' % (id0, ID1[i])
+
+
 def exposure_by_geohash(fname, common, monitor):
-    aw = hdf5.read_csv(fname, CONV, errors='ignore')
+    aw = hdf5.read_csv(fname + '.bak', CONV, errors='ignore', usecols=common)
     if hasattr(aw, 'array') and len(aw.array):
         for slc in general.gen_slices(0, len(aw.array), 1_000_000):
-            arr = add_geohash3(aw.array[slc][common])
+            arr = add_geohash3(aw.array[slc])
+            fix(arr)
             for gh in numpy.unique(arr['geohash3']):
                 yield gh, arr[arr['geohash3']==gh]
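fix() and exposure_by_geohash() lean on two numpy idioms: elementwise bytes concatenation and boolean-mask grouping by unique value. A self-contained sketch on toy data (not from the PR):

import numpy

# toy structured array with the fields fix() touches
arr = numpy.zeros(2, [('ID_0', 'U3'), ('ID_1', 'U10'),
                      ('ASSET_ID', 'S27'), ('geohash3', numpy.uint16)])
arr['ID_0'] = ['ABW', 'ALB']
arr['ID_1'] = ['ABW-1', '2']  # the second row lacks the country prefix
arr['ASSET_ID'] = [b'a1', b'a2']
arr['geohash3'] = [7, 7]

# prepend the 3-letter country code to ASSET_ID, as fix() does
arr['ASSET_ID'] = numpy.char.add(numpy.array(arr['ID_0'], 'S3'),
                                 arr['ASSET_ID'])
print(arr['ASSET_ID'])  # [b'ABWa1' b'ALBa2']

# prefix ID_1 with the country where it is missing, as fix() does
for i, (id0, id1) in enumerate(zip(arr['ID_0'], arr['ID_1'])):
    if not id1.startswith(id0):
        arr['ID_1'][i] = '%s-%s' % (id0, id1)
print(arr['ID_1'])  # ['ABW-1' 'ALB-2']

# group the rows by geohash, as exposure_by_geohash() does
for gh in numpy.unique(arr['geohash3']):
    print(gh, arr[arr['geohash3'] == gh])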

@@ -86,30 +102,52 @@ def read_world_exposure(grm_dir, dstore):
         exposure, _ = _get_exposure(fname)
         csvfiles.extend(exposure.datafiles)

-    common = hdf5.read_common_header(csvfiles)
+    common = sorted(hdf5.read_common_header(csvfiles) - IGNORE)
     assert common, 'There is no common header subset among %s' % csvfiles

-    dstore.create_df('exposure', [(c, F32 if c in CONV else hdf5.vstr)
-                                  for c in common])
+    dtlist = [(t, U32) for t in TAGS] + \
+        [(f, F32) for f in set(CONV)-set(TAGS)-{'ASSET_ID', None}] + \
+        [('ASSET_ID', h5py.string_dtype('ascii', 25))]
+    dstore.create_df('exposure', dtlist, 'gzip')
+    for tagname in TAGS:
+        dstore.create_dset('tagcol/' + tagname, U32)
     slc_dt = numpy.dtype([('gh3', U16), ('start', U32), ('stop', U32)])
     dstore.create_dset('exposure/slice_by_gh3', slc_dt, fillvalue=None)

     dstore.swmr_on()
     smap = Starmap(exposure_by_geohash, [(c, common) for c in csvfiles],
                    h5=dstore.hdf5)
     s = 0
     for gh3, arr in smap:
         for name in common:
-            hdf5.extend(dstore[f'exposure/{name}'], arr[name])
+            if name in TAGS:
+                TAGS[name].append(arr[name])
+            else:
+                hdf5.extend(dstore['exposure/' + name], arr[name])
         n = len(arr)
         slc = numpy.array([(gh3, s, s + n)], slc_dt)
         hdf5.extend(dstore['exposure/slice_by_gh3'], slc)
         s += n
     print(dstore)
     Starmap.shutdown()

+    for tagname in TAGS:
+        tagvalues = numpy.concatenate(TAGS[tagname])
+        uvals, inv = numpy.unique(tagvalues, return_inverse=1)
+        logging.info('Storing %s[%d]', tagname, len(uvals))
+        hdf5.extend(dstore[f'exposure/{tagname}'], inv)
+        dstore['tagcol/' + tagname] = uvals
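The tag columns are integer-encoded via numpy.unique with return_inverse, which yields both the sorted unique values and, for every original element, its index into that array. A quick sketch of the idiom on toy values:

import numpy

tagvalues = numpy.array(['RC', 'W', 'RC', 'RC', 'W'])
uvals, inv = numpy.unique(tagvalues, return_inverse=True)
print(uvals)  # ['RC' 'W']
print(inv)    # [0 1 0 0 1]
assert (uvals[inv] == tagvalues).all()  # inv round-trips the original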


 def main(grm_dir):
     """
     Storing global exposure
     """
+    mon = performance.Monitor(measuremem=True)
     dstore, log = build_dstore_log()
     with dstore, log:
-        read_world_exposure(grm_dir, dstore)
+        with mon:
+            read_world_exposure(grm_dir, dstore)
+    logging.info(mon)
 main.grm_dir = 'global risk model directory'
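Once the script has run, the slice_by_gh3 dataset lets downstream code pull one geohash cell at a time instead of loading the full exposure. A hedged sketch, assuming the datastore ends up in a file named exposure.hdf5 (hypothetical path) and is readable with plain h5py:

import h5py

with h5py.File('exposure.hdf5', 'r') as dstore:  # hypothetical file name
    for gh3, start, stop in dstore['exposure/slice_by_gh3'][:]:
        # each (start, stop) pair slices the rows of one gh3 cell
        asset_ids = dstore['exposure/ASSET_ID'][start:stop]
        print(gh3, len(asset_ids))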

