
Commit

Merge pull request #9253 from gem/expo3
Improved `build_global_exposure`/2
micheles authored Nov 27, 2023
2 parents 12f2cc3 + 1ca582d commit ccecdb8
Showing 1 changed file with 37 additions and 12 deletions.
utils/build_global_exposure (37 additions, 12 deletions)
@@ -18,7 +18,10 @@
 # along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
 
 import os
+import io
+import zlib
 import logging
+import pandas
 import numpy
 import h5py
 from openquake.baselib import general, hdf5, sap, performance
@@ -37,7 +40,8 @@ OCCUPANTS_PER_ASSET_AVERAGE OCCUPANTS_PER_ASSET_DAY
 OCCUPANTS_PER_ASSET_NIGHT OCCUPANTS_PER_ASSET_TRANSIT
 TOTAL_AREA_SQM TOTAL_REPL_COST_USD'''.split()}
 CONV['ASSET_ID'] = (numpy.string_, 24)
-CONV[None] = str
+for f in (None, 'ID_1'):
+    CONV[f] = str
 TAGS = {'TAXONOMY': [], 'ID_0': [], 'ID_1': [], 'OCCUPANCY': []}
 IGNORE = set('NAME_0 NAME_1 SETTLEMENT'.split())
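
Note on the hunk above: CONV maps column names to dtypes for the CSV readers, and the None key appears to act as the catch-all dtype for any column not listed explicitly; registering 'ID_1' as str as well guarantees the admin-1 identifiers stay strings (so that fix() can prefix them with the ID_0 code) even when they look numeric. A minimal sketch of that fallback convention, with resolve_dtype as a hypothetical helper and CONV abridged to one real column:

    # Sketch (assumption): the None key in CONV is the default dtype for
    # columns not listed explicitly, as in openquake.baselib.hdf5.read_csv.
    import numpy

    CONV = {'TOTAL_AREA_SQM': numpy.float32, None: str}  # abridged

    def resolve_dtype(conv, column):
        # hypothetical helper, for illustration only
        return conv.get(column, conv[None])

    assert resolve_dtype(CONV, 'TOTAL_AREA_SQM') is numpy.float32
    assert resolve_dtype(CONV, 'ID_1') is str  # implicit before, explicit after this PR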

@@ -64,16 +68,6 @@ def fix(arr):
         ID1[i] = '%s-%s' % (id0, ID1[i])
 
 
-def exposure_by_geohash(fname, common, monitor):
-    aw = hdf5.read_csv(fname + '.bak', CONV, errors='ignore', usecols=common)
-    if hasattr(aw, 'array') and len(aw.array):
-        for slc in general.gen_slices(0, len(aw.array), 1_000_000):
-            arr = add_geohash3(aw.array[slc])
-            fix(arr)
-            for gh in numpy.unique(arr['geohash3']):
-                yield gh, arr[arr['geohash3']==gh]
-
-
 def collect_exposures(grm_dir):
     """
     Collect the files of kind Exposure_<Country>.xml
@@ -89,6 +83,37 @@ def collect_exposures(grm_dir):
     return out
 
 
+def exposure_by_geohash(lines, names, common, monitor):
+    if isinstance(lines, bytes):
+        data = io.BytesIO(zlib.decompress(lines))
+    else:
+        data = io.StringIO(lines)
+    df = pandas.read_csv(data, names=names, dtype=CONV, usecols=common)
+    dt = hdf5.build_dt(CONV, names, '<BytesIO>')
+    array = numpy.zeros(len(df), dt)
+    for col in df.columns:
+        array[col] = df[col].to_numpy()
+    array = add_geohash3(array)
+    fix(array)
+    for gh in numpy.unique(array['geohash3']):
+        yield gh, array[array['geohash3']==gh]
+
+
+def gen_tasks(fname, fields, monitor):
+    f = open(fname, newline='', encoding='utf-8-sig', errors='ignore')
+    with f:
+        lines = list(f)
+    header = [col.strip() for col in lines[0].split(',')]
+    for i, block in enumerate(general.block_splitter(lines[1:], 200_000)):
+        data = '\r\n'.join(block)
+        if i == 0:
+            yield from exposure_by_geohash(data, header, fields, monitor)
+        else:
+            print(fname)
+            data = zlib.compress(data.encode('utf8'))
+            yield exposure_by_geohash, data, header, fields
+
+
 def read_world_exposure(grm_dir, dstore):
     """
     Read the exposure files for the entire world (assume some conventions
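
Note on the two new functions: gen_tasks reads each CSV once, splits the data lines into blocks of at most 200,000, parses the first block inline and ships the remaining blocks to the workers zlib-compressed; exposure_by_geohash then rebuilds an in-memory buffer and parses it with pandas, instead of re-reading the file from disk as the removed version did. A self-contained sketch of the compress-then-parse roundtrip (column names and rows invented for illustration):

    import io
    import zlib
    import pandas

    header = ['ASSET_ID', 'ID_1', 'TOTAL_REPL_COST_USD']
    block = ['a1,X,100.0', 'a2,Y,250.5']                 # data lines, header stripped

    payload = zlib.compress('\r\n'.join(block).encode('utf8'))  # master side
    data = io.BytesIO(zlib.decompress(payload))                 # worker side
    df = pandas.read_csv(data, names=header)             # names passed separately
    assert list(df.columns) == header and len(df) == 2
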
@@ -115,7 +140,7 @@ def read_world_exposure(grm_dir, dstore):
     dstore.create_dset('exposure/slice_by_gh3', slc_dt, fillvalue=None)
 
     dstore.swmr_on()
-    smap = Starmap(exposure_by_geohash, [(c, common) for c in csvfiles],
+    smap = Starmap(gen_tasks, [(c, common) for c in csvfiles],
                    h5=dstore.hdf5)
     s = 0
     for gh3, arr in smap:
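
Note on the Starmap change: gen_tasks is a generator that yields both plain results (from the first block) and (callable, arg, ...) tuples; as far as I understand the baselib convention, Starmap spawns the latter as fresh subtasks on the workers, so the master never holds a whole exposure file in memory. A rough standard-library analogue of that yield-a-subtask pattern, not the engine's actual mechanism:

    from concurrent.futures import ProcessPoolExecutor

    def subtask(x):
        return x * 2

    def gen_tasks(xs):
        yield xs[0] * 2                 # first element processed inline
        for x in xs[1:]:
            yield subtask, x            # the rest spawned as subtasks

    def run(generator):
        with ProcessPoolExecutor() as pool:
            futures = []
            for item in generator:
                if isinstance(item, tuple) and callable(item[0]):
                    futures.append(pool.submit(*item))   # a spawned subtask
                else:
                    yield item          # an inline result
            for fut in futures:
                yield fut.result()      # subtask results

    if __name__ == '__main__':
        print(list(run(gen_tasks([1, 2, 3]))))   # [2, 4, 6]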
