Merge pull request #423 from radim2tej/multiple-roof_id-support
Multiple roof_id support
davidusb-geek authored Jan 11, 2025
2 parents a6b5e80 + 67725af commit a5463ed
Showing 2 changed files with 143 additions and 77 deletions.
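
In short, this change lets the `solcast_rooftop_id` setting carry several Solcast rooftop IDs instead of one; forecast.py splits the string on commas or whitespace, fetches each rooftop separately, and sums the per-rooftop forecasts. A minimal sketch of the parsing behavior, using made-up IDs (the `re.split` pattern is the one added in forecast.py below):

import re

# Hypothetical `solcast_rooftop_id` values; the split pattern mirrors forecast.py.
for raw in ("1111-aaaa", "1111-aaaa,2222-bbbb", "1111-aaaa 2222-bbbb, 3333-cccc"):
    roof_ids = re.split(r"[,\s]+", raw.strip())
    print(roof_ids)
# ['1111-aaaa']
# ['1111-aaaa', '2222-bbbb']
# ['1111-aaaa', '2222-bbbb', '3333-cccc']
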
164 changes: 87 additions & 77 deletions src/emhass/forecast.py
@@ -9,6 +9,8 @@
 import pickle
 import pickle as cPickle
 from datetime import datetime, timedelta
+import re
+from itertools import zip_longest
 from typing import Optional
 
 import numpy as np
@@ -319,86 +321,94 @@ def get_weather_forecast(
             # If weather_forecast_cache, set request days as twice as long to avoid length issues (add a buffer)
             if self.params["passed_data"].get("weather_forecast_cache", False):
                 days_solcast = min((days_solcast * 2), 336)
-            url = (
-                "https://api.solcast.com.au/rooftop_sites/"
-                + self.retrieve_hass_conf["solcast_rooftop_id"]
-                + "/forecasts?hours="
-                + str(days_solcast)
-            )
-            response = get(url, headers=headers)
-            """import bz2 # Uncomment to save a serialized data for tests
-            import _pickle as cPickle
-            with bz2.BZ2File("data/test_response_solcast_get_method.pbz2", "w") as f:
-                cPickle.dump(response, f)"""
-            # Verify the request passed
-            if int(response.status_code) == 200:
-                data = response.json()
-            elif (
-                int(response.status_code) == 402
-                or int(response.status_code) == 429
-            ):
-                self.logger.error(
-                    "Solcast error: May have exceeded your subscription limit."
-                )
-                return False
-            elif (
-                int(response.status_code) >= 400
-                or int(response.status_code) >= 202
-            ):
-                self.logger.error(
-                    "Solcast error: There was a issue with the solcast request, check solcast API key and rooftop ID."
-                )
-                self.logger.error(
-                    "Solcast error: Check that your subscription is valid and your network can connect to Solcast."
-                )
-                return False
-            data_list = []
-            for elm in data["forecasts"]:
-                data_list.append(
-                    elm["pv_estimate"] * 1000
-                )  # Converting kW to W
-            # Check if the retrieved data has the correct length
-            if len(data_list) < len(self.forecast_dates):
-                self.logger.error(
-                    "Not enough data retried from Solcast service, try increasing the time step or use MPC."
-                )
-            else:
-                # If runtime weather_forecast_cache is true save forecast result to file as cache
-                if self.params["passed_data"].get(
-                    "weather_forecast_cache", False
-                ):
-                    # Add x2 forecast periods for cached results. This adds a extra delta_forecast amount of days for a buffer
-                    cached_forecast_dates = self.forecast_dates.union(
-                        pd.date_range(
-                            self.forecast_dates[-1],
-                            periods=(len(self.forecast_dates) + 1),
-                            freq=self.freq,
-                        )[1:]
-                    )
-                    cache_data_list = data_list[0 : len(cached_forecast_dates)]
-                    cache_data_dict = {
-                        "ts": cached_forecast_dates,
-                        "yhat": cache_data_list,
-                    }
-                    data_cache = pd.DataFrame.from_dict(cache_data_dict)
-                    data_cache.set_index("ts", inplace=True)
-                    with open(w_forecast_cache_path, "wb") as file:
-                        cPickle.dump(data_cache, file)
-                    if not os.path.isfile(w_forecast_cache_path):
-                        self.logger.warning(
-                            "Solcast forecast data could not be saved to file."
-                        )
-                    else:
-                        self.logger.info(
-                            "Saved the Solcast results to cache, for later reference."
-                        )
-                # Trim request results to forecast_dates
-                data_list = data_list[0 : len(self.forecast_dates)]
-                data_dict = {"ts": self.forecast_dates, "yhat": data_list}
-                # Define DataFrame
-                data = pd.DataFrame.from_dict(data_dict)
-                # Define index
-                data.set_index("ts", inplace=True)
+            # Split `roof_id` into a list (comma- or space-separated)
+            roof_ids = re.split(r"[,\s]+", self.retrieve_hass_conf["solcast_rooftop_id"].strip())
+            # Running total of forecast data across all rooftops
+            total_data_list = [0] * len(self.forecast_dates)
+            # Iterate over each individual `roof_id`
+            for roof_id in roof_ids:
+                url = (
+                    f"https://api.solcast.com.au/rooftop_sites/{roof_id}/forecasts?hours={days_solcast}"
+                )
+                response = get(url, headers=headers)
+                """import bz2  # Uncomment to save serialized data for tests
+                import _pickle as cPickle
+                with bz2.BZ2File("data/test_response_solcast_get_method.pbz2", "w") as f:
+                    cPickle.dump(response, f)"""
+                # Verify the request passed
+                if int(response.status_code) == 200:
+                    data = response.json()
+                elif (
+                    int(response.status_code) == 402
+                    or int(response.status_code) == 429
+                ):
+                    self.logger.error(
+                        "Solcast error: May have exceeded your subscription limit."
+                    )
+                    return False
+                elif (
+                    int(response.status_code) >= 400
+                    or (int(response.status_code) >= 202 and int(response.status_code) <= 299)
+                ):
+                    self.logger.error(
+                        "Solcast error: There was an issue with the Solcast request, check the Solcast API key and rooftop ID."
+                    )
+                    self.logger.error(
+                        "Solcast error: Check that your subscription is valid and your network can connect to Solcast."
+                    )
+                    return False
+                # Process the data for the current `roof_id`
+                data_list = []
+                for elm in data["forecasts"]:
+                    data_list.append(
+                        elm["pv_estimate"] * 1000
+                    )  # Converting kW to W
+                # Check if the retrieved data has the correct length
+                if len(data_list) < len(self.forecast_dates):
+                    self.logger.error(
+                        "Not enough data retrieved from Solcast service, try increasing the time step or use MPC."
+                    )
+                    return False
+                # Add the data of the current `roof_id` to the running total
+                total_data_list = [
+                    total + current for total, current in zip_longest(total_data_list, data_list, fillvalue=0)
+                ]
+            # If runtime weather_forecast_cache is true save forecast result to file as cache
+            if self.params["passed_data"].get(
+                "weather_forecast_cache", False
+            ):
+                # Add x2 forecast periods for cached results. This adds an extra delta_forecast amount of days as a buffer
+                cached_forecast_dates = self.forecast_dates.union(
+                    pd.date_range(
+                        self.forecast_dates[-1],
+                        periods=(len(self.forecast_dates) + 1),
+                        freq=self.freq,
+                    )[1:]
+                )
+                cache_data_list = total_data_list[0 : len(cached_forecast_dates)]
+                cache_data_dict = {
+                    "ts": cached_forecast_dates,
+                    "yhat": cache_data_list,
+                }
+                data_cache = pd.DataFrame.from_dict(cache_data_dict)
+                data_cache.set_index("ts", inplace=True)
+                with open(w_forecast_cache_path, "wb") as file:
+                    cPickle.dump(data_cache, file)
+                if not os.path.isfile(w_forecast_cache_path):
+                    self.logger.warning(
+                        "Solcast forecast data could not be saved to file."
+                    )
+                else:
+                    self.logger.info(
+                        "Saved the Solcast results to cache, for later reference."
+                    )
+            # Trim request results to forecast_dates
+            total_data_list = total_data_list[0 : len(self.forecast_dates)]
+            data_dict = {"ts": self.forecast_dates, "yhat": total_data_list}
+            # Define DataFrame
+            data = pd.DataFrame.from_dict(data_dict)
+            # Define index
+            data.set_index("ts", inplace=True)
         # Else, notify user to update cache
         else:
             self.logger.error("Unable to obtain Solcast cache file.")
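
The aggregation across rooftops relies on `itertools.zip_longest` with `fillvalue=0`, so if one rooftop's response were shorter than the running total, the missing periods would simply contribute zero (in practice the length check above returns False before that can happen). A standalone sketch with invented wattage values:

from itertools import zip_longest

# Invented per-roof forecasts in W; roof_b is deliberately one period short.
total_data_list = [0] * 4
roof_a = [1000, 1500, 1200, 800]
roof_b = [500, 700, 600]
for data_list in (roof_a, roof_b):
    total_data_list = [
        total + current
        for total, current in zip_longest(total_data_list, data_list, fillvalue=0)
    ]
print(total_data_list)  # [1500, 2200, 1800, 800]
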
56 changes: 56 additions & 0 deletions tests/test_forecast.py
@@ -12,6 +12,7 @@
 
 import pandas as pd
 import requests_mock
+import re
 
 from emhass import utils
 from emhass.command_line import set_input_data_dict
@@ -323,6 +324,61 @@ def test_get_weather_forecast_solcast_method_mock(self):
             emhass_conf["data_path"] / "weather_forecast_data.pkl",
         )
 
+    # Test output weather forecast using Solcast-multiroofs with mock get request data
+    def test_get_weather_forecast_solcast_multiroofs_method_mock(self):
+        self.fcst.params = {
+            "passed_data": {
+                "weather_forecast_cache": False,
+                "weather_forecast_cache_only": False,
+            }
+        }
+        self.fcst.retrieve_hass_conf["solcast_api_key"] = "123456"
+        self.fcst.retrieve_hass_conf["solcast_rooftop_id"] = "111111,222222,333333"
+        roof_ids = re.split(r"[,\s]+", self.fcst.retrieve_hass_conf["solcast_rooftop_id"].strip())
+        if os.path.isfile(emhass_conf["data_path"] / "weather_forecast_data.pkl"):
+            os.rename(
+                emhass_conf["data_path"] / "weather_forecast_data.pkl",
+                emhass_conf["data_path"] / "temp_weather_forecast_data.pkl",
+            )
+        with requests_mock.mock() as m:
+            for roof_id in roof_ids:
+                data = bz2.BZ2File(
+                    str(emhass_conf["data_path"] / "test_response_solcast_get_method.pbz2"),
+                    "rb",
+                )
+                data = cPickle.load(data)
+                get_url = (
+                    f"https://api.solcast.com.au/rooftop_sites/{roof_id}/forecasts?hours=24"
+                )
+                m.get(get_url, json=data.json())
+            df_weather_scrap = self.fcst.get_weather_forecast(method="solcast")
+            self.assertIsInstance(df_weather_scrap, type(pd.DataFrame()))
+            self.assertIsInstance(
+                df_weather_scrap.index, pd.core.indexes.datetimes.DatetimeIndex
+            )
+            self.assertIsInstance(
+                df_weather_scrap.index.dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
+            )
+            self.assertEqual(df_weather_scrap.index.tz, self.fcst.time_zone)
+            self.assertTrue(
+                all(self.fcst.start_forecast < ts for ts in df_weather_scrap.index)
+            )
+            self.assertEqual(
+                len(df_weather_scrap),
+                int(
+                    self.optim_conf["delta_forecast_daily"].total_seconds()
+                    / 3600
+                    / self.fcst.timeStep
+                ),
+            )
+        if os.path.isfile(
+            emhass_conf["data_path"] / "temp_weather_forecast_data.pkl"
+        ):
+            os.rename(
+                emhass_conf["data_path"] / "temp_weather_forecast_data.pkl",
+                emhass_conf["data_path"] / "weather_forecast_data.pkl",
+            )
+
     # Test output weather forecast using Forecast.Solar with mock get request data
     def test_get_weather_forecast_solarforecast_method_mock(self):
         with requests_mock.mock() as m:
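
For reference, the returned object keeps the same shape as before the change: a single `yhat` column indexed by `ts`, now holding the summed forecast of all rooftops. A minimal sketch with invented dates and values, mirroring the `data_dict` / `set_index` lines at the end of the new forecast.py block:

import pandas as pd

# Invented stand-ins for self.forecast_dates and the summed total_data_list.
forecast_dates = pd.date_range("2025-01-11", periods=4, freq="30min", tz="UTC")
total_data_list = [1500, 2200, 1800, 800]
data = pd.DataFrame.from_dict({"ts": forecast_dates, "yhat": total_data_list})
data.set_index("ts", inplace=True)
print(data)
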
