From 5e7eb0dc254f378525df0dd430fb04ef9411168f Mon Sep 17 00:00:00 2001
From: Michele Simionato
Date: Sun, 26 Nov 2023 08:28:58 +0100
Subject: [PATCH 1/4] Better parallelization of the CSV files

---
 utils/build_global_exposure | 36 ++++++++++++++++++++++++++----------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/utils/build_global_exposure b/utils/build_global_exposure
index d5348e99403b..33a94c446a44 100755
--- a/utils/build_global_exposure
+++ b/utils/build_global_exposure
@@ -18,7 +18,9 @@
 # along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
 
 import os
+import io
 import logging
+import pandas
 import numpy
 import h5py
 from openquake.baselib import general, hdf5, sap, performance
@@ -37,7 +39,8 @@ OCCUPANTS_PER_ASSET_AVERAGE OCCUPANTS_PER_ASSET_DAY OCCUPANTS_PER_ASSET_NIGHT
 OCCUPANTS_PER_ASSET_TRANSIT TOTAL_AREA_SQM
 TOTAL_REPL_COST_USD'''.split()}
 CONV['ASSET_ID'] = (numpy.string_, 24)
-CONV[None] = str
+for f in (None, 'ID_1'):
+    CONV[f] = str
 TAGS = {'TAXONOMY': [], 'ID_0': [], 'ID_1': [], 'OCCUPANCY': []}
 IGNORE = set('NAME_0 NAME_1 SETTLEMENT'.split())
 
@@ -64,14 +67,18 @@ def fix(arr):
             ID1[i] = '%s-%s' % (id0, ID1[i])
 
 
-def exposure_by_geohash(fname, common, monitor):
-    aw = hdf5.read_csv(fname, CONV, errors='ignore', usecols=common)
-    if hasattr(aw, 'array') and len(aw.array):
-        for slc in general.gen_slices(0, len(aw.array), 1_000_000):
-            arr = add_geohash3(aw.array[slc])
-            fix(arr)
-            for gh in numpy.unique(arr['geohash3']):
-                yield gh, arr[arr['geohash3']==gh]
+def exposure_by_geohash(lines, names, common, monitor):
+    df = pandas.read_csv(io.StringIO('\r\n'.join(lines)), names=names,
+                         dtype=CONV, usecols=common)
+    if len(df):
+        dt = hdf5.build_dt(CONV, names, '')
+        array = numpy.zeros(len(df), dt)
+        for col in df.columns:
+            array[col] = df[col].to_numpy()
+        array = add_geohash3(array)
+        fix(array)
+        for gh in numpy.unique(array['geohash3']):
+            yield gh, array[array['geohash3']==gh]
 
 
 def collect_exposures(grm_dir):
@@ -89,6 +96,15 @@ def collect_exposures(grm_dir):
     return out
 
 
+def gen_chunks(fnames, fields, csize=1_000_000):
+    for fname in fnames:
+        lines = list(open(fname, newline='', encoding='utf-8-sig',
+                          errors='ignore'))
+        header = [col.strip() for col in lines[0].split(',')]
+        for block in general.block_splitter(lines[1:], csize):
+            yield block, header, fields
+
+
 def read_world_exposure(grm_dir, dstore):
     """
     Read the exposure files for the entire world (assume some conventions
@@ -115,7 +131,7 @@ def read_world_exposure(grm_dir, dstore):
     dstore.create_dset('exposure/slice_by_gh3', slc_dt, fillvalue=None)
     dstore.swmr_on()
 
-    smap = Starmap(exposure_by_geohash, [(c, common) for c in csvfiles],
+    smap = Starmap(exposure_by_geohash, gen_chunks(csvfiles, common),
                    h5=dstore.hdf5)
     s = 0
     for gh3, arr in smap:

From 4df29b1cd15741f36bc93d5369a36c5d63ccbd5a Mon Sep 17 00:00:00 2001
From: Michele Simionato
Date: Mon, 27 Nov 2023 07:02:37 +0100
Subject: [PATCH 2/4] Compressing better

---
 utils/build_global_exposure | 52 +++++++++++++++++++++----------------
 1 file changed, 29 insertions(+), 23 deletions(-)

diff --git a/utils/build_global_exposure b/utils/build_global_exposure
index 746d65047be2..d406b2e31722 100755
--- a/utils/build_global_exposure
+++ b/utils/build_global_exposure
@@ -68,20 +68,6 @@ def fix(arr):
             ID1[i] = '%s-%s' % (id0, ID1[i])
 
 
-def exposure_by_geohash(lines, names, common, monitor):
-    df = pandas.read_csv(io.BytesIO(zlib.decompress(lines)), names=names,
-                         dtype=CONV, usecols=common)
-    if len(df):
-        dt = hdf5.build_dt(CONV, names, '')
-        array = numpy.zeros(len(df), dt)
-        for col in df.columns:
-            array[col] = df[col].to_numpy()
-        array = add_geohash3(array)
-        fix(array)
-        for gh in numpy.unique(array['geohash3']):
-            yield gh, array[array['geohash3']==gh]
-
-
 def collect_exposures(grm_dir):
     """
     Collect the files of kind Exposure_.xml
@@ -97,14 +83,33 @@ def collect_exposures(grm_dir):
     return out
 
 
-def gen_chunks(fnames, fields, csize=200_000):
-    for fname in fnames:
-        lines = list(open(fname + '.bak', newline='', encoding='utf-8-sig',
-                          errors='ignore'))
-        header = [col.strip() for col in lines[0].split(',')]
-        for block in general.block_splitter(lines[1:], csize):
-            comp = zlib.compress('\r\n'.join(block).encode('utf8'))
-            yield comp, header, fields
+def exposure_by_geohash(lines, names, common, monitor):
+    if isinstance(lines, bytes):
+        data = io.BytesIO(lines)
+    else:
+        data = io.StringIO(lines)
+    df = pandas.read_csv(data, names=names, dtype=CONV, usecols=common)
+    dt = hdf5.build_dt(CONV, names, '')
+    array = numpy.zeros(len(df), dt)
+    for col in df.columns:
+        array[col] = df[col].to_numpy()
+    array = add_geohash3(array)
+    fix(array)
+    for gh in numpy.unique(array['geohash3']):
+        yield gh, array[array['geohash3']==gh]
+
+
+def gen_tasks(fname, fields, monitor):
+    lines = list(open(fname + '.bak', newline='', encoding='utf-8-sig',
+                      errors='ignore'))
+    header = [col.strip() for col in lines[0].split(',')]
+    for i, block in enumerate(general.block_splitter(lines[1:], 200_000)):
+        data = '\r\n'.join(block)
+        if i == 0:
+            yield from exposure_by_geohash(data, header, fields, monitor)
+        else:
+            data = zlib.compress(data.encode('utf8'))
+            yield exposure_by_geohash, data, header, fields
 
 
 def read_world_exposure(grm_dir, dstore):
     """
     Read the exposure files for the entire world (assume some conventions
@@ -133,7 +138,8 @@ def read_world_exposure(grm_dir, dstore):
     dstore.create_dset('exposure/slice_by_gh3', slc_dt, fillvalue=None)
     dstore.swmr_on()
 
-    smap = Starmap(exposure_by_geohash, gen_chunks(csvfiles, common),
+    Starmap.num_cores = 8
+    smap = Starmap(gen_tasks, [(c, common) for c in csvfiles],
                    h5=dstore.hdf5)
     s = 0
     for gh3, arr in smap:

From 3b3b842d8e28aa018be55da6f19dd86670611ec2 Mon Sep 17 00:00:00 2001
From: Michele Simionato
Date: Mon, 27 Nov 2023 07:13:58 +0100
Subject: [PATCH 3/4] Smart compress

---
 utils/build_global_exposure | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/utils/build_global_exposure b/utils/build_global_exposure
index d406b2e31722..7a46affe22a0 100755
--- a/utils/build_global_exposure
+++ b/utils/build_global_exposure
@@ -85,7 +85,7 @@ def collect_exposures(grm_dir):
 
 def exposure_by_geohash(lines, names, common, monitor):
     if isinstance(lines, bytes):
-        data = io.BytesIO(lines)
+        data = io.BytesIO(zlib.decompress(lines))
     else:
         data = io.StringIO(lines)
     df = pandas.read_csv(data, names=names, dtype=CONV, usecols=common)
@@ -100,14 +100,16 @@ def exposure_by_geohash(lines, names, common, monitor):
 
 
 def gen_tasks(fname, fields, monitor):
-    lines = list(open(fname + '.bak', newline='', encoding='utf-8-sig',
-                      errors='ignore'))
+    f = open(fname + '.bak', newline='', encoding='utf-8-sig', errors='ignore')
+    with f:
+        lines = list(f)
     header = [col.strip() for col in lines[0].split(',')]
     for i, block in enumerate(general.block_splitter(lines[1:], 200_000)):
         data = '\r\n'.join(block)
         if i == 0:
             yield from exposure_by_geohash(data, header, fields, monitor)
         else:
+            print(fname)
             data = zlib.compress(data.encode('utf8'))
             yield exposure_by_geohash, data, header, fields

From 1ca582d1a1d69c49f254f0114b5c20793616bb92 Mon Sep 17 00:00:00 2001
From: Michele Simionato
Date: Mon, 27 Nov 2023 07:22:38 +0100
Subject: [PATCH 4/4] Cleanup [ci skip]

---
 utils/build_global_exposure | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/utils/build_global_exposure b/utils/build_global_exposure
index 7a46affe22a0..bd04b7730418 100755
--- a/utils/build_global_exposure
+++ b/utils/build_global_exposure
@@ -100,7 +100,7 @@ def exposure_by_geohash(lines, names, common, monitor):
 
 
 def gen_tasks(fname, fields, monitor):
-    f = open(fname + '.bak', newline='', encoding='utf-8-sig', errors='ignore')
+    f = open(fname, newline='', encoding='utf-8-sig', errors='ignore')
     with f:
         lines = list(f)
     header = [col.strip() for col in lines[0].split(',')]
@@ -140,7 +140,6 @@ def read_world_exposure(grm_dir, dstore):
     dstore.create_dset('exposure/slice_by_gh3', slc_dt, fillvalue=None)
     dstore.swmr_on()
 
-    Starmap.num_cores = 8
     smap = Starmap(gen_tasks, [(c, common) for c in csvfiles],
                    h5=dstore.hdf5)
     s = 0