Skip to content

Commit

Permalink
Merge branch 'main' into grant_database_2pc
Browse files Browse the repository at this point in the history
  • Loading branch information
gurkanindibay committed Feb 19, 2024
2 parents 9a2cabe + 2cbfdbf commit 166e27d
Show file tree
Hide file tree
Showing 120 changed files with 2,742 additions and 957 deletions.
8 changes: 4 additions & 4 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ USER citus

# build postgres versions separately for effective parrallelism and caching of already built versions when changing only certain versions
FROM base AS pg14
RUN MAKEFLAGS="-j $(nproc)" pgenv build 14.10
RUN MAKEFLAGS="-j $(nproc)" pgenv build 14.11
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
Expand All @@ -80,7 +80,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
RUN rm .pgenv-staging/config/default.conf

FROM base AS pg15
RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.5
RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.6
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
Expand All @@ -92,7 +92,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
RUN rm .pgenv-staging/config/default.conf

FROM base AS pg16
RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.1
RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.2
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
Expand Down Expand Up @@ -210,7 +210,7 @@ COPY --chown=citus:citus .psqlrc .
RUN sudo chown --from=root:root citus:citus -R ~

# sets default pg version
RUN pgenv switch 16.1
RUN pgenv switch 16.2

# make connecting to the coordinator easy
ENV PGPORT=9700
10 changes: 5 additions & 5 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ jobs:
pgupgrade_image_name: "citus/pgupgradetester"
style_checker_image_name: "citus/stylechecker"
style_checker_tools_version: "0.8.18"
image_suffix: "-v19b671f"
pg14_version: '{ "major": "14", "full": "14.10" }'
pg15_version: '{ "major": "15", "full": "15.5" }'
pg16_version: '{ "major": "16", "full": "16.1" }'
upgrade_pg_versions: "14.10-15.5-16.1"
image_suffix: "-v390dab3"
pg14_version: '{ "major": "14", "full": "14.11" }'
pg15_version: '{ "major": "15", "full": "15.6" }'
pg16_version: '{ "major": "16", "full": "16.2" }'
upgrade_pg_versions: "14.11-15.6-16.2"
steps:
# Since GHA jobs needs at least one step we use a noop step here.
- name: Set up parameters
Expand Down
110 changes: 110 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,113 @@
### citus v12.1.2 (February 12, 2024) ###

* Fixes the incorrect column count after ALTER TABLE (#7379)

### citus v12.0.1 (July 11, 2023) ###

* Fixes incorrect default value assumption for VACUUM(PROCESS_TOAST) #7122)

* Fixes a bug that causes an unexpected error when adding a column
with a NULL constraint (#7093)

* Fixes a bug that could cause COPY logic to skip data in case of OOM (#7152)

* Fixes a bug with deleting colocation groups (#6929)

* Fixes memory and memory contexts leaks in Foreign Constraint Graphs (#7236)

* Fixes shard size bug with too many shards (#7018)

* Fixes the incorrect column count after ALTER TABLE (#7379)

* Improves citus_tables view performance (#7050)

* Makes sure to disallow creating a replicated distributed table
concurrently (#7219)

* Removes pg_send_cancellation and all references (#7135)

### citus v11.3.1 (February 12, 2024) ###

* Disallows MERGE when the query prunes down to zero shards (#6946)

* Fixes a bug related to non-existent objects in DDL commands (#6984)

* Fixes a bug that could cause COPY logic to skip data in case of OOM (#7152)

* Fixes a bug with deleting colocation groups (#6929)

* Fixes incorrect results on fetching scrollable with hold cursors (#7014)

* Fixes memory and memory context leaks in Foreign Constraint Graphs (#7236)

* Fixes replicate reference tables task fail when user is superuser (#6930)

* Fixes the incorrect column count after ALTER TABLE (#7379)

* Improves citus_shard_sizes performance (#7050)

* Makes sure to disallow creating a replicated distributed table
concurrently (#7219)

* Removes pg_send_cancellation and all references (#7135)

### citus v11.2.2 (February 12, 2024) ###

* Fixes a bug in background shard rebalancer where the replicate
reference tables task fails if the current user is not a superuser (#6930)

* Fixes a bug related to non-existent objects in DDL commands (#6984)

* Fixes a bug that could cause COPY logic to skip data in case of OOM (#7152)

* Fixes a bug with deleting colocation groups (#6929)

* Fixes incorrect results on fetching scrollable with hold cursors (#7014)

* Fixes memory and memory context leaks in Foreign Constraint Graphs (#7236)

* Fixes the incorrect column count after ALTER TABLE (#7379)

* Improves failure handling of distributed execution (#7090)

* Makes sure to disallow creating a replicated distributed table
concurrently (#7219)

* Removes pg_send_cancellation (#7135)

### citus v11.1.7 (February 12, 2024) ###

* Fixes memory and memory context leaks in Foreign Constraint Graphs (#7236)

* Fixes a bug related to non-existent objects in DDL commands (#6984)

* Fixes a bug that could cause COPY logic to skip data in case of OOM (#7152)

* Fixes a bug with deleting colocation groups (#6929)

* Fixes incorrect results on fetching scrollable with hold cursors (#7014)

* Fixes the incorrect column count after ALTER TABLE (#7379)

* Improves failure handling of distributed execution (#7090)

* Makes sure to disallow creating a replicated distributed table
concurrently (#7219)

* Removes pg_send_cancellation and all references (#7135)

### citus v11.0.9 (February 12, 2024) ###

* Fixes a bug that could cause COPY logic to skip data in case of OOM (#7152)

* Fixes a bug with deleting colocation groups (#6929)

* Fixes memory and memory context leaks in Foreign Constraint Graphs (#7236)

* Fixes the incorrect column count after ALTER TABLE (#7462)

* Improve failure handling of distributed execution (#7090)

### citus v12.1.1 (November 9, 2023) ###

* Fixes leaking of memory and memory contexts in Citus foreign key cache
Expand Down
2 changes: 1 addition & 1 deletion ci/check_gucs_are_alphabetically_sorted.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ tail -n +$RegisterCitusConfigVariables_begin_linenumber src/backend/distributed/

# extract citus gucs in the form of <tab><tab>"citus.X"
grep -P "^[\t][\t]\"citus\.[a-zA-Z_0-9]+\"" RegisterCitusConfigVariables_func_def.out > gucs.out
sort -c gucs.out
LC_COLLATE=C sort -c gucs.out
rm gucs.out
rm RegisterCitusConfigVariables_func_def.out
16 changes: 11 additions & 5 deletions src/backend/distributed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1749,8 +1749,6 @@ The reason for handling dependencies and deparsing in post-process step is that

Not all table DDL is currently deparsed. In that case, the original command sent by the client is used. That is a shortcoming in our DDL logic that causes user-facing issues and should be addressed. We do not directly construct a separate DDL command for each shard. Instead, we call the `worker_apply_shard_ddl_command(shardid bigint, ddl_command text)` function which parses the DDL command, replaces the table names with shard names in the parse tree according to the shard ID, and then executes the command. That also has some shortcomings, because we cannot support more complex DDL commands in this manner (e.g. adding multiple foreign keys). Ideally, all DDL would be deparsed, and for table DDL the deparsed query string would have shard names, similar to regular queries.

`markDistributed` is used to indicate whether we add a record to `pg_dist_object` to mark the object as "distributed".

## Defining a new DDL command

All commands that are propagated by Citus should be defined in DistributeObjectOps struct. Below is a sample DistributeObjectOps for ALTER DATABASE command that is defined in [distribute_object_ops.c](commands/distribute_object_ops.c) file.
Expand Down Expand Up @@ -1810,6 +1808,14 @@ GetDistributeObjectOps(Node *node)
...
```
Finally, when adding support for propagation of a new DDL command, you also need to make sure that:
* Use `quote_identifier()` or `quote_literal_cstr()` for the fields that might need escaping some characters or bare quotes when deparsing a DDL command.
* The code is tolerant to nullable fields within given `Stmt *` object, i.e., the ones that Postgres allows not specifying at all.
* You register the object into `pg_dist_object` if it's a CREATE command and you delete the object from `pg_dist_object` if it's a DROP command.
* Node activation (e.g., `citus_add_node()`) properly propagates the object and its dependencies to new nodes.
* Add tests cases for all the scenarios noted above.
* Add test cases for different options that can be specified for the settings. For example, `CREATE DATABASE .. IS_TEMPLATE = TRUE` and `CREATE DATABASE .. IS_TEMPLATE = FALSE` should be tested separately.
## Object & dependency propagation
These two topics are closely related, so we'll discuss them together. You can start the topic by reading [Nils' blog](https://www.citusdata.com/blog/2020/06/25/using-custom-types-with-citus-and-postgres/) on the topic.
Expand Down Expand Up @@ -1885,7 +1891,7 @@ Generally, the process is straightforward: When a new object is created, Citus a

Citus employs a universal strategy for dealing with objects. Every object creation, alteration, or deletion event (like custom types, tables, or extensions) is represented by the C struct `DistributeObjectOps`. You can find a list of all supported object types in [`distribute_object_ops.c`](https://github.com/citusdata/citus/blob/2c190d068918d1c457894adf97f550e5b3739184/src/backend/distributed/commands/distribute_object_ops.c#L4). As of Citus 12.1, most Postgres objects are supported, although there are a few exceptions.

Whenever `DistributeObjectOps->markDistributed` is set to true—usually during `CREATE` operations—Citus calls `MarkObjectDistributed()`. Citus also labels the same objects as distributed across all nodes via the `citus_internal_add_object_metadata()` UDF.
Whenever `DistributeObjectOps->markDistributed` is set to true—usually during `CREATE` operations—Citus calls `MarkObjectDistributed()`. Citus also labels the same objects as distributed across all nodes via the `citus_internal.add_object_metadata()` UDF.

Here's a simple example:

Expand All @@ -1895,7 +1901,7 @@ CREATE TYPE type_test AS (a int, b int);
...
NOTICE: issuing SELECT worker_create_or_replace_object('CREATE TYPE public.type_test AS (a integer, b integer);');
....
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
...

-- Then, check pg_dist_object. This should be consistent across all nodes.
Expand Down Expand Up @@ -2423,7 +2429,7 @@ Cleanup records always need to be committed before creating the actual object. I

PostgreSQL supports change data capture (CDC) via the logical decoding interface. The basic idea behind logical decoding is that you make a replication connection (a special type of postgres connection), start replication, and then the backend process reads through the WAL and decodes the WAL records and emits it over the wire in a format defined by the output plugin. If we were to use regular logical decoding on the nodes of a Citus cluster, we would see the name of the shard in each write, and internal data transfers such as shard moves would result in inserts being emitted. We use several techniques to avoid this.

All writes in PostgreSQL are marked with a replication origin (0 by default) and the decoder can make decisions on whether to emit the change based on the replication origin. We use this to filter out internal data transfers. If `citus.enable_change_data_capture` is enabled, all internal data transfers are marked with the special DoNotReplicateId replication origin by calling the `citus_internal_start_replication_origin_tracking()` UDF before writing the data. This replication origin ID is special in the sense that it does not need to be created (which prevents locking issues, especially when dropping replication origins). It is still up to output plugin to decide what to do with changes marked as DoNotReplicateId.
All writes in PostgreSQL are marked with a replication origin (0 by default) and the decoder can make decisions on whether to emit the change based on the replication origin. We use this to filter out internal data transfers. If `citus.enable_change_data_capture` is enabled, all internal data transfers are marked with the special DoNotReplicateId replication origin by calling the `citus_internal.start_replication_origin_tracking()` UDF before writing the data. This replication origin ID is special in the sense that it does not need to be created (which prevents locking issues, especially when dropping replication origins). It is still up to output plugin to decide what to do with changes marked as DoNotReplicateId.

We have very minimal control over replication commands like `CREATE_REPLICATION_SLOT`, since there are no direct hooks, and decoder names (e.g. “pgoutput”) are typically hard-coded in the client. The only method we found of overriding logical decoding behaviour is to overload the output plugin name in the dynamic library path.

Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/clock/causal_clock.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ AdjustClocksToTransactionHighest(List *nodeConnectionList,

/* Set the clock value on participating worker nodes */
appendStringInfo(queryToSend,
"SELECT pg_catalog.citus_internal_adjust_local_clock_to_remote"
"SELECT citus_internal.adjust_local_clock_to_remote"
"('(%lu, %u)'::pg_catalog.cluster_clock);",
transactionClockValue->logical, transactionClockValue->counter);

Expand Down
40 changes: 25 additions & 15 deletions src/backend/distributed/commands/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,7 @@ PreprocessCreateDatabaseStmt(Node *node, const char *queryString,
/*
* PostprocessCreateDatabaseStmt is executed after the statement is applied to the local
* postgres instance. In this stage we prepare the commands that need to be run on
* all workers to create the database. Since the CREATE DATABASE statement gives error
* in a transaction block, we need to use NontransactionalNodeDDLTaskList to send the
* CREATE DATABASE statement to the workers.
* all workers to create the database.
*
*/
List *
Expand All @@ -508,20 +506,25 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)

char *createDatabaseCommand = DeparseTreeNode(node);

List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) createDatabaseCommand,
ENABLE_DDL_PROPAGATION);
List *createDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) createDatabaseCommand,
ENABLE_DDL_PROPAGATION);

return NontransactionalNodeDDLTaskList(REMOTE_NODES, commands);
/*
* Since the CREATE DATABASE statements cannot be executed in a transaction
* block, we need to use NontransactionalNodeDDLTaskList() to send the CREATE
* DATABASE statement to the workers.
*/
List *createDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, createDatabaseCommands);
return createDatabaseDDLJobList;
}


/*
* PreprocessDropDatabaseStmt is executed before the statement is applied to the local
* postgres instance. In this stage we can prepare the commands that need to be run on
* all workers to drop the database. Since the DROP DATABASE statement gives error in
* transaction context, we need to use NontransactionalNodeDDLTaskList to send the
* DROP DATABASE statement to the workers.
* all workers to drop the database.
*
* We also serialize database commands globally by acquiring a Citus specific advisory
* lock based on OCLASS_DATABASE on the first primary worker node.
Expand Down Expand Up @@ -559,11 +562,18 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,

char *dropDatabaseCommand = DeparseTreeNode(node);

List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropDatabaseCommand,
ENABLE_DDL_PROPAGATION);
List *dropDatabaseCommands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) dropDatabaseCommand,
ENABLE_DDL_PROPAGATION);

return NontransactionalNodeDDLTaskList(REMOTE_NODES, commands);
/*
* Due to same reason stated in PostprocessCreateDatabaseStmt(), we need to
* use NontransactionalNodeDDLTaskList() to send the DROP DATABASE statement
* to the workers.
*/
List *dropDatabaseDDLJobList =
NontransactionalNodeDDLTaskList(REMOTE_NODES, dropDatabaseCommands);
return dropDatabaseDDLJobList;
}


Expand Down Expand Up @@ -890,7 +900,7 @@ CreateDatabaseDDLCommand(Oid dbId)

/* Generate the CREATE DATABASE statement */
appendStringInfo(outerDbStmt,
"SELECT pg_catalog.citus_internal_database_command(%s)",
"SELECT citus_internal.database_command(%s)",
quote_literal_cstr(createStmt));

ReleaseSysCache(tuple);
Expand Down
3 changes: 1 addition & 2 deletions src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2663,15 +2663,14 @@ CreateLocalColocatedIntermediateFile(CitusCopyDestReceiver *copyDest,
CreateIntermediateResultsDirectory();

const int fileFlags = (O_CREAT | O_RDWR | O_TRUNC);
const int fileMode = (S_IRUSR | S_IWUSR);

StringInfo filePath = makeStringInfo();
appendStringInfo(filePath, "%s_%ld", copyDest->colocatedIntermediateResultIdPrefix,
shardState->shardId);

const char *fileName = QueryResultFileName(filePath->data);
shardState->fileDest =
FileCompatFromFileStart(FileOpenForTransmit(fileName, fileFlags, fileMode));
FileCompatFromFileStart(FileOpenForTransmit(fileName, fileFlags));

CopyOutState localFileCopyOutState = shardState->copyOutState;
bool isBinaryCopy = localFileCopyOutState->binary;
Expand Down
20 changes: 13 additions & 7 deletions src/backend/distributed/commands/role.c
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,14 @@ GenerateGrantRoleStmtsOfRole(Oid roleid)
{
Form_pg_auth_members membership = (Form_pg_auth_members) GETSTRUCT(tuple);

ObjectAddress *roleAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*roleAddress, AuthIdRelationId, membership->grantor);
if (!IsAnyObjectDistributed(list_make1(roleAddress)))
{
/* we only need to propagate the grant if the grantor is distributed */
continue;
}

GrantRoleStmt *grantRoleStmt = makeNode(GrantRoleStmt);
grantRoleStmt->is_grant = true;

Expand All @@ -901,7 +909,11 @@ GenerateGrantRoleStmtsOfRole(Oid roleid)
granteeRole->rolename = GetUserNameFromId(membership->member, true);
grantRoleStmt->grantee_roles = list_make1(granteeRole);

grantRoleStmt->grantor = NULL;
RoleSpec *grantorRole = makeNode(RoleSpec);
grantorRole->roletype = ROLESPEC_CSTRING;
grantorRole->location = -1;
grantorRole->rolename = GetUserNameFromId(membership->grantor, false);
grantRoleStmt->grantor = grantorRole;

#if PG_VERSION_NUM >= PG_VERSION_16

Expand Down Expand Up @@ -1241,12 +1253,6 @@ PreprocessGrantRoleStmt(Node *node, const char *queryString,
return NIL;
}

/*
* Postgres don't seem to use the grantor. Even dropping the grantor doesn't
* seem to affect the membership. If this changes, we might need to add grantors
* to the dependency resolution too. For now we just don't propagate it.
*/
stmt->grantor = NULL;
stmt->grantee_roles = distributedGranteeRoles;
char *sql = DeparseTreeNode((Node *) stmt);
stmt->grantee_roles = allGranteeRoles;
Expand Down
Loading

0 comments on commit 166e27d

Please sign in to comment.