Skip to content

Commit

Permalink
Add SQL function cagg_validate_query
Browse files Browse the repository at this point in the history
With this function is possible to execute the Continuous Aggregate query
validation over an arbitrary query string, without the need to actually
create the Continuous Aggregate.

It can be used, for example, to check for most frequent queries maybe
using `pg_stat_statements`, validate them and check if there are queries
that potenttialy can turned into a Continuous Aggregate.
  • Loading branch information
fabriziomello committed Nov 10, 2023
1 parent 483ddcd commit 2111936
Show file tree
Hide file tree
Showing 16 changed files with 653 additions and 30 deletions.
10 changes: 10 additions & 0 deletions sql/cagg_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,13 @@ CREATE OR REPLACE FUNCTION _timescaledb_functions.invalidation_process_cagg_log(
OUT ret_window_start BIGINT,
OUT ret_window_end BIGINT
) RETURNS RECORD AS '@MODULE_PATHNAME@', 'ts_invalidation_process_cagg_log' LANGUAGE C STRICT VOLATILE;

CREATE OR REPLACE FUNCTION _timescaledb_functions.cagg_validate_query(
query TEXT,
OUT is_valid BOOLEAN,
OUT error_level TEXT,
OUT error_code TEXT,
OUT error_message TEXT,
OUT error_detail TEXT,
OUT error_hint TEXT
) RETURNS RECORD AS '@MODULE_PATHNAME@', 'ts_continuous_agg_validate_query' LANGUAGE C STRICT VOLATILE;
3 changes: 3 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,6 @@ CREATE FUNCTION @[email protected]_chunks(

DROP PROCEDURE IF EXISTS _timescaledb_functions.repair_relation_acls();
DROP FUNCTION IF EXISTS _timescaledb_functions.makeaclitem(regrole, regrole, text, bool);

DROP FUNCTION _timescaledb_functions.continuous_agg_validate_query(TEXT, BOOLEAN, TEXT, TEXT, TEXT, TEXT, TEXT);

64 changes: 64 additions & 0 deletions src/compat/compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -995,4 +995,68 @@ object_ownercheck(Oid classid, Oid objectid, Oid roleid)
#define F_SUM_INT4 2108
#endif


/*
* PG15 refactored elog.c functions and exposed error_severity
*
* https://github.com/postgres/postgres/commit/ac7c80758a7
*/
#if PG15_LT
/*
* error_severity --- get string representing elevel
*
* The string is not localized here, but we mark the strings for translation
* so that callers can invoke _() on the result.
*
* Imported from src/backend/utils/error/elog.c
*/
static inline const char *
error_severity(int elevel)
{
const char *prefix;

switch (elevel)
{
case DEBUG1:
case DEBUG2:
case DEBUG3:
case DEBUG4:
case DEBUG5:
prefix = gettext_noop("DEBUG");
break;
case LOG:
case LOG_SERVER_ONLY:
prefix = gettext_noop("LOG");
break;
case INFO:
prefix = gettext_noop("INFO");
break;
case NOTICE:
prefix = gettext_noop("NOTICE");
break;
case WARNING:
#if PG14_GE
/* https://github.com/postgres/postgres/commit/1f9158ba481 */
case WARNING_CLIENT_ONLY:
#endif
prefix = gettext_noop("WARNING");
break;
case ERROR:
prefix = gettext_noop("ERROR");
break;
case FATAL:
prefix = gettext_noop("FATAL");
break;
case PANIC:
prefix = gettext_noop("PANIC");
break;
default:
prefix = "???";
break;
}

return prefix;
}
#endif

#endif /* TIMESCALEDB_COMPAT_H */
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ CROSSMODULE_WRAPPER(decompress_chunk);
/* continuous aggregate */
CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger);
CROSSMODULE_WRAPPER(continuous_agg_refresh);
CROSSMODULE_WRAPPER(continuous_agg_validate_query);
CROSSMODULE_WRAPPER(invalidation_cagg_log_add_entry);
CROSSMODULE_WRAPPER(invalidation_hyper_log_add_entry);
CROSSMODULE_WRAPPER(drop_dist_ht_invalidation_trigger);
Expand Down Expand Up @@ -494,6 +495,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht_all_default,
.continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht_all_default,
.continuous_agg_update_options = continuous_agg_update_options_default,
.continuous_agg_validate_query = error_no_default_fn_pg_community,
.invalidation_cagg_log_add_entry = error_no_default_fn_pg_community,
.invalidation_hyper_log_add_entry = error_no_default_fn_pg_community,
.remote_invalidation_log_delete = NULL,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ typedef struct CrossModuleFunctions
int64 start, int64 end);
void (*continuous_agg_update_options)(ContinuousAgg *cagg,
WithClauseResult *with_clause_options);
PGFunction continuous_agg_validate_query;
PGFunction invalidation_cagg_log_add_entry;
PGFunction invalidation_hyper_log_add_entry;
void (*remote_invalidation_log_delete)(int32 raw_hypertable_id,
Expand Down
7 changes: 4 additions & 3 deletions tsl/src/continuous_aggs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/common.c
${CMAKE_CURRENT_SOURCE_DIR}/finalize.c
${CMAKE_CURRENT_SOURCE_DIR}/create.c
${CMAKE_CURRENT_SOURCE_DIR}/finalize.c
${CMAKE_CURRENT_SOURCE_DIR}/insert.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation_threshold.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation.c
${CMAKE_CURRENT_SOURCE_DIR}/materialize.c
${CMAKE_CURRENT_SOURCE_DIR}/options.c
${CMAKE_CURRENT_SOURCE_DIR}/refresh.c
${CMAKE_CURRENT_SOURCE_DIR}/repair.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation_threshold.c)
${CMAKE_CURRENT_SOURCE_DIR}/utils.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
65 changes: 43 additions & 22 deletions tsl/src/continuous_aggs/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ static void caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id
Oid hypertable_partition_coltype,
int64 hypertable_partition_col_interval,
int32 parent_mat_hypertable_id);
static void caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause,
List *targetList);
static void caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList,
bool is_cagg_create);
static bool cagg_agg_validate(Node *node, void *context);
static bool cagg_query_supported(const Query *query, StringInfo hint, StringInfo detail,
const bool finalized);
Expand Down Expand Up @@ -149,7 +149,8 @@ destroy_union_query(Query *q)
* the `bucket_width` and other fields of `tbinfo`.
*/
static void
caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList)
caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList,
bool is_cagg_create)
{
ListCell *l;
bool found = false;
Expand Down Expand Up @@ -320,22 +321,27 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar
tbinfo->bucket_width_type = width->consttype;

if (width->constisnull)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid bucket width for time bucket function")));

if (width->consttype == INTERVALOID)
{
tbinfo->interval = DatumGetIntervalP(width->constvalue);
if (tbinfo->interval->month != 0)
tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE;
if (is_cagg_create)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid bucket width for time bucket function")));
}

if (tbinfo->bucket_width != BUCKET_WIDTH_VARIABLE)
else
{
/* The bucket size is fixed. */
tbinfo->bucket_width =
ts_interval_value_to_internal(width->constvalue, width->consttype);
if (width->consttype == INTERVALOID)
{
tbinfo->interval = DatumGetIntervalP(width->constvalue);
if (tbinfo->interval && tbinfo->interval->month != 0)
tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE;
}

if (tbinfo->bucket_width != BUCKET_WIDTH_VARIABLE)
{
/* The bucket size is fixed. */
tbinfo->bucket_width =
ts_interval_value_to_internal(width->constvalue, width->consttype);
}
}
}
else
Expand Down Expand Up @@ -618,7 +624,7 @@ get_bucket_width(CAggTimebucketInfo bucket_info)

CAggTimebucketInfo
cagg_validate_query(const Query *query, const bool finalized, const char *cagg_schema,
const char *cagg_name)
const char *cagg_name, const bool is_cagg_create)
{
CAggTimebucketInfo bucket_info = { 0 }, bucket_info_parent;
Cache *hcache;
Expand Down Expand Up @@ -801,7 +807,18 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
int32 parent_mat_hypertable_id = INVALID_HYPERTABLE_ID;

if (rte->relkind == RELKIND_RELATION)
ht = ts_hypertable_cache_get_cache_and_entry(rte->relid, CACHE_FLAG_NONE, &hcache);
{
ht =
ts_hypertable_cache_get_cache_and_entry(rte->relid, CACHE_FLAG_MISSING_OK, &hcache);

if (!ht)
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
errmsg("table \"%s\" is not a hypertable", get_rel_name(rte->relid))));
}
}
else
{
cagg_parent = ts_continuous_agg_find_by_relid(rte->relid);
Expand Down Expand Up @@ -845,6 +862,8 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
prev_query = ts_continuous_agg_get_query(cagg_parent);
}

ts_cache_release(hcache);

if (TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
Expand Down Expand Up @@ -925,14 +944,15 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
INVALID_HYPERTABLE_ID);
}

ts_cache_release(hcache);

/*
* We need a GROUP By clause with time_bucket on the partitioning
* column of the hypertable
*/
Assert(query->groupClause);
caggtimebucket_validate(&bucket_info, query->groupClause, query->targetList);
caggtimebucket_validate(&bucket_info,
query->groupClause,
query->targetList,
is_cagg_create);
}

/* Check row security settings for the table. */
Expand All @@ -950,7 +970,8 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
Assert(prev_query->groupClause);
caggtimebucket_validate(&bucket_info_parent,
prev_query->groupClause,
prev_query->targetList);
prev_query->targetList,
is_cagg_create);

/* Cannot create cagg with fixed bucket on top of variable bucket. */
if ((bucket_info_parent.bucket_width == BUCKET_WIDTH_VARIABLE &&
Expand Down
7 changes: 5 additions & 2 deletions tsl/src/continuous_aggs/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

#ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H
#define TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H

Expand Down Expand Up @@ -122,7 +123,8 @@ typedef struct AggPartCxt
} while (0);

extern CAggTimebucketInfo cagg_validate_query(const Query *query, const bool finalized,
const char *cagg_schema, const char *cagg_name);
const char *cagg_schema, const char *cagg_name,
const bool is_cagg_create);
extern Query *destroy_union_query(Query *q);
extern Oid relation_oid(Name schema, Name name);
extern void RemoveRangeTableEntries(Query *query);
Expand All @@ -131,4 +133,5 @@ extern Query *build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Qu
extern void mattablecolumninfo_init(MatTableColumnInfo *matcolinfo, List *grouplist);
extern void mattablecolumninfo_addinternal(MatTableColumnInfo *matcolinfo);
extern bool function_allowed_in_cagg_definition(Oid funcid);
#endif

#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H */
6 changes: 4 additions & 2 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,8 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
timebucket_exprinfo = cagg_validate_query((Query *) stmt->into->viewQuery,
finalized,
schema_name,
stmt->into->rel->relname);
stmt->into->rel->relname,
true);
cagg_create(stmt, &viewstmt, (Query *) stmt->query, &timebucket_exprinfo, with_clause_options);

/* Insert the MIN of the time dimension type for the new watermark */
Expand Down Expand Up @@ -1024,7 +1025,8 @@ cagg_flip_realtime_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
cagg_validate_query(direct_query,
agg->data.finalized,
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));
NameStr(agg->data.user_view_name),
true);

/* Flip */
agg->data.materialized_only = !agg->data.materialized_only;
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/repair.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht, bool force_
cagg_validate_query(direct_query,
finalized,
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));
NameStr(agg->data.user_view_name),
true);

mattablecolumninfo_init(&mattblinfo, copyObject(direct_query->groupClause));
fqi.finalized = finalized;
Expand Down
Loading

0 comments on commit 2111936

Please sign in to comment.