Skip to content

Commit

Permalink
Merge pull request #34 from NREL/enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
rajeee authored Nov 22, 2023
2 parents 1ac1c67 + cb075f8 commit 93bde69
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 16 deletions.
5 changes: 5 additions & 0 deletions buildstock_query/aggregate_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ def aggregate_annual(self, *,
enduse_selection += [sa.func.approx_percentile(enduse, [0, 0.02, 0.25, 0.5, 0.75, 0.98, 1]).label(
f"{self._bsq._simple_label(enduse.name)}__quartiles") for enduse in enduse_cols]

if params.get_nonzero_count:
enduse_selection += [safunc.sum(sa.case((safunc.coalesce(enduse, 0) != 0, 1), else_=0)
* total_weight).label(f"{self._bsq._simple_label(enduse.name)}__nonzero_units_count")
for enduse in enduse_cols]

grouping_metrics_selction = [safunc.sum(1).label("sample_count"),
safunc.sum(total_weight).label("units_count")]

Expand Down
5 changes: 5 additions & 0 deletions buildstock_query/aggregate_query.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class BuildStockAggregate:
weights: Sequence[Union[str, tuple]] = [],
restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = [],
get_quartiles: bool = False,
get_nonzero_count: bool = False,
) -> str:
...

Expand All @@ -36,6 +37,7 @@ class BuildStockAggregate:
weights: Sequence[Union[str, tuple]] = [],
restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = [],
get_quartiles: bool = False,
get_nonzero_count: bool = False,
) -> pd.DataFrame:
...

Expand All @@ -50,6 +52,7 @@ class BuildStockAggregate:
weights: Sequence[Union[str, tuple]] = [],
restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = [],
get_quartiles: bool = False,
get_nonzero_count: bool = False,
) -> Union[pd.DataFrame, str]:
"""
Aggregates the baseline annual result on select enduses.
Expand Down Expand Up @@ -79,6 +82,8 @@ class BuildStockAggregate:
get_quartiles: If true, return the following quartiles in addition to the sum for each enduse:
[0, 0.02, .25, .5, .75, .98, 1]. The 0% quartile is the minimum and the 100% quartile
is the maximum.
get_nonzero_count: If true, return the number of non-zero rows for each enduse. Useful, for example, for
finding the number of natural gas customers by using natural gas total fuel use as the enduse.
get_query_only: Skips submitting the query to Athena and just returns the query string. Useful for batch
submitting multiple queries or debugging
Expand Down
7 changes: 6 additions & 1 deletion buildstock_query/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self,
region_name: str = 'us-west-2',
execution_history: Optional[str] = None,
skip_reports: bool = False,
athena_query_reuse: bool = True,
) -> None:
"""A class to run Athena queries for BuildStock runs and download results as pandas DataFrame.
Expand All @@ -64,6 +65,9 @@ def __init__(self,
to help stop them. Will use .execution_history if not supplied.
skip_reports (bool, optional): If true, skips report printing during initialization. If False (default),
prints report from `buildstock_query.report_query.BuildStockReport.get_success_report`.
athena_query_reuse (bool, optional): When true, Athena will make use of its built-in 7-day query cache.
When false, it will not. Defaults to True. One use case for setting this to False is when you have
modified the underlying S3 data or Glue schema and want to be sure you are not using cached results.
"""
self.params = BSQParams(
workgroup=workgroup,
Expand All @@ -74,7 +78,8 @@ def __init__(self,
building_id_column_name=building_id_column_name,
sample_weight=sample_weight,
region_name=region_name,
execution_history=execution_history
execution_history=execution_history,
athena_query_reuse=athena_query_reuse
)
self.run_params = self.params.get_run_params()
from buildstock_query.report_query import BuildStockReport
Expand Down
10 changes: 5 additions & 5 deletions buildstock_query/query_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(self, *, params: RunParams
use .execution_history if not supplied.
"""
logger.info(f"Loading {params.table_name} ...")
self.run_params = params
self.workgroup = params.workgroup
self.buildstock_type = params.buildstock_type
self._query_cache: dict[str, pd.DataFrame] = {} # {"query": query_result_df} to cache queries
Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(self, *, params: RunParams
self.sample_weight = params.sample_weight
self.table_name = params.table_name
self.cache_folder = pathlib.Path(params.cache_folder)
self.athena_query_reuse = params.athena_query_reuse
os.makedirs(self.cache_folder, exist_ok=True)
self._initialize_tables()
self._initialize_book_keeping(params.execution_history)
Expand Down Expand Up @@ -460,7 +462,7 @@ def execute(self, query, run_async: bool = False) -> Union[pd.DataFrame, tuple[L
return "CACHED", CachedFutureDf(self._query_cache[query].copy())
# in case of asynchronous run, you get the execution id and futures object
exe_id, result_future = self._async_conn.cursor().execute(query,
result_reuse_enable=True,
result_reuse_enable=self.athena_query_reuse,
result_reuse_minutes=60 * 24 * 7,
na_values=['']) # type: ignore
exe_id = ExeId(exe_id)
Expand All @@ -480,7 +482,7 @@ def get_pandas(future):
else:
if query not in self._query_cache:
self._query_cache[query] = self._conn.cursor().execute(query,
result_reuse_enable=True,
result_reuse_enable=self.athena_query_reuse,
result_reuse_minutes=60 * 24 * 7,
).as_pandas()
return self._query_cache[query].copy()
Expand Down Expand Up @@ -691,9 +693,7 @@ def get_batch_query_result(self, batch_id: int, *, combine: bool = True, no_bloc
for index, exe_id in enumerate(query_exe_ids):
df = query_futures[index].as_pandas().copy()
if combine:
if len(df) == 0:
df = pd.DataFrame({'query_id': [index]})
else:
if len(df) > 0:
df['query_id'] = index
logger.info(f"Got result from Query [{index}] ({exe_id})")
self._log_execution_cost(exe_id)
Expand Down
1 change: 1 addition & 0 deletions buildstock_query/report_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from buildstock_query.schema.utilities import AnyColType
from pydantic import validate_arguments, Field
from typing_extensions import assert_never

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
FUELS = ['electricity', 'natural_gas', 'propane', 'fuel_oil', 'coal', 'wood_cord', 'wood_pellets']
Expand Down
1 change: 1 addition & 0 deletions buildstock_query/schema/query_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class AnnualQuery(BaseModel):
restrict: Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]] = Field(default_factory=list)
weights: Sequence[Union[str, tuple, AnyColType]] = Field(default_factory=list)
get_quartiles: bool = False
get_nonzero_count: bool = False
get_query_only: bool = False
limit: Optional[int] = None

Expand Down
1 change: 1 addition & 0 deletions buildstock_query/schema/run_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class RunParams(BaseModel):
region_name: str = 'us-west-2'
execution_history: Optional[str] = None
cache_folder: str = '.bsq_cache'
athena_query_reuse: bool = True

class Config:
arbitrary_types_allowed = True
Expand Down
9 changes: 5 additions & 4 deletions buildstock_query/utility_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def _aggregate_ts_by_map(self,
limit=params.limit,
split_enduses=params.split_enduses,
get_quartiles=params.get_quartiles,
get_query_only=params.get_query_only,
get_query_only=False if params.split_enduses else True,
)
logger.info(f"Submitting query for {current_ids}")
result = self._agg.aggregate_timeseries(params=new_params)
Expand Down Expand Up @@ -196,8 +196,8 @@ def aggregate_unit_counts_by_eiaid(self, *, eiaid_list: list[str],
return result

@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
def aggregate_annual_by_eiaid(self, enduses: List[str], group_by: Optional[List[str]] = None,
get_query_only: bool = False):
def aggregate_annual_by_eiaid(self, enduses: Sequence[AnyColType], group_by: Optional[List[str]] = None,
get_query_only: bool = False, get_nonzero_count: bool = False):
"""
Aggregates the annual consumption in the baseline table, grouping by all the utilities
Args:
Expand All @@ -218,7 +218,8 @@ def aggregate_annual_by_eiaid(self, enduses: List[str], group_by: Optional[List[
join_list=join_list,
weights=['weight'],
sort=True,
get_query_only=get_query_only)
get_query_only=get_query_only,
get_nonzero_count=get_nonzero_count)
return result

@validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
Expand Down
15 changes: 9 additions & 6 deletions buildstock_query/utility_query.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class BuildStockUtility:

@typing.overload
def aggregate_ts_by_eiaid(self, *,
enduses: Sequence[str],
enduses: Sequence[AnyColType],
eiaid_list: Sequence[str],
get_query_only: Literal[True],
group_by: Sequence[Union[AnyColType, tuple[str, str]]] = Field(default_factory=list),
Expand All @@ -50,7 +50,7 @@ class BuildStockUtility:

@typing.overload
def aggregate_ts_by_eiaid(self, *,
enduses: Sequence[str],
enduses: Sequence[AnyColType],
eiaid_list: Sequence[str],
group_by: Sequence[Union[AnyColType, tuple[str, str]]] = Field(default_factory=list),
upgrade_id: Union[int, str] = '0',
Expand All @@ -70,7 +70,7 @@ class BuildStockUtility:

@typing.overload
def aggregate_ts_by_eiaid(self, *,
enduses: Sequence[str],
enduses: Sequence[AnyColType],
eiaid_list: Sequence[str],
get_query_only: bool,
group_by: Sequence[Union[AnyColType, tuple[str, str]]] = Field(default_factory=list),
Expand Down Expand Up @@ -144,22 +144,25 @@ class BuildStockUtility:
...

@typing.overload
def aggregate_annual_by_eiaid(self, enduses: List[str],
def aggregate_annual_by_eiaid(self, enduses: Sequence[AnyColType],
get_query_only: Literal[True],
group_by: Sequence[Union[AnyColType, tuple[str, str]]] = Field(default_factory=list),
get_nonzero_count: bool = False,
) -> str:
...

@typing.overload
def aggregate_annual_by_eiaid(self, enduses: List[str],
def aggregate_annual_by_eiaid(self, enduses: Sequence[AnyColType],
group_by: Sequence[Union[AnyColType, tuple[str, str]]] = Field(default_factory=list),
get_nonzero_count: bool = False,
get_query_only: Literal[False] = False) -> pd.DataFrame:
...

@typing.overload
def aggregate_annual_by_eiaid(self, enduses: List[str],
def aggregate_annual_by_eiaid(self, enduses: Sequence[AnyColType],
get_query_only: bool,
group_by: Sequence[Union[AnyColType, tuple[str, str]]] = Field(default_factory=list),
get_nonzero_count: bool = False,
) -> Union[str, pd.DataFrame]:
...

Expand Down

0 comments on commit 93bde69

Please sign in to comment.