Skip to content

Commit

Permalink
Add SQL function cagg_validate_query
Browse files Browse the repository at this point in the history
With this function is possible to execute the Continuous Aggregate query
validation over an arbitrary query string, without the need to actually
create the Continuous Aggregate.

It can be used, for example, to check for most frequent queries maybe
using `pg_stat_statements`, validate them and check if there are queries
that potenttialy can turned into a Continuous Aggregate.
  • Loading branch information
fabriziomello committed Nov 11, 2023
1 parent 483ddcd commit af5cc2a
Show file tree
Hide file tree
Showing 18 changed files with 666 additions and 30 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_6307
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #6307 Add SQL function cagg_validate_query
10 changes: 10 additions & 0 deletions sql/cagg_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,13 @@ CREATE OR REPLACE FUNCTION _timescaledb_functions.invalidation_process_cagg_log(
OUT ret_window_start BIGINT,
OUT ret_window_end BIGINT
) RETURNS RECORD AS '@MODULE_PATHNAME@', 'ts_invalidation_process_cagg_log' LANGUAGE C STRICT VOLATILE;

CREATE OR REPLACE FUNCTION _timescaledb_functions.cagg_validate_query(
query TEXT,
OUT is_valid BOOLEAN,
OUT error_level TEXT,
OUT error_code TEXT,
OUT error_message TEXT,
OUT error_detail TEXT,
OUT error_hint TEXT
) RETURNS RECORD AS '@MODULE_PATHNAME@', 'ts_continuous_agg_validate_query' LANGUAGE C STRICT VOLATILE;
3 changes: 3 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,6 @@ CREATE FUNCTION @[email protected]_chunks(

DROP PROCEDURE IF EXISTS _timescaledb_functions.repair_relation_acls();
DROP FUNCTION IF EXISTS _timescaledb_functions.makeaclitem(regrole, regrole, text, bool);

DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_validate_query(TEXT, BOOLEAN, TEXT, TEXT, TEXT, TEXT, TEXT);

65 changes: 65 additions & 0 deletions src/compat/compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -995,4 +995,69 @@ object_ownercheck(Oid classid, Oid objectid, Oid roleid)
#define F_SUM_INT4 2108
#endif

/*
* PG15 refactored elog.c functions and exposed error_severity
* but previous versions don't have it exposed, so imported it
* from Postgres source code.
*
* https://github.com/postgres/postgres/commit/ac7c80758a7
*/
#if PG15_LT
/*
* error_severity --- get string representing elevel
*
* The string is not localized here, but we mark the strings for translation
* so that callers can invoke _() on the result.
*
* Imported from src/backend/utils/error/elog.c
*/
static inline const char *
error_severity(int elevel)
{
const char *prefix;

switch (elevel)
{
case DEBUG1:

Check warning on line 1021 in src/compat/compat.h

View check run for this annotation

Codecov / codecov/patch

src/compat/compat.h#L1021

Added line #L1021 was not covered by tests
case DEBUG2:
case DEBUG3:
case DEBUG4:
case DEBUG5:
prefix = gettext_noop("DEBUG");
break;
case LOG:

Check warning on line 1028 in src/compat/compat.h

View check run for this annotation

Codecov / codecov/patch

src/compat/compat.h#L1026-L1028

Added lines #L1026 - L1028 were not covered by tests
case LOG_SERVER_ONLY:
prefix = gettext_noop("LOG");
break;
case INFO:
prefix = gettext_noop("INFO");
break;
case NOTICE:
prefix = gettext_noop("NOTICE");
break;

Check warning on line 1037 in src/compat/compat.h

View check run for this annotation

Codecov / codecov/patch

src/compat/compat.h#L1030-L1037

Added lines #L1030 - L1037 were not covered by tests
case WARNING:
#if PG14_GE
/* https://github.com/postgres/postgres/commit/1f9158ba481 */
case WARNING_CLIENT_ONLY:
#endif
prefix = gettext_noop("WARNING");
break;
case ERROR:
prefix = gettext_noop("ERROR");
break;
case FATAL:
prefix = gettext_noop("FATAL");
break;
case PANIC:
prefix = gettext_noop("PANIC");
break;
default:
prefix = "???";
break;

Check warning on line 1056 in src/compat/compat.h

View check run for this annotation

Codecov / codecov/patch

src/compat/compat.h#L1048-L1056

Added lines #L1048 - L1056 were not covered by tests
}

return prefix;
}
#endif

#endif /* TIMESCALEDB_COMPAT_H */
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ CROSSMODULE_WRAPPER(decompress_chunk);
/* continuous aggregate */
CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger);
CROSSMODULE_WRAPPER(continuous_agg_refresh);
CROSSMODULE_WRAPPER(continuous_agg_validate_query);
CROSSMODULE_WRAPPER(invalidation_cagg_log_add_entry);
CROSSMODULE_WRAPPER(invalidation_hyper_log_add_entry);
CROSSMODULE_WRAPPER(drop_dist_ht_invalidation_trigger);
Expand Down Expand Up @@ -494,6 +495,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht_all_default,
.continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht_all_default,
.continuous_agg_update_options = continuous_agg_update_options_default,
.continuous_agg_validate_query = error_no_default_fn_pg_community,
.invalidation_cagg_log_add_entry = error_no_default_fn_pg_community,
.invalidation_hyper_log_add_entry = error_no_default_fn_pg_community,
.remote_invalidation_log_delete = NULL,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ typedef struct CrossModuleFunctions
int64 start, int64 end);
void (*continuous_agg_update_options)(ContinuousAgg *cagg,
WithClauseResult *with_clause_options);
PGFunction continuous_agg_validate_query;
PGFunction invalidation_cagg_log_add_entry;
PGFunction invalidation_hyper_log_add_entry;
void (*remote_invalidation_log_delete)(int32 raw_hypertable_id,
Expand Down
7 changes: 4 additions & 3 deletions tsl/src/continuous_aggs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/common.c
${CMAKE_CURRENT_SOURCE_DIR}/finalize.c
${CMAKE_CURRENT_SOURCE_DIR}/create.c
${CMAKE_CURRENT_SOURCE_DIR}/finalize.c
${CMAKE_CURRENT_SOURCE_DIR}/insert.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation_threshold.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation.c
${CMAKE_CURRENT_SOURCE_DIR}/materialize.c
${CMAKE_CURRENT_SOURCE_DIR}/options.c
${CMAKE_CURRENT_SOURCE_DIR}/refresh.c
${CMAKE_CURRENT_SOURCE_DIR}/repair.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation_threshold.c)
${CMAKE_CURRENT_SOURCE_DIR}/utils.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
75 changes: 53 additions & 22 deletions tsl/src/continuous_aggs/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ static void caggtimebucketinfo_init(CAggTimebucketInfo *src, int32 hypertable_id
Oid hypertable_partition_coltype,
int64 hypertable_partition_col_interval,
int32 parent_mat_hypertable_id);
static void caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause,
List *targetList);
static void caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList,
bool is_cagg_create);
static bool cagg_agg_validate(Node *node, void *context);
static bool cagg_query_supported(const Query *query, StringInfo hint, StringInfo detail,
const bool finalized);
Expand Down Expand Up @@ -149,7 +149,8 @@ destroy_union_query(Query *q)
* the `bucket_width` and other fields of `tbinfo`.
*/
static void
caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList)
caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *targetList,
bool is_cagg_create)
{
ListCell *l;
bool found = false;
Expand Down Expand Up @@ -320,22 +321,27 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar
tbinfo->bucket_width_type = width->consttype;

if (width->constisnull)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid bucket width for time bucket function")));

if (width->consttype == INTERVALOID)
{
tbinfo->interval = DatumGetIntervalP(width->constvalue);
if (tbinfo->interval->month != 0)
tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE;
if (is_cagg_create)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid bucket width for time bucket function")));
}

if (tbinfo->bucket_width != BUCKET_WIDTH_VARIABLE)
else
{
/* The bucket size is fixed. */
tbinfo->bucket_width =
ts_interval_value_to_internal(width->constvalue, width->consttype);
if (width->consttype == INTERVALOID)
{
tbinfo->interval = DatumGetIntervalP(width->constvalue);
if (tbinfo->interval && tbinfo->interval->month != 0)
tbinfo->bucket_width = BUCKET_WIDTH_VARIABLE;
}

if (tbinfo->bucket_width != BUCKET_WIDTH_VARIABLE)
{
/* The bucket size is fixed. */
tbinfo->bucket_width =
ts_interval_value_to_internal(width->constvalue, width->consttype);
}
}
}
else
Expand Down Expand Up @@ -618,10 +624,9 @@ get_bucket_width(CAggTimebucketInfo bucket_info)

CAggTimebucketInfo
cagg_validate_query(const Query *query, const bool finalized, const char *cagg_schema,
const char *cagg_name)
const char *cagg_name, const bool is_cagg_create)
{
CAggTimebucketInfo bucket_info = { 0 }, bucket_info_parent;
Cache *hcache;
Hypertable *ht = NULL, *ht_parent = NULL;
RangeTblRef *rtref = NULL, *rtref_other = NULL;
RangeTblEntry *rte = NULL, *rte_other = NULL;
Expand Down Expand Up @@ -799,15 +804,27 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
{
const Dimension *part_dimension = NULL;
int32 parent_mat_hypertable_id = INVALID_HYPERTABLE_ID;
Cache *hcache = ts_hypertable_cache_pin();

if (rte->relkind == RELKIND_RELATION)
ht = ts_hypertable_cache_get_cache_and_entry(rte->relid, CACHE_FLAG_NONE, &hcache);
{
ht = ts_hypertable_cache_get_entry(hcache, rte->relid, CACHE_FLAG_MISSING_OK);

if (!ht)
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_TS_HYPERTABLE_NOT_EXIST),
errmsg("table \"%s\" is not a hypertable", get_rel_name(rte->relid))));
}

Check warning on line 819 in tsl/src/continuous_aggs/common.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/continuous_aggs/common.c#L819

Added line #L819 was not covered by tests
}
else
{
cagg_parent = ts_continuous_agg_find_by_relid(rte->relid);

if (!cagg_parent)
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("invalid continuous aggregate query"),
Expand All @@ -817,6 +834,7 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s

if (!ContinuousAggIsFinalized(cagg_parent))
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("old format of continuous aggregate is not supported"),
Expand All @@ -827,7 +845,6 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
}

parent_mat_hypertable_id = cagg_parent->data.mat_hypertable_id;
hcache = ts_hypertable_cache_pin();
ht = ts_hypertable_cache_get_entry_by_id(hcache, cagg_parent->data.mat_hypertable_id);

/* If parent cagg is hierarchical then we should get the matht otherwise the rawht. */
Expand All @@ -846,9 +863,12 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
}

if (TS_HYPERTABLE_IS_INTERNAL_COMPRESSION_TABLE(ht))
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("hypertable is an internal compressed hypertable")));
}

Check warning on line 871 in tsl/src/continuous_aggs/common.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/continuous_aggs/common.c#L871

Added line #L871 was not covered by tests

if (rte->relkind == RELKIND_RELATION)
{
Expand All @@ -861,6 +881,7 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
const ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id);
Assert(cagg != NULL);

ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("hypertable is a continuous aggregate materialization table"),
Expand All @@ -882,10 +903,13 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
* below, along with any other fallout.
*/
if (part_dimension->partitioning != NULL)
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("custom partitioning functions not supported"
" with continuous aggregates")));
}

Check warning on line 912 in tsl/src/continuous_aggs/common.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/continuous_aggs/common.c#L912

Added line #L912 was not covered by tests

if (IS_INTEGER_TYPE(ts_dimension_get_partition_type(part_dimension)) &&
rte->relkind == RELKIND_RELATION)
Expand All @@ -894,13 +918,16 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
const char *funcname = NameStr(part_dimension->fd.integer_now_func);

if (strlen(funcschema) == 0 || strlen(funcname) == 0)
{
ts_cache_release(hcache);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("custom time function required on hypertable \"%s\"",
get_rel_name(ht->main_table_relid)),
errdetail("An integer-based hypertable requires a custom time function to "
"support continuous aggregates."),
errhint("Set a custom time function on the hypertable.")));
}

Check warning on line 930 in tsl/src/continuous_aggs/common.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/continuous_aggs/common.c#L930

Added line #L930 was not covered by tests
}

caggtimebucketinfo_init(&bucket_info,
Expand Down Expand Up @@ -932,7 +959,10 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
* column of the hypertable
*/
Assert(query->groupClause);
caggtimebucket_validate(&bucket_info, query->groupClause, query->targetList);
caggtimebucket_validate(&bucket_info,
query->groupClause,
query->targetList,
is_cagg_create);
}

/* Check row security settings for the table. */
Expand All @@ -950,7 +980,8 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
Assert(prev_query->groupClause);
caggtimebucket_validate(&bucket_info_parent,
prev_query->groupClause,
prev_query->targetList);
prev_query->targetList,
is_cagg_create);

/* Cannot create cagg with fixed bucket on top of variable bucket. */
if ((bucket_info_parent.bucket_width == BUCKET_WIDTH_VARIABLE &&
Expand Down
7 changes: 5 additions & 2 deletions tsl/src/continuous_aggs/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

#ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H
#define TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H

Expand Down Expand Up @@ -122,7 +123,8 @@ typedef struct AggPartCxt
} while (0);

extern CAggTimebucketInfo cagg_validate_query(const Query *query, const bool finalized,
const char *cagg_schema, const char *cagg_name);
const char *cagg_schema, const char *cagg_name,
const bool is_cagg_create);
extern Query *destroy_union_query(Query *q);
extern Oid relation_oid(Name schema, Name name);
extern void RemoveRangeTableEntries(Query *query);
Expand All @@ -131,4 +133,5 @@ extern Query *build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Qu
extern void mattablecolumninfo_init(MatTableColumnInfo *matcolinfo, List *grouplist);
extern void mattablecolumninfo_addinternal(MatTableColumnInfo *matcolinfo);
extern bool function_allowed_in_cagg_definition(Oid funcid);
#endif

#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H */
6 changes: 4 additions & 2 deletions tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,8 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
timebucket_exprinfo = cagg_validate_query((Query *) stmt->into->viewQuery,
finalized,
schema_name,
stmt->into->rel->relname);
stmt->into->rel->relname,
true);
cagg_create(stmt, &viewstmt, (Query *) stmt->query, &timebucket_exprinfo, with_clause_options);

/* Insert the MIN of the time dimension type for the new watermark */
Expand Down Expand Up @@ -1024,7 +1025,8 @@ cagg_flip_realtime_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
cagg_validate_query(direct_query,
agg->data.finalized,
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));
NameStr(agg->data.user_view_name),
true);

/* Flip */
agg->data.materialized_only = !agg->data.materialized_only;
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/repair.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht, bool force_
cagg_validate_query(direct_query,
finalized,
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));
NameStr(agg->data.user_view_name),
true);

mattablecolumninfo_init(&mattblinfo, copyObject(direct_query->groupClause));
fqi.finalized = finalized;
Expand Down
Loading

0 comments on commit af5cc2a

Please sign in to comment.