diff --git a/.unreleased/feature_6307 b/.unreleased/feature_6307 new file mode 100644 index 00000000000..3d2381cfac1 --- /dev/null +++ b/.unreleased/feature_6307 @@ -0,0 +1 @@ +Implements: #6307 Add SQL function cagg_validate_query diff --git a/sql/cagg_utils.sql b/sql/cagg_utils.sql index 872ca55c2b8..166d27ac9a6 100644 --- a/sql/cagg_utils.sql +++ b/sql/cagg_utils.sql @@ -120,3 +120,13 @@ CREATE OR REPLACE FUNCTION _timescaledb_functions.invalidation_process_cagg_log( OUT ret_window_start BIGINT, OUT ret_window_end BIGINT ) RETURNS RECORD AS '@MODULE_PATHNAME@', 'ts_invalidation_process_cagg_log' LANGUAGE C STRICT VOLATILE; + +CREATE OR REPLACE FUNCTION _timescaledb_functions.cagg_validate_query( + query TEXT, + OUT is_valid BOOLEAN, + OUT error_level TEXT, + OUT error_code TEXT, + OUT error_message TEXT, + OUT error_detail TEXT, + OUT error_hint TEXT +) RETURNS RECORD AS '@MODULE_PATHNAME@', 'ts_continuous_agg_validate_query' LANGUAGE C STRICT VOLATILE; diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index 0b638966edf..eb2ae9ef109 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -194,3 +194,6 @@ CREATE FUNCTION @extschema@.show_chunks( DROP PROCEDURE IF EXISTS _timescaledb_functions.repair_relation_acls(); DROP FUNCTION IF EXISTS _timescaledb_functions.makeaclitem(regrole, regrole, text, bool); + +DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_validate_query(TEXT, BOOLEAN, TEXT, TEXT, TEXT, TEXT, TEXT); + diff --git a/src/compat/compat.h b/src/compat/compat.h index 6a6c5cdfc42..8452342ada9 100644 --- a/src/compat/compat.h +++ b/src/compat/compat.h @@ -995,4 +995,69 @@ object_ownercheck(Oid classid, Oid objectid, Oid roleid) #define F_SUM_INT4 2108 #endif +/* + * PG15 refactored elog.c functions and exposed error_severity + * but previous versions don't have it exposed, so imported it + * from Postgres source code. + * + * https://github.com/postgres/postgres/commit/ac7c80758a7 + */ +#if PG15_LT +/* + * error_severity --- get string representing elevel + * + * The string is not localized here, but we mark the strings for translation + * so that callers can invoke _() on the result. + * + * Imported from src/backend/utils/error/elog.c + */ +static inline const char * +error_severity(int elevel) +{ + const char *prefix; + + switch (elevel) + { + case DEBUG1: + case DEBUG2: + case DEBUG3: + case DEBUG4: + case DEBUG5: + prefix = gettext_noop("DEBUG"); + break; + case LOG: + case LOG_SERVER_ONLY: + prefix = gettext_noop("LOG"); + break; + case INFO: + prefix = gettext_noop("INFO"); + break; + case NOTICE: + prefix = gettext_noop("NOTICE"); + break; + case WARNING: +#if PG14_GE + /* https://github.com/postgres/postgres/commit/1f9158ba481 */ + case WARNING_CLIENT_ONLY: +#endif + prefix = gettext_noop("WARNING"); + break; + case ERROR: + prefix = gettext_noop("ERROR"); + break; + case FATAL: + prefix = gettext_noop("FATAL"); + break; + case PANIC: + prefix = gettext_noop("PANIC"); + break; + default: + prefix = "???"; + break; + } + + return prefix; +} +#endif + #endif /* TIMESCALEDB_COMPAT_H */ diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 1582ad37435..fed61928cf3 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -88,6 +88,7 @@ CROSSMODULE_WRAPPER(decompress_chunk); /* continuous aggregate */ CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger); CROSSMODULE_WRAPPER(continuous_agg_refresh); +CROSSMODULE_WRAPPER(continuous_agg_validate_query); CROSSMODULE_WRAPPER(invalidation_cagg_log_add_entry); CROSSMODULE_WRAPPER(invalidation_hyper_log_add_entry); CROSSMODULE_WRAPPER(drop_dist_ht_invalidation_trigger); @@ -494,6 +495,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht_all_default, .continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht_all_default, .continuous_agg_update_options = continuous_agg_update_options_default, + .continuous_agg_validate_query = error_no_default_fn_pg_community, .invalidation_cagg_log_add_entry = error_no_default_fn_pg_community, .invalidation_hyper_log_add_entry = error_no_default_fn_pg_community, .remote_invalidation_log_delete = NULL, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 51fdf32e6c8..a3f0613a30e 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -120,6 +120,7 @@ typedef struct CrossModuleFunctions int64 start, int64 end); void (*continuous_agg_update_options)(ContinuousAgg *cagg, WithClauseResult *with_clause_options); + PGFunction continuous_agg_validate_query; PGFunction invalidation_cagg_log_add_entry; PGFunction invalidation_hyper_log_add_entry; void (*remote_invalidation_log_delete)(int32 raw_hypertable_id, diff --git a/tsl/src/continuous_aggs/CMakeLists.txt b/tsl/src/continuous_aggs/CMakeLists.txt index 72e852123ac..11091cdf188 100644 --- a/tsl/src/continuous_aggs/CMakeLists.txt +++ b/tsl/src/continuous_aggs/CMakeLists.txt @@ -1,12 +1,13 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/common.c - ${CMAKE_CURRENT_SOURCE_DIR}/finalize.c ${CMAKE_CURRENT_SOURCE_DIR}/create.c + ${CMAKE_CURRENT_SOURCE_DIR}/finalize.c ${CMAKE_CURRENT_SOURCE_DIR}/insert.c + ${CMAKE_CURRENT_SOURCE_DIR}/invalidation_threshold.c + ${CMAKE_CURRENT_SOURCE_DIR}/invalidation.c ${CMAKE_CURRENT_SOURCE_DIR}/materialize.c ${CMAKE_CURRENT_SOURCE_DIR}/options.c ${CMAKE_CURRENT_SOURCE_DIR}/refresh.c ${CMAKE_CURRENT_SOURCE_DIR}/repair.c - ${CMAKE_CURRENT_SOURCE_DIR}/invalidation.c - ${CMAKE_CURRENT_SOURCE_DIR}/invalidation_threshold.c) + ${CMAKE_CURRENT_SOURCE_DIR}/utils.c) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/src/continuous_aggs/common.c b/tsl/src/continuous_aggs/common.c index cd784453c3d..d0a5e01e162 100644 --- a/tsl/src/continuous_aggs/common.c +++ b/tsl/src/continuous_aggs/common.c @@ -12,8 +12,8 @@ static void caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id Oid hypertable_partition_coltype, int64 hypertable_partition_col_interval, int32 parent_mat_hypertable_id); -static void caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, - List *targetList); +static void caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList, + bool is_cagg_create); static bool cagg_agg_validate(Node *node, void *context); static bool cagg_query_supported(const Query *query, StringInfo hint, StringInfo detail, const bool finalized); @@ -149,7 +149,8 @@ destroy_union_query(Query *q) * the `bucket_width` and other fields of `tbinfo`. */ static void -caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList) +caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList, + bool is_cagg_create) { ListCell *l; bool found = false; @@ -320,22 +321,27 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar tbinfo->bucket_width_type = width->consttype; if (width->constisnull) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid bucket width for time bucket function"))); - - if (width->consttype == INTERVALOID) { - tbinfo->interval = DatumGetIntervalP(width->constvalue); - if (tbinfo->interval->month != 0) - tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE; + if (is_cagg_create) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid bucket width for time bucket function"))); } - - if (tbinfo->bucket_width != BUCKET_WIDTH_VARIABLE) + else { - /* The bucket size is fixed. */ - tbinfo->bucket_width = - ts_interval_value_to_internal(width->constvalue, width->consttype); + if (width->consttype == INTERVALOID) + { + tbinfo->interval = DatumGetIntervalP(width->constvalue); + if (tbinfo->interval && tbinfo->interval->month != 0) + tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE; + } + + if (tbinfo->bucket_width != BUCKET_WIDTH_VARIABLE) + { + /* The bucket size is fixed. */ + tbinfo->bucket_width = + ts_interval_value_to_internal(width->constvalue, width->consttype); + } } } else @@ -618,10 +624,9 @@ get_bucket_width(CAggTimebucketInfo bucket_info) CAggTimebucketInfo cagg_validate_query(const Query *query, const bool finalized, const char *cagg_schema, - const char *cagg_name) + const char *cagg_name, const bool is_cagg_create) { CAggTimebucketInfo bucket_info = { 0 }, bucket_info_parent; - Cache *hcache; Hypertable *ht = NULL, *ht_parent = NULL; RangeTblRef *rtref = NULL, *rtref_other = NULL; RangeTblEntry *rte = NULL, *rte_other = NULL; @@ -799,15 +804,27 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s { const Dimension *part_dimension = NULL; int32 parent_mat_hypertable_id = INVALID_HYPERTABLE_ID; + Cache *hcache = ts_hypertable_cache_pin(); if (rte->relkind == RELKIND_RELATION) - ht = ts_hypertable_cache_get_cache_and_entry(rte->relid, CACHE_FLAG_NONE, &hcache); + { + ht = ts_hypertable_cache_get_entry(hcache, rte->relid, CACHE_FLAG_MISSING_OK); + + if (!ht) + { + ts_cache_release(hcache); + ereport(ERROR, + (errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST), + errmsg("table \"%s\" is not a hypertable", get_rel_name(rte->relid)))); + } + } else { cagg_parent = ts_continuous_agg_find_by_relid(rte->relid); if (!cagg_parent) { + ts_cache_release(hcache); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("invalid continuous aggregate query"), @@ -817,6 +834,7 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s if (!ContinuousAggIsFinalized(cagg_parent)) { + ts_cache_release(hcache); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("old format of continuous aggregate is not supported"), @@ -827,7 +845,6 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s } parent_mat_hypertable_id = cagg_parent->data.mat_hypertable_id; - hcache = ts_hypertable_cache_pin(); ht = ts_hypertable_cache_get_entry_by_id(hcache, cagg_parent->data.mat_hypertable_id); /* If parent cagg is hierarchical then we should get the matht otherwise the rawht. */ @@ -846,9 +863,12 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s } if (TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht)) + { + ts_cache_release(hcache); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("hypertable is an internal compressed hypertable"))); + } if (rte->relkind == RELKIND_RELATION) { @@ -861,6 +881,7 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s const ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id); Assert(cagg != NULL); + ts_cache_release(hcache); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("hypertable is a continuous aggregate materialization table"), @@ -882,10 +903,13 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s * below, along with any other fallout. */ if (part_dimension->partitioning != NULL) + { + ts_cache_release(hcache); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("custom partitioning functions not supported" " with continuous aggregates"))); + } if (IS_INTEGER_TYPE(ts_dimension_get_partition_type(part_dimension)) && rte->relkind == RELKIND_RELATION) @@ -894,6 +918,8 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s const char *funcname = NameStr(part_dimension->fd.integer_now_func); if (strlen(funcschema) == 0 || strlen(funcname) == 0) + { + ts_cache_release(hcache); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("custom time function required on hypertable \"%s\"", @@ -901,6 +927,7 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s errdetail("An integer-based hypertable requires a custom time function to " "support continuous aggregates."), errhint("Set a custom time function on the hypertable."))); + } } caggtimebucketinfo_init(&bucket_info, @@ -932,7 +959,10 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s * column of the hypertable */ Assert(query->groupClause); - caggtimebucket_validate(&bucket_info, query->groupClause, query->targetList); + caggtimebucket_validate(&bucket_info, + query->groupClause, + query->targetList, + is_cagg_create); } /* Check row security settings for the table. */ @@ -950,7 +980,8 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s Assert(prev_query->groupClause); caggtimebucket_validate(&bucket_info_parent, prev_query->groupClause, - prev_query->targetList); + prev_query->targetList, + is_cagg_create); /* Cannot create cagg with fixed bucket on top of variable bucket. */ if ((bucket_info_parent.bucket_width == BUCKET_WIDTH_VARIABLE && diff --git a/tsl/src/continuous_aggs/common.h b/tsl/src/continuous_aggs/common.h index a79e6bba190..c57d09231a1 100644 --- a/tsl/src/continuous_aggs/common.h +++ b/tsl/src/continuous_aggs/common.h @@ -3,6 +3,7 @@ * Please see the included NOTICE for copyright information and * LICENSE-TIMESCALE for a copy of the license. */ + #ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H #define TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H @@ -122,7 +123,8 @@ typedef struct AggPartCxt } while (0); extern CAggTimebucketInfo cagg_validate_query(const Query *query, const bool finalized, - const char *cagg_schema, const char *cagg_name); + const char *cagg_schema, const char *cagg_name, + const bool is_cagg_create); extern Query *destroy_union_query(Query *q); extern Oid relation_oid(Name schema, Name name); extern void RemoveRangeTableEntries(Query *query); @@ -131,4 +133,5 @@ extern Query *build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Qu extern void mattablecolumninfo_init(MatTableColumnInfo *matcolinfo, List *grouplist); extern void mattablecolumninfo_addinternal(MatTableColumnInfo *matcolinfo); extern bool function_allowed_in_cagg_definition(Oid funcid); -#endif + +#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H */ diff --git a/tsl/src/continuous_aggs/create.c b/tsl/src/continuous_aggs/create.c index 72fa1bb846e..8c8c63af0c5 100644 --- a/tsl/src/continuous_aggs/create.c +++ b/tsl/src/continuous_aggs/create.c @@ -926,7 +926,8 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void * timebucket_exprinfo = cagg_validate_query((Query *) stmt->into->viewQuery, finalized, schema_name, - stmt->into->rel->relname); + stmt->into->rel->relname, + true); cagg_create(stmt, &viewstmt, (Query *) stmt->query, &timebucket_exprinfo, with_clause_options); /* Insert the MIN of the time dimension type for the new watermark */ @@ -1024,7 +1025,8 @@ cagg_flip_realtime_view_definition(ContinuousAgg *agg, Hypertable *mat_ht) cagg_validate_query(direct_query, agg->data.finalized, NameStr(agg->data.user_view_schema), - NameStr(agg->data.user_view_name)); + NameStr(agg->data.user_view_name), + true); /* Flip */ agg->data.materialized_only = !agg->data.materialized_only; diff --git a/tsl/src/continuous_aggs/repair.c b/tsl/src/continuous_aggs/repair.c index 87e2a129e9e..984400673bc 100644 --- a/tsl/src/continuous_aggs/repair.c +++ b/tsl/src/continuous_aggs/repair.c @@ -112,7 +112,8 @@ cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht, bool force_ cagg_validate_query(direct_query, finalized, NameStr(agg->data.user_view_schema), - NameStr(agg->data.user_view_name)); + NameStr(agg->data.user_view_name), + true); mattablecolumninfo_init(&mattblinfo, copyObject(direct_query->groupClause)); fqi.finalized = finalized; diff --git a/tsl/src/continuous_aggs/utils.c b/tsl/src/continuous_aggs/utils.c new file mode 100644 index 00000000000..be92349a48a --- /dev/null +++ b/tsl/src/continuous_aggs/utils.c @@ -0,0 +1,153 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#include "utils.h" + +static void +fill_text_values(int index, Datum *values, bool *nulls, char *value) +{ + if (value) + { + values[index] = PointerGetDatum(cstring_to_text(value)); + nulls[index] = false; + } + else + nulls[index] = true; +} + +enum +{ + Anum_cagg_validate_query_valid = 1, + Anum_cagg_validate_query_error_level, + Anum_cagg_validate_query_error_code, + Anum_cagg_validate_query_error_message, + Anum_cagg_validate_query_error_detail, + Anum_cagg_validate_query_error_hint, + _Anum_cagg_validate_query_max +}; + +static Datum +create_cagg_validate_query_datum(TupleDesc tupdesc, const bool is_valid_query, + const ErrorData *edata) +{ + Datum values[_Anum_cagg_validate_query_max] = { 0 }; + bool nulls[_Anum_cagg_validate_query_max] = { false }; + HeapTuple tuple; + + tupdesc = BlessTupleDesc(tupdesc); + + values[AttrNumberGetAttrOffset(Anum_cagg_validate_query_valid)] = BoolGetDatum(is_valid_query); + + fill_text_values(AttrNumberGetAttrOffset(Anum_cagg_validate_query_error_level), + values, + nulls, + edata->elevel > 0 ? (char *) error_severity(edata->elevel) : NULL); + fill_text_values(AttrNumberGetAttrOffset(Anum_cagg_validate_query_error_code), + values, + nulls, + edata->sqlerrcode > 0 ? unpack_sql_state(edata->sqlerrcode) : NULL); + fill_text_values(AttrNumberGetAttrOffset(Anum_cagg_validate_query_error_message), + values, + nulls, + edata->message); + fill_text_values(AttrNumberGetAttrOffset(Anum_cagg_validate_query_error_detail), + values, + nulls, + edata->detail); + fill_text_values(AttrNumberGetAttrOffset(Anum_cagg_validate_query_error_hint), + values, + nulls, + edata->hint); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + return HeapTupleGetDatum(tuple); +} + +Datum +continuous_agg_validate_query(PG_FUNCTION_ARGS) +{ + text *query_text = PG_GETARG_TEXT_P(0); + char *sql; + bool is_valid_query = false; + Datum datum_sql; + TupleDesc tupdesc; + ErrorData *edata; + MemoryContext oldcontext = CurrentMemoryContext; + + /* Change $1, $2 ... placeholders to NULL constant. This is necessary to make parser happy */ + sql = text_to_cstring(query_text); + elog(DEBUG1, "sql: %s", sql); + + datum_sql = CStringGetTextDatum(sql); + datum_sql = DirectFunctionCall4Coll(textregexreplace, + C_COLLATION_OID, + datum_sql, + CStringGetTextDatum("\\$[0-9]+"), + CStringGetTextDatum("NULL"), + CStringGetTextDatum("g")); + sql = text_to_cstring(DatumGetTextP(datum_sql)); + elog(DEBUG1, "sql: %s", sql); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "function returning record called in context that cannot accept type record"); + + PG_TRY(); + { + List *tree; + Node *node; + RawStmt *rawstmt; + ParseState *pstate; + Query *query; + + edata = (ErrorData *) palloc0(sizeof(ErrorData)); + edata->message = NULL; + edata->detail = NULL; + edata->hint = NULL; + + tree = pg_parse_query(sql); + + if (list_length(tree) > 1) + { + edata->elevel = WARNING; + edata->sqlerrcode = ERRCODE_FEATURE_NOT_SUPPORTED; + edata->message = "multiple statements are not supported"; + + PG_RETURN_DATUM(create_cagg_validate_query_datum(tupdesc, is_valid_query, edata)); + } + + node = linitial(tree); + rawstmt = (RawStmt *) node; + pstate = make_parsestate(NULL); + + Assert(IsA(node, RawStmt)); + + if (!IsA(rawstmt->stmt, SelectStmt)) + { + edata->elevel = WARNING; + edata->sqlerrcode = ERRCODE_FEATURE_NOT_SUPPORTED; + edata->message = "only select statements are supported"; + + PG_RETURN_DATUM(create_cagg_validate_query_datum(tupdesc, is_valid_query, edata)); + } + + pstate->p_sourcetext = sql; + query = transformTopLevelStmt(pstate, rawstmt); + free_parsestate(pstate); + + (void) cagg_validate_query(query, true, "public", "cagg_validate", false); + is_valid_query = true; + } + PG_CATCH(); + { + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + } + PG_END_TRY(); + + PG_RETURN_DATUM(create_cagg_validate_query_datum(tupdesc, is_valid_query, edata)); +} diff --git a/tsl/src/continuous_aggs/utils.h b/tsl/src/continuous_aggs/utils.h new file mode 100644 index 00000000000..b70e3e77552 --- /dev/null +++ b/tsl/src/continuous_aggs/utils.h @@ -0,0 +1,22 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ + +#ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_UTILS_H +#define TIMESCALEDB_TSL_CONTINUOUS_AGGS_UTILS_H + +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "compat/compat.h" + +extern Datum continuous_agg_validate_query(PG_FUNCTION_ARGS); + +#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_UTILS_H */ diff --git a/tsl/src/init.c b/tsl/src/init.c index 0ccf0b294ad..598133f7c00 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -31,6 +31,7 @@ #include "continuous_aggs/refresh.h" #include "continuous_aggs/invalidation.h" #include "continuous_aggs/repair.h" +#include "continuous_aggs/utils.h" #include "cross_module_fn.h" #include "nodes/data_node_dispatch.h" #include "data_node.h" @@ -159,6 +160,7 @@ CrossModuleFunctions tsl_cm_functions = { .continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht, .continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht, .continuous_agg_update_options = continuous_agg_update_options, + .continuous_agg_validate_query = continuous_agg_validate_query, .invalidation_cagg_log_add_entry = tsl_invalidation_cagg_log_add_entry, .invalidation_hyper_log_add_entry = tsl_invalidation_hyper_log_add_entry, .remote_invalidation_log_delete = remote_invalidation_log_delete, diff --git a/tsl/test/expected/cagg_utils.out b/tsl/test/expected/cagg_utils.out new file mode 100644 index 00000000000..6c932cf127f --- /dev/null +++ b/tsl/test/expected/cagg_utils.out @@ -0,0 +1,238 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +SET search_path TO public, _timescaledb_functions; +CREATE TABLE devices ( + id INTEGER, + name TEXT +); +CREATE TABLE metrics ( + "time" TIMESTAMPTZ NOT NULL, + device_id INTEGER, + value FLOAT8 +); +SELECT table_name FROM create_hypertable('metrics', 'time'); + table_name +------------ + metrics +(1 row) + +-- fixed bucket size +CREATE MATERIALIZED VIEW metrics_by_hour WITH (timescaledb.continuous) AS +SELECT time_bucket('1 hour', time) AS bucket, count(*) +FROM metrics +GROUP BY 1 +WITH NO DATA; +-- variable bucket size +CREATE MATERIALIZED VIEW metrics_by_month WITH (timescaledb.continuous) AS +SELECT time_bucket('1 month', bucket) AS bucket, sum(count) AS count +FROM metrics_by_hour +GROUP BY 1 +WITH NO DATA; +-- +-- ERRORS +-- +-- return NULL +SELECT * FROM cagg_validate_query(NULL); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------+--------------+------------ + | | | | | +(1 row) + +-- syntax error +SELECT * FROM cagg_validate_query('blahh'); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------------------------+--------------+------------ + f | ERROR | 42601 | syntax error at or near "blahh" | | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket(blahh "time") FROM metrics GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+----------------------------------+--------------+------------ + f | ERROR | 42601 | syntax error at or near ""time"" | | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour' "time") FROM metrics GROUP BY $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+----------------------------------+--------------+------------ + f | ERROR | 42601 | syntax error at or near ""time"" | | +(1 row) + +-- multiple statements are not allowed +SELECT * FROM cagg_validate_query($$ SELECT 1; SELECT 2; $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------------------------------+--------------+------------ + f | WARNING | 0A000 | multiple statements are not supported | | +(1 row) + +-- only SELECT queries are allowed +SELECT * FROM cagg_validate_query($$ DELETE FROM pg_catalog.pg_class $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+--------------------------------------+--------------+------------ + f | WARNING | 0A000 | only select statements are supported | | +(1 row) + +SELECT * FROM cagg_validate_query($$ UPDATE pg_catalog.pg_class SET relkind = 'r' $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+--------------------------------------+--------------+------------ + f | WARNING | 0A000 | only select statements are supported | | +(1 row) + +SELECT * FROM cagg_validate_query($$ DELETE FROM pg_catalog.pg_class $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+--------------------------------------+--------------+------------ + f | WARNING | 0A000 | only select statements are supported | | +(1 row) + +SELECT * FROM cagg_validate_query($$ VACUUM (ANALYZE) $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+--------------------------------------+--------------+------------ + f | WARNING | 0A000 | only select statements are supported | | +(1 row) + +-- invalid queries +SELECT * FROM cagg_validate_query($$ SELECT 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+------------------------------------+--------------+---------------------------------- + f | ERROR | 0A000 | invalid continuous aggregate query | | FROM clause missing in the query +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT 1 FROM pg_catalog.pg_class $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+------------------------------------+--------------+--------------------------------------------------------------------------------- + f | ERROR | 0A000 | invalid continuous aggregate query | | Include at least one aggregate function and a GROUP BY clause with time bucket. +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT relkind, count(*) FROM pg_catalog.pg_class GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+--------------------------------------+--------------+------------ + f | ERROR | TS001 | table "pg_class" is not a hypertable | | +(1 row) + +-- time_bucket with offset is not allowed +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", "offset" => '-1 minute'::interval), count(*) FROM metrics GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------------------------------------------------------------+--------------+------------ + f | ERROR | XX000 | continuous aggregate view must include a valid time bucket function | | +(1 row) + +-- time_bucket with origin is not allowed +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", origin => '2023-01-01'::timestamptz), count(*) FROM metrics GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------------------------------------------------------------+--------------+------------ + f | ERROR | XX000 | continuous aggregate view must include a valid time bucket function | | +(1 row) + +-- time_bucket with origin is not allowed +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", origin => '2023-01-01'::timestamptz), count(*) FROM metrics GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------------------------------------------------------------+--------------+------------ + f | ERROR | XX000 | continuous aggregate view must include a valid time bucket function | | +(1 row) + +-- time_bucket_gapfill is not allowed +SELECT * FROM cagg_validate_query($$ SELECT time_bucket_gapfill('1 hour', "time"), count(*) FROM metrics GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------------------------------------------------------------+--------------+------------ + f | ERROR | XX000 | continuous aggregate view must include a valid time bucket function | | +(1 row) + +-- invalid join queries +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', a."time"), count(*) FROM metrics a, metrics b GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+-----------------------------------+-------------------------------------------------------------------------+------------ + f | ERROR | 0A000 | invalid continuous aggregate view | Multiple hypertables or normal tables are not supported in FROM clause. | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), count(*) FROM metrics, devices a, devices b GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------------------------------------------------------------------------------------------+--------------+------------ + f | ERROR | 0A000 | only two tables with one hypertable and one normal tableare allowed in continuous aggregate view | | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics LEFT JOIN devices ON id = device_id GROUP BY 1, 2 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------------------------------------------------+--------------+------------ + f | ERROR | 0A000 | only inner joins are supported in continuous aggregates | | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics JOIN devices ON id = device_id AND name = 'foo' GROUP BY 1, 2 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+-----------------------------------+----------------------------------------+------------------------------------------------------------------ + f | ERROR | 0A000 | invalid continuous aggregate view | Unsupported expression in join clause. | Only equality conditions are supported in continuous aggregates. +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics JOIN devices ON id < device_id GROUP BY 1, 2 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+-----------------------------------+------------------------------------------------------------------+------------ + f | ERROR | 0A000 | invalid continuous aggregate view | Only equality conditions are supported in continuous aggregates. | +(1 row) + +-- invalid caggs on caggs +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('60 days', bucket) AS bucket, sum(count) AS count FROM metrics_by_month GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+------------ + f | ERROR | 0A000 | cannot create continuous aggregate with fixed-width bucket on top of one using variable-width bucket | Continuous aggregate with a fixed time bucket width (e.g. 61 days) cannot be created on top of one using variable time bucket width (e.g. 1 month).+| + | | | | The variance can lead to the fixed width one not being a multiple of the variable width one. | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 day 33 minutes', bucket) AS bucket, sum(count) AS count FROM metrics_by_hour GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+-------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------+------------ + f | ERROR | 0A000 | cannot create continuous aggregate with incompatible bucket width | Time bucket width of "public.cagg_validate" [@ 1 day 33 mins] should be multiple of the time bucket width of "public.metrics_by_hour" [@ 1 hour]. | +(1 row) + +-- +-- OK +-- +-- valid join queries +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics JOIN devices ON id = device_id GROUP BY 1, 2 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------+--------------+------------ + t | | | | | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics JOIN devices ON id = device_id WHERE devices.name = 'foo' GROUP BY 1, 2 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------+--------------+------------ + t | | | | | +(1 row) + +-- valid queries +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), count(*) FROM metrics GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------+--------------+------------ + t | | | | | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", timezone => 'UTC'), count(*) FROM metrics GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------+--------------+------------ + t | | | | | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", timezone => 'UTC'), count(*) FROM metrics GROUP BY 1 HAVING count(*) > 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------+--------------+------------ + t | | | | | +(1 row) + +-- caggs on caggs +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 day', bucket) AS bucket, sum(count) AS count FROM metrics_by_hour GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------+--------------+------------ + t | | | | | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 month', bucket) AS bucket, sum(count) AS count FROM metrics_by_hour GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------+--------------+------------ + t | | | | | +(1 row) + +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 year', bucket) AS bucket, sum(count) AS count FROM metrics_by_month GROUP BY 1 $$); + is_valid | error_level | error_code | error_message | error_detail | error_hint +----------+-------------+------------+---------------+--------------+------------ + t | | | | | +(1 row) + diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out index cc7825ec7f7..7db0572be44 100644 --- a/tsl/test/shared/expected/extension.out +++ b/tsl/test/shared/expected/extension.out @@ -36,6 +36,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text _timescaledb_functions.cagg_migrate_execute_refresh_new_cagg(_timescaledb_catalog.continuous_agg,_timescaledb_catalog.continuous_agg_migrate_plan_step) _timescaledb_functions.cagg_migrate_plan_exists(integer) _timescaledb_functions.cagg_migrate_pre_validation(text,text,text) + _timescaledb_functions.cagg_validate_query(text) _timescaledb_functions.cagg_watermark(integer) _timescaledb_functions.cagg_watermark_materialized(integer) _timescaledb_functions.calculate_chunk_interval(integer,bigint,bigint) diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 7b23ab4ebcb..7d69dd1fb06 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -11,6 +11,7 @@ set(TEST_FILES cagg_permissions.sql cagg_policy.sql cagg_refresh.sql + cagg_utils.sql cagg_watermark.sql compressed_collation.sql compression_create_compressed_table.sql diff --git a/tsl/test/sql/cagg_utils.sql b/tsl/test/sql/cagg_utils.sql new file mode 100644 index 00000000000..ffa2516ae2e --- /dev/null +++ b/tsl/test/sql/cagg_utils.sql @@ -0,0 +1,99 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +SET search_path TO public, _timescaledb_functions; + +CREATE TABLE devices ( + id INTEGER, + name TEXT +); + +CREATE TABLE metrics ( + "time" TIMESTAMPTZ NOT NULL, + device_id INTEGER, + value FLOAT8 +); + +SELECT table_name FROM create_hypertable('metrics', 'time'); + +-- fixed bucket size +CREATE MATERIALIZED VIEW metrics_by_hour WITH (timescaledb.continuous) AS +SELECT time_bucket('1 hour', time) AS bucket, count(*) +FROM metrics +GROUP BY 1 +WITH NO DATA; + +-- variable bucket size +CREATE MATERIALIZED VIEW metrics_by_month WITH (timescaledb.continuous) AS +SELECT time_bucket('1 month', bucket) AS bucket, sum(count) AS count +FROM metrics_by_hour +GROUP BY 1 +WITH NO DATA; + +-- +-- ERRORS +-- + +-- return NULL +SELECT * FROM cagg_validate_query(NULL); + +-- syntax error +SELECT * FROM cagg_validate_query('blahh'); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket(blahh "time") FROM metrics GROUP BY 1 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour' "time") FROM metrics GROUP BY $$); + +-- multiple statements are not allowed +SELECT * FROM cagg_validate_query($$ SELECT 1; SELECT 2; $$); + +-- only SELECT queries are allowed +SELECT * FROM cagg_validate_query($$ DELETE FROM pg_catalog.pg_class $$); +SELECT * FROM cagg_validate_query($$ UPDATE pg_catalog.pg_class SET relkind = 'r' $$); +SELECT * FROM cagg_validate_query($$ DELETE FROM pg_catalog.pg_class $$); +SELECT * FROM cagg_validate_query($$ VACUUM (ANALYZE) $$); + +-- invalid queries +SELECT * FROM cagg_validate_query($$ SELECT 1 $$); +SELECT * FROM cagg_validate_query($$ SELECT 1 FROM pg_catalog.pg_class $$); +SELECT * FROM cagg_validate_query($$ SELECT relkind, count(*) FROM pg_catalog.pg_class GROUP BY 1 $$); + +-- time_bucket with offset is not allowed +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", "offset" => '-1 minute'::interval), count(*) FROM metrics GROUP BY 1 $$); + +-- time_bucket with origin is not allowed +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", origin => '2023-01-01'::timestamptz), count(*) FROM metrics GROUP BY 1 $$); + +-- time_bucket with origin is not allowed +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", origin => '2023-01-01'::timestamptz), count(*) FROM metrics GROUP BY 1 $$); + +-- time_bucket_gapfill is not allowed +SELECT * FROM cagg_validate_query($$ SELECT time_bucket_gapfill('1 hour', "time"), count(*) FROM metrics GROUP BY 1 $$); + +-- invalid join queries +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', a."time"), count(*) FROM metrics a, metrics b GROUP BY 1 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), count(*) FROM metrics, devices a, devices b GROUP BY 1 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics LEFT JOIN devices ON id = device_id GROUP BY 1, 2 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics JOIN devices ON id = device_id AND name = 'foo' GROUP BY 1, 2 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics JOIN devices ON id < device_id GROUP BY 1, 2 $$); + +-- invalid caggs on caggs +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('60 days', bucket) AS bucket, sum(count) AS count FROM metrics_by_month GROUP BY 1 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 day 33 minutes', bucket) AS bucket, sum(count) AS count FROM metrics_by_hour GROUP BY 1 $$); + +-- +-- OK +-- + +-- valid join queries +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics JOIN devices ON id = device_id GROUP BY 1, 2 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), device_id, count(*) FROM metrics JOIN devices ON id = device_id WHERE devices.name = 'foo' GROUP BY 1, 2 $$); + +-- valid queries +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time"), count(*) FROM metrics GROUP BY 1 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", timezone => 'UTC'), count(*) FROM metrics GROUP BY 1 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 hour', "time", timezone => 'UTC'), count(*) FROM metrics GROUP BY 1 HAVING count(*) > 1 $$); + +-- caggs on caggs +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 day', bucket) AS bucket, sum(count) AS count FROM metrics_by_hour GROUP BY 1 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 month', bucket) AS bucket, sum(count) AS count FROM metrics_by_hour GROUP BY 1 $$); +SELECT * FROM cagg_validate_query($$ SELECT time_bucket('1 year', bucket) AS bucket, sum(count) AS count FROM metrics_by_month GROUP BY 1 $$); \ No newline at end of file