diff --git a/markovs_household/data/appliance.py b/markovs_household/data/appliance.py index 424d012..9942b4e 100644 --- a/markovs_household/data/appliance.py +++ b/markovs_household/data/appliance.py @@ -3,6 +3,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime, timedelta +from math import ceil from typing import ClassVar, List from markovs_household.data.probability import ( @@ -40,6 +41,10 @@ def get_switch_on_probability(self, date_time: datetime) -> float: def get_operation_time(self) -> timedelta: pass + @abstractmethod + def get_timeseries_for(self, step_size: timedelta) -> List[float]: + pass + @dataclass(frozen=True) class ApplianceTypeLoadProfile(ApplianceType): @@ -52,6 +57,20 @@ class ApplianceTypeLoadProfile(ApplianceType): def get_operation_time(self) -> timedelta: return self.profile.length + def get_timeseries_for(self, step_size: timedelta) -> List[float]: + last = self.profile.values[0] + + result = [last.value] + + for entry in self.profile.values[1:]: + delta_seconds = entry.time - last.time + + if delta_seconds == step_size: + result.append(entry.value) + last = entry + + return result + @dataclass(frozen=True) class ApplianceTypeConstantPower(ApplianceType): @@ -65,6 +84,11 @@ class ApplianceTypeConstantPower(ApplianceType): def get_operation_time(self) -> timedelta: return self.operation_time + def get_timeseries_for(self, step_size: timedelta) -> List[float]: + steps = ceil(self.operation_time / step_size) + + return [self.power] * steps + @dataclass(frozen=True) class Appliance: diff --git a/markovs_household/output/__init__.py b/markovs_household/output/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/markovs_household/output/result_sink.py b/markovs_household/output/result_sink.py new file mode 100644 index 0000000..c538cb5 --- /dev/null +++ b/markovs_household/output/result_sink.py @@ -0,0 +1,37 @@ +import csv +from datetime import datetime, timedelta + +from markovs_household.data.household import Household + + +def create_timeseries(hh: Household, step_size: timedelta) -> dict[datetime, float]: + timeseries: dict[datetime, float] = {} + + for appliance in hh.appliances: + intervals = appliance.get_operation_intervals() + + for interval in intervals: + appliance_series = appliance.appliance_type.get_timeseries_for(step_size) + date = interval.start + + for power in appliance_series: + old_value = timeseries.get(date, 0.0) + timeseries[date] = old_value + power + + date = date + step_size + + # set the value that follows the last of the appliance + # time series values to 0, if it has not been set yet + if timeseries.get(date) is None: + timeseries[date] = 0.0 + + return timeseries + + +def write_timeseries(timeseries: dict[datetime, float], file: str): + with open(file, mode="w", newline="") as csv_file: + csv_writer = csv.writer(csv_file, delimiter=",", quoting=csv.QUOTE_ALL) + csv_writer.writerow(["time", "power"]) + + for time, power in timeseries.items(): + csv_writer.writerow([time, power]) diff --git a/tests/common/test_data.py b/tests/common/test_data.py index 739d333..556eede 100644 --- a/tests/common/test_data.py +++ b/tests/common/test_data.py @@ -1,7 +1,10 @@ import random from datetime import datetime, timedelta -from markovs_household.data.appliance import ApplianceTypeLoadProfile +from markovs_household.data.appliance import ( + ApplianceTypeConstantPower, + ApplianceTypeLoadProfile, +) from markovs_household.data.probability import ( SwitchOnProbabilities, SwitchOnProbabilityKey, @@ -18,22 +21,52 @@ ] random.seed(42) + RANDOM_SWITCH_ON_PROBABILITIES = SwitchOnProbabilities( {key: random.random() for key in SWITCH_ON_PROBABILITY_KEYS} ) LOAD_PROFILE_STOVE = TimeSeries( [ - TimeSeriesEntry(timedelta(), 1), - TimeSeriesEntry(timedelta(seconds=60), 2), - TimeSeriesEntry(timedelta(seconds=120), 1), + TimeSeriesEntry(timedelta(), 2), + TimeSeriesEntry(timedelta(minutes=15), 1), + TimeSeriesEntry(timedelta(minutes=30), 0.8), ], - timedelta(minutes=4), + timedelta(minutes=60), ) + STOVE = ApplianceTypeLoadProfile( category=ApplianceCategory.STOVE, switch_on_probabilities=RANDOM_SWITCH_ON_PROBABILITIES, profile=LOAD_PROFILE_STOVE, ) + +PC = ApplianceTypeConstantPower( + ApplianceCategory.PC, + SwitchOnProbabilities( + {SwitchOnProbabilityKey(Season.SPRING, DayType.WEEKDAY, 0): 0.1} + ), + 0.1, + timedelta(hours=1), +) + +VIDEO_RECORDER = ApplianceTypeConstantPower( + ApplianceCategory.VIDEO_RECORDER, + SwitchOnProbabilities( + {SwitchOnProbabilityKey(Season.SPRING, DayType.WEEKDAY, 0): 0.5} + ), + 0.025, + timedelta(hours=1), +) + +WASHING_MACHINE = ApplianceTypeConstantPower( + ApplianceCategory.WASHING_MACHINE, + SwitchOnProbabilities( + {SwitchOnProbabilityKey(Season.SPRING, DayType.WEEKDAY, 0): 0.5} + ), + 1.0, + timedelta(hours=1, minutes=30), +) + DATE_TIME_KEY_PAIR = ( datetime(year=2021, month=11, day=16, hour=9, minute=0), SwitchOnProbabilityKey(Season.AUTUMN, DayType.WEEKDAY, 36), diff --git a/tests/data/test_appliance.py b/tests/data/test_appliance.py index f7e60e6..cb8e36c 100644 --- a/tests/data/test_appliance.py +++ b/tests/data/test_appliance.py @@ -12,6 +12,7 @@ ) from markovs_household.utils.time import DayType, Season, TimeInterval from tests.common import test_data +from tests.common.test_data import STOVE def test_init_appliance_load_profile(): @@ -52,14 +53,14 @@ def test_init_appliance_constant_profile(): def test_get_switch_on_probability(): - stove = test_data.STOVE (datetime, key) = test_data.DATE_TIME_KEY_PAIR - expected = stove.switch_on_probabilities.get_probability(key) - assert stove.get_switch_on_probability(datetime) == expected + expected = STOVE.switch_on_probabilities.get_probability(key) + assert STOVE.get_switch_on_probability(datetime) == expected + (dt, key) = test_data.DATE_TIME_KEY_PAIR - probabilities = stove.switch_on_probabilities.probabilities + probabilities = STOVE.switch_on_probabilities.probabilities expected = probabilities[SwitchOnProbabilityKey.extract_from_datetime(dt)] - assert stove.get_switch_on_probability(dt) == expected + assert STOVE.get_switch_on_probability(dt) == expected def test_is_turned_on(): diff --git a/tests/input/test_household.py b/tests/input/test_household.py index c0068f4..ac167bc 100644 --- a/tests/input/test_household.py +++ b/tests/input/test_household.py @@ -1,55 +1,18 @@ -from datetime import timedelta from typing import Dict -from markovs_household.data.appliance import ( - Appliance, - ApplianceCategory, - ApplianceType, - ApplianceTypeConstantPower, -) +from markovs_household.data.appliance import Appliance, ApplianceCategory, ApplianceType from markovs_household.data.household import Household, HouseholdIncome, HouseholdType -from markovs_household.data.probability import ( - SwitchOnProbabilities, - SwitchOnProbabilityKey, -) from markovs_household.input.appliances_input import HouseholdAppliancesInput -from markovs_household.utils.time import DayType, Season +from tests.common.test_data import PC, VIDEO_RECORDER, WASHING_MACHINE class TestHouseholdAppliancesInput(HouseholdAppliancesInput): - pc = ApplianceTypeConstantPower( - ApplianceCategory.PC, - SwitchOnProbabilities( - {SwitchOnProbabilityKey(Season.SPRING, DayType.WEEKDAY, 0): 0.1} - ), - 50.0, - timedelta(hours=1), - ) - - video_recorder = ApplianceTypeConstantPower( - ApplianceCategory.VIDEO_RECORDER, - SwitchOnProbabilities( - {SwitchOnProbabilityKey(Season.SPRING, DayType.WEEKDAY, 0): 0.5} - ), - 20.0, - timedelta(hours=1), - ) - - washing_machine = ApplianceTypeConstantPower( - ApplianceCategory.WASHING_MACHINE, - SwitchOnProbabilities( - {SwitchOnProbabilityKey(Season.SPRING, DayType.WEEKDAY, 0): 0.5} - ), - 100.0, - timedelta(hours=1), - ) - @classmethod def get_appliance_types(cls) -> Dict[ApplianceCategory, ApplianceType]: return { - ApplianceCategory.PC: cls.pc, - ApplianceCategory.VIDEO_RECORDER: cls.video_recorder, - ApplianceCategory.WASHING_MACHINE: cls.washing_machine, + ApplianceCategory.PC: PC, + ApplianceCategory.VIDEO_RECORDER: VIDEO_RECORDER, + ApplianceCategory.WASHING_MACHINE: WASHING_MACHINE, } @classmethod @@ -102,7 +65,7 @@ def test_init_household_avg(): assert len(household.appliances) == 2 for appliance in household.appliances: - assert appliance.appliance_type == TestHouseholdAppliancesInput.pc + assert appliance.appliance_type == PC assert appliance.get_operation_intervals() == [] @@ -113,7 +76,7 @@ def test_init_household_by_no_of_inhabitants(): assert len(household.appliances) == 2 or len(household.appliances) == 3 for appliance in household.appliances: - assert appliance.appliance_type == TestHouseholdAppliancesInput.pc + assert appliance.appliance_type == PC assert appliance.get_operation_intervals() == [] @@ -122,11 +85,8 @@ def test_init_household_by_income(): assert len(household.appliances) == 3 - assert ( - Appliance(TestHouseholdAppliancesInput.washing_machine, []) - in household.appliances - ) - assert Appliance(TestHouseholdAppliancesInput.pc, []) in household.appliances + assert Appliance(WASHING_MACHINE, []) in household.appliances + assert Appliance(PC, []) in household.appliances def test_init_household_by_household_type(): @@ -136,12 +96,6 @@ def test_init_household_by_household_type(): assert len(household.appliances) == 5 - assert ( - Appliance(TestHouseholdAppliancesInput.washing_machine, []) - in household.appliances - ) - assert Appliance(TestHouseholdAppliancesInput.pc, []) in household.appliances - assert ( - Appliance(TestHouseholdAppliancesInput.video_recorder, []) - in household.appliances - ) + assert Appliance(WASHING_MACHINE, []) in household.appliances + assert Appliance(PC, []) in household.appliances + assert Appliance(VIDEO_RECORDER, []) in household.appliances diff --git a/tests/output/__init__.py b/tests/output/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/output/test_result_sink.py b/tests/output/test_result_sink.py new file mode 100644 index 0000000..60a05f3 --- /dev/null +++ b/tests/output/test_result_sink.py @@ -0,0 +1,106 @@ +import csv +import os +import tempfile +from datetime import datetime, timedelta + +from markovs_household.data.appliance import Appliance +from markovs_household.data.household import Household +from markovs_household.output.result_sink import create_timeseries, write_timeseries +from markovs_household.utils.time import TimeInterval +from tests.common.test_data import PC, STOVE, VIDEO_RECORDER, WASHING_MACHINE + + +def test_create_timeseries(): + pc_start = datetime(year=2021, month=11, day=11, hour=11) + video_recorder_start = datetime(year=2021, month=11, day=11, hour=11, minute=30) + washing_machine_start = datetime(year=2021, month=11, day=11, hour=16, minute=15) + stove_start = datetime(year=2021, month=11, day=11, hour=16, minute=30) + + appliances = [ + Appliance(PC, [TimeInterval(pc_start, pc_start + PC.get_operation_time())]), + Appliance( + VIDEO_RECORDER, + [ + TimeInterval( + video_recorder_start, + video_recorder_start + VIDEO_RECORDER.get_operation_time(), + ) + ], + ), + Appliance( + WASHING_MACHINE, + [ + TimeInterval( + washing_machine_start, + washing_machine_start + WASHING_MACHINE.get_operation_time(), + ) + ], + ), + Appliance( + STOVE, + [ + TimeInterval( + stove_start, + stove_start + STOVE.get_operation_time(), + ) + ], + ), # stove has a fine resolution of its time series, thus only first entry is used here + ] + hh = Household(appliances) + + timeseries = create_timeseries(hh, timedelta(minutes=15)) + + expected = { + datetime(year=2021, month=11, day=11, hour=11, minute=0): 0.1, + datetime(year=2021, month=11, day=11, hour=11, minute=15): 0.1, + datetime(year=2021, month=11, day=11, hour=11, minute=30): 0.125, + datetime(year=2021, month=11, day=11, hour=11, minute=45): 0.125, + datetime(year=2021, month=11, day=11, hour=12, minute=0): 0.025, + datetime(year=2021, month=11, day=11, hour=12, minute=15): 0.025, + datetime(year=2021, month=11, day=11, hour=12, minute=30): 0.0, + datetime(year=2021, month=11, day=11, hour=16, minute=15): 1.0, + datetime(year=2021, month=11, day=11, hour=16, minute=30): 3.0, + datetime(year=2021, month=11, day=11, hour=16, minute=45): 2.0, + datetime(year=2021, month=11, day=11, hour=17, minute=0): 1.8, + datetime(year=2021, month=11, day=11, hour=17, minute=15): 1.0, + datetime(year=2021, month=11, day=11, hour=17, minute=30): 1.0, + datetime(year=2021, month=11, day=11, hour=17, minute=45): 0.0, + } + assert timeseries == expected + + +def test_write_timeseries(): + # disable automatic deletion, we handle this manually + tmp_file = tempfile.NamedTemporaryFile( + mode="w+", delete=False, prefix="mhh_write_result_", suffix=".csv" + ) + # close it again right away (Windows does not like opening it a second time) + tmp_file.close() + + timeseries = { + datetime(year=2021, month=11, day=11, hour=11, minute=0): 50.0, + datetime(year=2021, month=11, day=11, hour=11, minute=15): 60.0, + datetime(year=2021, month=11, day=11, hour=11, minute=30): 70.0, + datetime(year=2021, month=11, day=11, hour=11, minute=45): 100.0, + datetime(year=2021, month=11, day=11, hour=12, minute=0): 20.0, + datetime(year=2021, month=11, day=11, hour=12, minute=15): 20.0, + datetime(year=2021, month=11, day=11, hour=12, minute=30): 0.0, + } + + # write timeseries + write_timeseries(timeseries, tmp_file.name) + + # open file again in read mode + with open(tmp_file.name, mode="r", newline="") as read_file: + csv_reader = csv.DictReader(read_file, delimiter=",") + assert csv_reader.fieldnames == ["time", "power"] + + read_result: dict[datetime, float] = {} + for row in csv_reader: + datetime_obj = datetime.strptime(row["time"], "%Y-%m-%d %H:%M:%S") + read_result[datetime_obj] = float(row["power"]) + + assert read_result == timeseries + + # clean up: delete temporary file + os.remove(tmp_file.name)