diff --git a/src/physrisk/api/v1/impact_req_resp.py b/src/physrisk/api/v1/impact_req_resp.py index 3a483e30..f6c27ed0 100644 --- a/src/physrisk/api/v1/impact_req_resp.py +++ b/src/physrisk/api/v1/impact_req_resp.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Sequence from pydantic import BaseModel, Field @@ -19,12 +19,20 @@ class AssetImpactRequest(BaseModel): default_factory=CalcSettings, description="Interpolation method." # type:ignore ) include_asset_level: bool = Field(True, description="If true, include asset-level impacts.") - include_measures: bool = Field(True, description="If true, include measures.") + include_measures: bool = Field(False, description="If true, include calculation of risk measures.") include_calc_details: bool = Field(True, description="If true, include impact calculation details.") + scenarios: Optional[Sequence[str]] = Field([], description="Name of scenarios ('rcp8p5')") + years: Optional[Sequence[int]] = Field( + [], + description="""Projection year (2030, 2050, 2080). Any year before 2030, + e.g. 1980, is treated as historical.""", + ) + # to be deprecated scenario: str = Field("rcp8p5", description="Name of scenario ('rcp8p5')") year: int = Field( - 2050, - description="Projection year (2030, 2050, 2080). Any year before 2030, e.g. 1980, is treated as historical.", + [2050], + description="""Projection years (e.g. 2030, 2050, 2080). Any year before 2030, + e.g. 1980, is treated as historical.""", ) diff --git a/src/physrisk/requests.py b/src/physrisk/requests.py index 903e51d4..e2a1463b 100644 --- a/src/physrisk/requests.py +++ b/src/physrisk/requests.py @@ -274,62 +274,65 @@ def _get_asset_impacts( vulnerability_models = ( calc.get_default_vulnerability_models() if vulnerability_models is None else vulnerability_models ) - # we keep API definition of asset separate from internal Asset class; convert by reflection # based on asset_class: assets = create_assets(request.assets) measure_calcs = calc.get_default_risk_measure_calculators() risk_model = AssetLevelRiskModel(hazard_model, vulnerability_models, measure_calcs) - scenarios = [request.scenario] - years = [request.year] + scenarios = [request.scenario] if request.scenarios is None or len(request.scenarios) == 0 else request.scenarios + years = [request.year] if request.years is None or len(request.years) == 0 else request.years if request.include_measures: batch_impacts, measures = risk_model.calculate_risk_measures(assets, scenarios, years) measure_ids_for_asset, definitions = risk_model.populate_measure_definitions(assets) risk_measures = _create_risk_measures(measures, measure_ids_for_asset, definitions, assets, scenarios, years) - else: + elif request.include_asset_level: batch_impacts = risk_model.calculate_impacts(assets, scenarios, years) risk_measures = None - results = batch_impacts[BatchId(scenarios[0], years[0])] - - # note that this does rely on ordering of dictionary (post 3.6) - impacts: Dict[Asset, List[AssetSingleImpact]] = {} - for (asset, _), v in results.items(): - # calculation details - if v.event is not None and v.vulnerability is not None: - hazard_exceedance = v.event.to_exceedance_curve() - - vulnerability_distribution = VulnerabilityDistrib( - intensity_bin_edges=v.vulnerability.intensity_bins, - impact_bin_edges=v.vulnerability.impact_bins, - prob_matrix=v.vulnerability.prob_matrix, - ) - - calc_details = AcuteHazardCalculationDetails( - hazard_exceedance=ExceedanceCurve( - values=hazard_exceedance.values, exceed_probabilities=hazard_exceedance.probs + if request.include_asset_level: + results = batch_impacts[BatchId(scenarios[0], years[0])] + # note that this does rely on ordering of dictionary (post 3.6) + impacts: Dict[Asset, List[AssetSingleImpact]] = {} + for (asset, _), v in results.items(): + if request.include_calc_details: + if v.event is not None and v.vulnerability is not None: + hazard_exceedance = v.event.to_exceedance_curve() + + vulnerability_distribution = VulnerabilityDistrib( + intensity_bin_edges=v.vulnerability.intensity_bins, + impact_bin_edges=v.vulnerability.impact_bins, + prob_matrix=v.vulnerability.prob_matrix, + ) + + calc_details = AcuteHazardCalculationDetails( + hazard_exceedance=ExceedanceCurve( + values=hazard_exceedance.values, exceed_probabilities=hazard_exceedance.probs + ), + hazard_distribution=Distribution( + bin_edges=v.event.intensity_bin_edges, probabilities=v.event.prob + ), + vulnerability_distribution=vulnerability_distribution, + ) + else: + calc_details = None + + impact_exceedance = v.impact.to_exceedance_curve() + hazard_impacts = AssetSingleImpact( + hazard_type=v.impact.hazard_type.__name__, + impact_type=v.impact.impact_type.name, + impact_exceedance=ExceedanceCurve( + values=impact_exceedance.values, exceed_probabilities=impact_exceedance.probs ), - hazard_distribution=Distribution(bin_edges=v.event.intensity_bin_edges, probabilities=v.event.prob), - vulnerability_distribution=vulnerability_distribution, + impact_distribution=Distribution(bin_edges=v.impact.impact_bins, probabilities=v.impact.prob), + impact_mean=v.impact.mean_impact(), + impact_std_deviation=0, # TODO! + calc_details=None if v.event is None else calc_details, ) - - impact_exceedance = v.impact.to_exceedance_curve() - hazard_impacts = AssetSingleImpact( - hazard_type=v.impact.hazard_type.__name__, - impact_type=v.impact.impact_type.name, - impact_exceedance=ExceedanceCurve( - values=impact_exceedance.values, exceed_probabilities=impact_exceedance.probs - ), - impact_distribution=Distribution(bin_edges=v.impact.impact_bins, probabilities=v.impact.prob), - impact_mean=v.impact.mean_impact(), - impact_std_deviation=0, # TODO! - calc_details=None if v.event is None else calc_details, - ) - - impacts.setdefault(asset, []).append(hazard_impacts) - - asset_impacts = [AssetLevelImpact(asset_id="", impacts=a) for a in impacts.values()] + impacts.setdefault(asset, []).append(hazard_impacts) + asset_impacts = [AssetLevelImpact(asset_id="", impacts=a) for a in impacts.values()] + else: + asset_impacts = None return AssetImpactResponse(asset_impacts=asset_impacts, risk_measures=risk_measures) diff --git a/src/physrisk/vulnerability_models/real_estate_models.py b/src/physrisk/vulnerability_models/real_estate_models.py index 32e2efca..8d4cff23 100644 --- a/src/physrisk/vulnerability_models/real_estate_models.py +++ b/src/physrisk/vulnerability_models/real_estate_models.py @@ -1,16 +1,19 @@ from collections import defaultdict -from typing import Dict, Tuple +from typing import Dict, List, Tuple import numpy as np from physrisk.api.v1.common import VulnerabilityCurve, VulnerabilityCurves from physrisk.kernel.assets import Asset, RealEstateAsset +from physrisk.kernel.hazard_model import HazardDataRequest, HazardDataResponse, HazardParameterDataResponse +from physrisk.kernel.impact_distrib import ImpactDistrib from physrisk.kernel.vulnerability_matrix_provider import VulnMatrixProvider from physrisk.kernel.vulnerability_model import VulnerabilityModel -from ..kernel.hazards import CoastalInundation, RiverineInundation, Wind +from ..kernel.hazards import ChronicHeat, CoastalInundation, RiverineInundation, Wind from ..kernel.vulnerability_model import ( DeterministicVulnerabilityModel, + VulnerabilityModelBase, applies_to_events, checked_beta_distrib, get_vulnerability_curves_from_resource, @@ -151,3 +154,34 @@ def wind_damage(self, v: np.ndarray, v_half: float): v_thresh = 25.7 # m/s vn = np.where(v > v_thresh, v - v_thresh, 0) / (v_half - v_thresh) return vn**3 / (1 + vn**3) + + +class CoolingModel(VulnerabilityModelBase): + _default_loss_coeff = 200 # W/K + # 200 W/K is a nominal total-asset heat loss coefficient. It is approximately the + # heat loss of a fairly recently built residential property. + # For 2000 degree days of heating required in a year, the corresponding heating requirement + # would be 200 * 2000 * 24 / 1000 = 9600 kWh + + def __init__(self): + self.indicator_id = "mean_degree_days/above/index" + self.hazard_type = ChronicHeat + + def get_data_requests(self, asset: Asset, *, scenario: str, year: int): + return HazardDataRequest( + self.hazard_type, + asset.longitude, + asset.latitude, + scenario=scenario, + year=year, + indicator_id=self.indicator_id, + ) + + def get_impact(self, asset: Asset, data_responses: List[HazardDataResponse]) -> ImpactDistrib: + (parameters,) = data_responses + assert isinstance(parameters, HazardParameterDataResponse) + # we interpolate the specific threshold from the different values + # deg_days = parameters.parameter + # heat_loss = deg_days * self._default_loss_coeff * 24 / 1000 + raise NotImplementedError() + # return ImpactDistrib(ChronicHeat, ) diff --git a/src/test/api/test_impact_requests.py b/src/test/api/test_impact_requests.py index 4ab770d6..feed8d32 100644 --- a/src/test/api/test_impact_requests.py +++ b/src/test/api/test_impact_requests.py @@ -72,8 +72,58 @@ def test_impact_request(self): "include_asset_level": True, "include_measures": False, "include_calc_details": True, - "year": 2080, - "scenario": "rcp8p5", + "years": [2080], + "scenarios": ["rcp8p5"], + } + + request = requests.AssetImpactRequest(**request_dict) # type: ignore + + curve = np.array([0.0596, 0.333, 0.505, 0.715, 0.864, 1.003, 1.149, 1.163, 1.163]) + store = mock_hazard_model_store_inundation(TestData.longitudes, TestData.latitudes, curve) + + source_paths = get_default_source_paths(EmbeddedInventory()) + vulnerability_models = { + PowerGeneratingAsset: [InundationModel()], + RealEstateAsset: [RealEstateCoastalInundationModel(), RealEstateRiverineInundationModel()], + } + + response = requests._get_asset_impacts( + request, + ZarrHazardModel(source_paths=source_paths, reader=ZarrReader(store)), + vulnerability_models=vulnerability_models, + ) + + self.assertEqual(response.asset_impacts[0].impacts[0].hazard_type, "CoastalInundation") + + def test_risk_model_impact_request(self): + """Tests the risk model functionality of the impact request.""" + + assets = { + "items": [ + { + "asset_class": "RealEstateAsset", + "type": "Buildings/Industrial", + "location": "Asia", + "longitude": TestData.longitudes[0], + "latitude": TestData.latitudes[0], + }, + { + "asset_class": "PowerGeneratingAsset", + "type": "Nuclear", + "location": "Asia", + "longitude": TestData.longitudes[1], + "latitude": TestData.latitudes[1], + }, + ], + } + + request_dict = { + "assets": assets, + "include_asset_level": True, + "include_measures": False, + "include_calc_details": True, + "years": [2080], + "scenarios": ["rcp8p5"], } request = requests.AssetImpactRequest(**request_dict) # type: ignore diff --git a/src/test/kernel/test_live_services.py b/src/test/kernel/test_live_services.py index f4adab07..79b7313d 100644 --- a/src/test/kernel/test_live_services.py +++ b/src/test/kernel/test_live_services.py @@ -19,3 +19,19 @@ def test_live_exposure(): } result = requests.post(url + "/api/get_asset_exposure", json=request) print(result.json()) + + +@pytest.mark.skip("only as example") +def test_live_impacts(): + request = { + "assets": { + "items": [ + {"asset_class": "Asset", "type": None, "location": None, "latitude": 34.556, "longitude": 69.4787} + ] + }, + "calc_settings": {"hazard_interp": "floor"}, + "scenario": "ssp585", + "year": 2050, + } + result = requests.post(url + "/api/get_asset_exposure", json=request) + print(result.json()) diff --git a/src/test/risk_models/test_risk_models.py b/src/test/risk_models/test_risk_models.py index c2a01ab0..e1a92c5b 100644 --- a/src/test/risk_models/test_risk_models.py +++ b/src/test/risk_models/test_risk_models.py @@ -2,10 +2,11 @@ import test.data.hazard_model_store as hms import unittest from test.data.hazard_model_store import TestData, ZarrStoreMocker -from typing import NamedTuple +from typing import NamedTuple, Sequence import numpy as np +from physrisk import requests from physrisk.api.v1.impact_req_resp import RiskMeasureKey, RiskMeasures from physrisk.data.pregenerated_hazard_model import ZarrHazardModel from physrisk.hazard_models.core_hazards import get_default_source_paths @@ -19,10 +20,72 @@ class TestRiskModels(unittest.TestCase): def test_risk_indicator_model(self): - source_paths = get_default_source_paths() scenarios = ["rcp8p5"] years = [2050] + assets = self._create_assets() + hazard_model = self._create_hazard_model(scenarios, years) + + model = AssetLevelRiskModel( + hazard_model, get_default_vulnerability_models(), {RealEstateAsset: RealEstateToyRiskMeasures()} + ) + measure_ids_for_asset, definitions = model.populate_measure_definitions(assets) + _, measures = model.calculate_risk_measures(assets, prosp_scens=scenarios, years=years) + + # how to get a score using the MeasureKey + measure = measures[MeasureKey(assets[0], scenarios[0], years[0], RiverineInundation)] + score = measure.score + measure_0 = measure.measure_0 + np.testing.assert_allclose([measure_0], [0.0896857]) + + # packing up the risk measures, e.g. for JSON transmission: + risk_measures = _create_risk_measures(measures, measure_ids_for_asset, definitions, assets, scenarios, years) + # we still have a key, but no asset: + key = RiskMeasureKey( + hazard_type="RiverineInundation", + scenario_id=scenarios[0], + year=str(years[0]), + measure_id=risk_measures.score_based_measure_set_defn.measure_set_id, + ) + item = next(m for m in risk_measures.measures_for_assets if m.key == key) + score2 = item.scores[0] + measure_0_2 = item.measures_0[0] + assert score == score2 + assert measure_0 == measure_0_2 + self.interpret_risk_measures(risk_measures) + + # example_json = risk_measures.model_dump_json() + + def _create_assets(self): + # assets = [ + # RealEstateAsset(lat, lon, location="Asia", type="Buildings/Industrial") + # for lon, lat in zip(TestData.longitudes[0:1], TestData.latitudes[0:1]) + # ] + + assets = [ + RealEstateAsset(TestData.latitudes[0], TestData.longitudes[0], location="Asia", type="Buildings/Industrial") + for i in range(2) + ] + return assets + + def _create_assets_json(self, assets: Sequence[RealEstateAsset]): + assets_dict = { + "items": [ + { + "asset_class": type(asset).__name__, + "type": asset.type, + "location": asset.location, + "longitude": asset.longitude, + "latitude": asset.latitude, + } + for asset in assets + ], + } + return assets_dict + + def _create_hazard_model(self, scenarios, years): + source_paths = get_default_source_paths() + def sp_riverine(scenario, year): return source_paths[RiverineInundation](indicator_id="flood_depth", scenario=scenario, year=year) @@ -60,45 +123,35 @@ def sp_wind(scenario, year): TestData.wind_intensities_2, ) - hazard_model = ZarrHazardModel(source_paths=get_default_source_paths(), store=mocker.store) - - assets = [ - RealEstateAsset(lat, lon, location="Asia", type="Buildings/Industrial") - for lon, lat in zip(TestData.longitudes[0:1], TestData.latitudes[0:1]) - ] + return ZarrHazardModel(source_paths=get_default_source_paths(), store=mocker.store) - assets = [ - RealEstateAsset(TestData.latitudes[0], TestData.longitudes[0], location="Asia", type="Buildings/Industrial") - for i in range(2) - ] + def test_via_requests(self): + scenarios = ["rcp8p5"] + years = [2050] - model = AssetLevelRiskModel( - hazard_model, get_default_vulnerability_models(), {RealEstateAsset: RealEstateToyRiskMeasures()} + assets = self._create_assets() + hazard_model = self._create_hazard_model(scenarios, years) + + request_dict = { + "assets": self._create_assets_json(assets), + "include_asset_level": False, + "include_measures": True, + "include_calc_details": False, + "years": years, + "scenarios": scenarios, + } + + request = requests.AssetImpactRequest(**request_dict) + response = requests._get_asset_impacts( + request, + hazard_model, + vulnerability_models=get_default_vulnerability_models(), ) - measure_ids_for_asset, definitions = model.populate_measure_definitions(assets) - _, measures = model.calculate_risk_measures(assets, prosp_scens=scenarios, years=years) - - # how to get a score using the MeasureKey - measure = measures[MeasureKey(assets[0], scenarios[0], years[0], RiverineInundation)] - score = measure.score - measure_0 = measure.measure_0 - - # packing up the risk measures, e.g. for JSON transmission: - risk_measures = _create_risk_measures(measures, measure_ids_for_asset, definitions, assets, scenarios, years) - # we still have a key, but no asset: - key = RiskMeasureKey( - hazard_type="RiverineInundation", - scenario_id=scenarios[0], - year=str(years[0]), - measure_id=risk_measures.score_based_measure_set_defn.measure_set_id, + res = next( + ma for ma in response.risk_measures.measures_for_assets if ma.key.hazard_type == "RiverineInundation" ) - item = next(m for m in risk_measures.measures_for_assets if m.key == key) - score2 = item.scores[0] - measure_0_2 = item.measures_0[0] - assert score == score2 - assert measure_0 == measure_0_2 - self.interpret_risk_measures(risk_measures) - # example_json = risk_measures.model_dump_json() + np.testing.assert_allclose(res.measures_0, [0.0896856974, 0.0896856974]) + # json_str = json.dumps(response.model_dump(), cls=NumpyArrayEncoder) def interpret_risk_measures(self, risk_measure: RiskMeasures): class Key(NamedTuple): # hashable key for looking up measures