Skip to content

Commit

Permalink
Risk model tests and API changes (#162)
Browse files Browse the repository at this point in the history
* Risk model related API and test changes
---------

Signed-off-by: Joe Moorhouse <[email protected]>
  • Loading branch information
joemoorhouse authored Nov 16, 2023
1 parent c864e41 commit a860163
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 86 deletions.
16 changes: 12 additions & 4 deletions src/physrisk/api/v1/impact_req_resp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Sequence

from pydantic import BaseModel, Field

Expand All @@ -19,12 +19,20 @@ class AssetImpactRequest(BaseModel):
default_factory=CalcSettings, description="Interpolation method." # type:ignore
)
include_asset_level: bool = Field(True, description="If true, include asset-level impacts.")
include_measures: bool = Field(True, description="If true, include measures.")
include_measures: bool = Field(False, description="If true, include calculation of risk measures.")
include_calc_details: bool = Field(True, description="If true, include impact calculation details.")
scenarios: Optional[Sequence[str]] = Field([], description="Name of scenarios ('rcp8p5')")
years: Optional[Sequence[int]] = Field(
[],
description="""Projection year (2030, 2050, 2080). Any year before 2030,
e.g. 1980, is treated as historical.""",
)
# to be deprecated
scenario: str = Field("rcp8p5", description="Name of scenario ('rcp8p5')")
year: int = Field(
2050,
description="Projection year (2030, 2050, 2080). Any year before 2030, e.g. 1980, is treated as historical.",
[2050],
description="""Projection years (e.g. 2030, 2050, 2080). Any year before 2030,
e.g. 1980, is treated as historical.""",
)


Expand Down
85 changes: 44 additions & 41 deletions src/physrisk/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,62 +274,65 @@ def _get_asset_impacts(
vulnerability_models = (
calc.get_default_vulnerability_models() if vulnerability_models is None else vulnerability_models
)

# we keep API definition of asset separate from internal Asset class; convert by reflection
# based on asset_class:
assets = create_assets(request.assets)
measure_calcs = calc.get_default_risk_measure_calculators()
risk_model = AssetLevelRiskModel(hazard_model, vulnerability_models, measure_calcs)

scenarios = [request.scenario]
years = [request.year]
scenarios = [request.scenario] if request.scenarios is None or len(request.scenarios) == 0 else request.scenarios
years = [request.year] if request.years is None or len(request.years) == 0 else request.years
if request.include_measures:
batch_impacts, measures = risk_model.calculate_risk_measures(assets, scenarios, years)
measure_ids_for_asset, definitions = risk_model.populate_measure_definitions(assets)
risk_measures = _create_risk_measures(measures, measure_ids_for_asset, definitions, assets, scenarios, years)
else:
elif request.include_asset_level:
batch_impacts = risk_model.calculate_impacts(assets, scenarios, years)
risk_measures = None

results = batch_impacts[BatchId(scenarios[0], years[0])]

# note that this does rely on ordering of dictionary (post 3.6)
impacts: Dict[Asset, List[AssetSingleImpact]] = {}
for (asset, _), v in results.items():
# calculation details
if v.event is not None and v.vulnerability is not None:
hazard_exceedance = v.event.to_exceedance_curve()

vulnerability_distribution = VulnerabilityDistrib(
intensity_bin_edges=v.vulnerability.intensity_bins,
impact_bin_edges=v.vulnerability.impact_bins,
prob_matrix=v.vulnerability.prob_matrix,
)

calc_details = AcuteHazardCalculationDetails(
hazard_exceedance=ExceedanceCurve(
values=hazard_exceedance.values, exceed_probabilities=hazard_exceedance.probs
if request.include_asset_level:
results = batch_impacts[BatchId(scenarios[0], years[0])]
# note that this does rely on ordering of dictionary (post 3.6)
impacts: Dict[Asset, List[AssetSingleImpact]] = {}
for (asset, _), v in results.items():
if request.include_calc_details:
if v.event is not None and v.vulnerability is not None:
hazard_exceedance = v.event.to_exceedance_curve()

vulnerability_distribution = VulnerabilityDistrib(
intensity_bin_edges=v.vulnerability.intensity_bins,
impact_bin_edges=v.vulnerability.impact_bins,
prob_matrix=v.vulnerability.prob_matrix,
)

calc_details = AcuteHazardCalculationDetails(
hazard_exceedance=ExceedanceCurve(
values=hazard_exceedance.values, exceed_probabilities=hazard_exceedance.probs
),
hazard_distribution=Distribution(
bin_edges=v.event.intensity_bin_edges, probabilities=v.event.prob
),
vulnerability_distribution=vulnerability_distribution,
)
else:
calc_details = None

impact_exceedance = v.impact.to_exceedance_curve()
hazard_impacts = AssetSingleImpact(
hazard_type=v.impact.hazard_type.__name__,
impact_type=v.impact.impact_type.name,
impact_exceedance=ExceedanceCurve(
values=impact_exceedance.values, exceed_probabilities=impact_exceedance.probs
),
hazard_distribution=Distribution(bin_edges=v.event.intensity_bin_edges, probabilities=v.event.prob),
vulnerability_distribution=vulnerability_distribution,
impact_distribution=Distribution(bin_edges=v.impact.impact_bins, probabilities=v.impact.prob),
impact_mean=v.impact.mean_impact(),
impact_std_deviation=0, # TODO!
calc_details=None if v.event is None else calc_details,
)

impact_exceedance = v.impact.to_exceedance_curve()
hazard_impacts = AssetSingleImpact(
hazard_type=v.impact.hazard_type.__name__,
impact_type=v.impact.impact_type.name,
impact_exceedance=ExceedanceCurve(
values=impact_exceedance.values, exceed_probabilities=impact_exceedance.probs
),
impact_distribution=Distribution(bin_edges=v.impact.impact_bins, probabilities=v.impact.prob),
impact_mean=v.impact.mean_impact(),
impact_std_deviation=0, # TODO!
calc_details=None if v.event is None else calc_details,
)

impacts.setdefault(asset, []).append(hazard_impacts)

asset_impacts = [AssetLevelImpact(asset_id="", impacts=a) for a in impacts.values()]
impacts.setdefault(asset, []).append(hazard_impacts)
asset_impacts = [AssetLevelImpact(asset_id="", impacts=a) for a in impacts.values()]
else:
asset_impacts = None

return AssetImpactResponse(asset_impacts=asset_impacts, risk_measures=risk_measures)

Expand Down
38 changes: 36 additions & 2 deletions src/physrisk/vulnerability_models/real_estate_models.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from collections import defaultdict
from typing import Dict, Tuple
from typing import Dict, List, Tuple

import numpy as np

from physrisk.api.v1.common import VulnerabilityCurve, VulnerabilityCurves
from physrisk.kernel.assets import Asset, RealEstateAsset
from physrisk.kernel.hazard_model import HazardDataRequest, HazardDataResponse, HazardParameterDataResponse
from physrisk.kernel.impact_distrib import ImpactDistrib
from physrisk.kernel.vulnerability_matrix_provider import VulnMatrixProvider
from physrisk.kernel.vulnerability_model import VulnerabilityModel

from ..kernel.hazards import CoastalInundation, RiverineInundation, Wind
from ..kernel.hazards import ChronicHeat, CoastalInundation, RiverineInundation, Wind
from ..kernel.vulnerability_model import (
DeterministicVulnerabilityModel,
VulnerabilityModelBase,
applies_to_events,
checked_beta_distrib,
get_vulnerability_curves_from_resource,
Expand Down Expand Up @@ -151,3 +154,34 @@ def wind_damage(self, v: np.ndarray, v_half: float):
v_thresh = 25.7 # m/s
vn = np.where(v > v_thresh, v - v_thresh, 0) / (v_half - v_thresh)
return vn**3 / (1 + vn**3)


class CoolingModel(VulnerabilityModelBase):
_default_loss_coeff = 200 # W/K
# 200 W/K is a nominal total-asset heat loss coefficient. It is approximately the
# heat loss of a fairly recently built residential property.
# For 2000 degree days of heating required in a year, the corresponding heating requirement
# would be 200 * 2000 * 24 / 1000 = 9600 kWh

def __init__(self):
self.indicator_id = "mean_degree_days/above/index"
self.hazard_type = ChronicHeat

def get_data_requests(self, asset: Asset, *, scenario: str, year: int):
return HazardDataRequest(
self.hazard_type,
asset.longitude,
asset.latitude,
scenario=scenario,
year=year,
indicator_id=self.indicator_id,
)

def get_impact(self, asset: Asset, data_responses: List[HazardDataResponse]) -> ImpactDistrib:
(parameters,) = data_responses
assert isinstance(parameters, HazardParameterDataResponse)
# we interpolate the specific threshold from the different values
# deg_days = parameters.parameter
# heat_loss = deg_days * self._default_loss_coeff * 24 / 1000
raise NotImplementedError()
# return ImpactDistrib(ChronicHeat, )
54 changes: 52 additions & 2 deletions src/test/api/test_impact_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,58 @@ def test_impact_request(self):
"include_asset_level": True,
"include_measures": False,
"include_calc_details": True,
"year": 2080,
"scenario": "rcp8p5",
"years": [2080],
"scenarios": ["rcp8p5"],
}

request = requests.AssetImpactRequest(**request_dict) # type: ignore

curve = np.array([0.0596, 0.333, 0.505, 0.715, 0.864, 1.003, 1.149, 1.163, 1.163])
store = mock_hazard_model_store_inundation(TestData.longitudes, TestData.latitudes, curve)

source_paths = get_default_source_paths(EmbeddedInventory())
vulnerability_models = {
PowerGeneratingAsset: [InundationModel()],
RealEstateAsset: [RealEstateCoastalInundationModel(), RealEstateRiverineInundationModel()],
}

response = requests._get_asset_impacts(
request,
ZarrHazardModel(source_paths=source_paths, reader=ZarrReader(store)),
vulnerability_models=vulnerability_models,
)

self.assertEqual(response.asset_impacts[0].impacts[0].hazard_type, "CoastalInundation")

def test_risk_model_impact_request(self):
"""Tests the risk model functionality of the impact request."""

assets = {
"items": [
{
"asset_class": "RealEstateAsset",
"type": "Buildings/Industrial",
"location": "Asia",
"longitude": TestData.longitudes[0],
"latitude": TestData.latitudes[0],
},
{
"asset_class": "PowerGeneratingAsset",
"type": "Nuclear",
"location": "Asia",
"longitude": TestData.longitudes[1],
"latitude": TestData.latitudes[1],
},
],
}

request_dict = {
"assets": assets,
"include_asset_level": True,
"include_measures": False,
"include_calc_details": True,
"years": [2080],
"scenarios": ["rcp8p5"],
}

request = requests.AssetImpactRequest(**request_dict) # type: ignore
Expand Down
16 changes: 16 additions & 0 deletions src/test/kernel/test_live_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,19 @@ def test_live_exposure():
}
result = requests.post(url + "/api/get_asset_exposure", json=request)
print(result.json())


@pytest.mark.skip("only as example")
def test_live_impacts():
request = {
"assets": {
"items": [
{"asset_class": "Asset", "type": None, "location": None, "latitude": 34.556, "longitude": 69.4787}
]
},
"calc_settings": {"hazard_interp": "floor"},
"scenario": "ssp585",
"year": 2050,
}
result = requests.post(url + "/api/get_asset_exposure", json=request)
print(result.json())
Loading

0 comments on commit a860163

Please sign in to comment.