Adds 2PC distributed commands from other databases #7203
Conversation
@@ -5723,6 +5724,10 @@ GetPoolinfoViaCatalog(int32 nodeId)
char *
GetAuthinfoViaCatalog(const char *roleName, int64 nodeId)
{
	if (!isCitusManagementDatabase())
	{
		return "";
Could use a comment
This still applies
bool IsManagementCommand = false;
FullTransactionId outerXid;
static MultiConnection *managementCon = NULL;
bool CitusManagementDatabase = true;
Do we need this boolean? Seems cheap enough to just check the database name in IsCitusManagementDatabase
I cannot use the CurrentDatabaseName() function in the post-commit hook, where I need the IsCitusMainDb() check, because it needs to be in a transaction to be able to access the database names.
One way to check whether the database you are connected to is the MainDb or not would be to add the following change:
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 5b0208eca..bbf0d2bc7 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -3139,6 +3139,11 @@ CitusAuthHook(Port *port, int status)
 	InitializeBackendData(port->application_name);
+	bool isInMainDB = strcmp(port->database_name, MainDb) == 0;

 	/* let other authentication hooks to kick in first */
 	if (original_client_auth_hook)
 	{
Note that:
- isInMainDB needs to be made global.
- CitusAuthHook is run for all client connections, but it is not run for background workers (such as the autovacuum launcher or the Citus maintenance daemon). So you need to initialize isInMainDB with a default value such as isInMainDB = false;
- CoordinatedTransactionCallback will be called in those background workers as well. There will be a maintenance daemon connected to MainDb. If we need to do 2PC in there, we should set isInMainDB to true there.
src/backend/distributed/sql/udfs/commit_management_command_2PC/12.1-1.sql (outdated, resolved)
if (IsManagementCommand)
{
	names = namesArg;
	objectType = "role";
To be on the safe side (and to make this function useful for further cases other than CREATE ROLE), should we use objectType = getObjectTypeDescription(address, false); here too?
(e.g., we will probably use this for CREATE DATABASE as well)
(e.g., we will probably use this for CREATE DATABASE as well)
(Maybe not for CREATE DATABASE because we may want to directly propagate CREATE DATABASE to all nodes as it cannot be executed in a xact block anyway.)
Overall this looks like a good approach. I left some comments to improve this.
src/backend/distributed/sql/udfs/commit_management_command_2PC/latest.sql (outdated, resolved)
src/backend/distributed/sql/udfs/citus_mark_object_distributed/latest.sql (outdated, resolved)
PG_FUNCTION_INFO_V1(citus_internal_start_management_transaction);
PG_FUNCTION_INFO_V1(execute_command_on_other_nodes);
PG_FUNCTION_INFO_V1(citus_mark_object_distributed);
PG_FUNCTION_INFO_V1(commit_management_command_2PC);
Should all those functions start with citus_internal?
Also, should they perform our usual checks for citus_internal UDFs?
CheckCitusVersion(ERROR);

/* also some null input checks go here, via PG_ENSURE_ARGNOTNULL() */

if (!ShouldSkipMetadataChecks())
{
	EnsureCoordinatorInitiatedOperation();
}
{
	/* handles both already prepared and open transactions */
	CoordinatedRemoteTransactionsCommit();
}

if (!IsCitusMainDB())
I'm still concerned about executing the below UDF even for regular Citus clusters, because the check (if !IsCitusMainDb()) doesn't know whether this database is used for db sharding or row-based sharding. Should we check for IsCitusInstalled() too? Or am I missing something?
I think if we're in a sharded not-main database the ideal behaviour would be to create the roles via the current database but add the pg_dist_object records via the main database. (Both as part of the same 2PC)
Until we implement that, we should make sure we keep whatever the current behaviour is in the not-main Citus databases.
IsCitusInstalled()
Can probably use CitusHasBeenLoaded.
create the roles via the current database but add the pg_dist_object records via the main database
I'm adding this, and the main database also runs the query on the workers too.
I assumed we'll set up the parameters so that IsCitusMainDB will return true for a regular Citus database too. Maybe we can rename this so it'll make more sense? But sure, I can even add CitusHasBeenLoaded to the IsCitusMainDB function.
/*
 * All the active primary nodes in the metadata except the current node
 */
OTHER_NODES
Suggested change ("current node" -> "local node"):

/*
 * All the active primary nodes in the metadata except the local node
 */
OTHER_NODES
We can discuss, but to me the word "local" means something in this node, so "local node" doesn't sound right.
I don't think this needs to be introduced anymore. We already have REMOTE_NODES, which is the same afaict. It was introduced by @onurctirtir in #7278. Let's also change all other places where we use the term "other nodes" to say "remote nodes", e.g. the execute_command_on_other_nodes UDF.
Codecov Report

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #7203      +/-   ##
==========================================
+ Coverage   89.57%   89.61%   +0.03%
==========================================
  Files         278      278
  Lines       59984    60100     +116
  Branches     7469     7487      +18
==========================================
+ Hits        53732    53857     +125
+ Misses       4105     4098       -7
+ Partials     2147     2145       -2
		{
			/*
			 * The transaction is initiated from an outer transaction and the outer
			 * transaction is not yet committed, so we should not commit either.
			 * We remove this transaction from the pendingTransactionSet so it'll
			 * not be aborted by the loop below.
			 */
			hash_search(pendingTransactionSet, transactionName, HASH_REMOVE,
						&foundPreparedTransactionBeforeCommit);
			continue;
		}
		else if (!outerXactIsInProgress && !outerXactDidCommit)
		{
			/*
			 * Since outer transaction isn't in progress and did not commit we need to
			 * abort the prepared transaction too. We do this by simply doing the same
			 * thing we would do for transactions that are initiated from the main
			 * database.
			 */
			continue;
These two cases are currently not covered by tests yet. But it is important that they do the correct thing. They are hard to test though, because they are about edge cases that can happen after the main database transaction has already successfully committed:
- the main db transaction is committed, but the outer has not committed yet
- the main db transaction is committed, but the outer transaction failed to commit
I think the easiest way to trigger these cases is by simulating them using the python test framework:
- open two connections
- in connection A we open a transaction and get its transaction id (transaction A)
- in connection B we manually do the flow with all our new functions using the transaction id of transaction A: start_management_transaction -> mark_object_distributed -> execute_command_on_other_nodes -> COMMIT
- And then for edge case 2 we abort transaction A (and for edge case 1 we do nothing)
- we run recover_prepared_transactions
- check if the changes made by connection B have been applied or not.
For some explanation of the python test suite you can take a look here:
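A rough sketch of how such a test could look, reusing the helpers that appear elsewhere in this conversation (cluster.coordinator / cluster.workers, cur(), sql_value()); the test name, the db1 and u_edge identifiers, and the use of pg_current_xact_id() and explicit BEGIN/COMMIT are illustrative assumptions, not the actual test:

def test_main_db_committed_outer_aborted(cluster):
    # hypothetical sketch: connection A holds the outer transaction in the
    # non-main database, connection B manually drives the management UDFs
    c = cluster.coordinator
    w0 = cluster.workers[0]

    with c.cur(dbname="db1") as cur1, c.cur() as cur2:
        cur1.execute("BEGIN")
        # assumption: the outer transaction id is fetched with pg_current_xact_id()
        cur1.execute("SELECT pg_current_xact_id()")
        outer_xid = cur1.fetchall()[0][0]

        cur2.execute("BEGIN")
        cur2.execute(
            "SELECT citus_internal.start_management_transaction(%s)",
            (str(outer_xid),),
        )
        cur2.execute(
            "SELECT citus_internal.execute_command_on_remote_nodes_as_user("
            "'CREATE USER u_edge;', 'postgres')"
        )
        # committing the main db transaction leaves the remote transactions
        # prepared, tagged with the outer xid, per the design discussed above
        cur2.execute("COMMIT")

        # edge case 2: the outer transaction fails to commit
        # (for edge case 1, leave transaction A open and assert nothing changed yet)
        cur1.execute("ABORT")

    # recovery should abort the prepared transactions since the outer aborted
    c.sql_value("SELECT recover_prepared_transactions()")

    role_count = w0.sql_value(
        "SELECT count(*) FROM pg_roles WHERE rolname = 'u_edge'"
    )
    assert int(role_count) == 0, "role should have been rolled back on the worker"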
MultiConnection *MainDBConnection = NULL;
bool IsMainDBCommand = false;
FullTransactionId OuterXid;
bool IsMainDB = true;
There should be a code comment saying why we need this boolean (i.e. it makes sense to me after reading the discussion on the PR, but we should have the conclusion as a code comment). Also @emelsimsek said it does not get set automatically for background workers. So we should mention that and probably set it ourselves for the workers that we spawn:
- CitusAuthHook is run for all client connections. But it is not run for the background workers (such as the autovacuum launcher or the Citus maintenance daemon). So you need to initialize isInMainDB with a default value such as isInMainDB = false;
- CoordinatedTransactionCallback will be called in those background workers as well. There will be a maintenance daemon connected to MainDb. If we need to do 2PC in there, we should set isInMainDB to true there.
The default value is true, and when it is true we don't do any new logic. So, I think the default value should be fine for the background workers too. Do you think we should set it to false there?
Good question. I think false might be better. That way if a background worker does some global DDL (very unlikely) it will always use the "connect to main database" logic. The only downside of that that I can think of would be an unnecessary connection, in the case where the background worker is actually running in the main database.
While if the default is true, that means it will always be executed in the local database. Which would mean that if the background worker runs global DDL (again, very unlikely) while running in a non-main database then that DDL would not be propagated to other nodes.
So I think defaulting to false would be best. Or well, the best would obviously be to automatically detect in background workers what database we are in. But I don't think that's worth the effort at the moment.
IMPORTANT NOTE: The reasoning above applies only when citus.main_db is actually set to something other than the empty string. If citus.main_db is not configured, I think the safest would be to keep IsMainDB defaulting to true. The easiest way to do that would be to set it to true/false from _PG_init based on the value of MainDb. Let's do that in this PR.
Finally, I do think that we should add a code comment that we made this decision to not worry about it for now and why:

When citus.main_db is set, the IsMainDB variable is only ever set to true for connection backends, not background workers. Background worker processes will always report false here, even if they are running in a different database than citus.main_db. This is currently fine in practice since we only use this variable when propagating global DDL, which already seems very unlikely to happen from a background worker. And even if it did, the only downside of it incorrectly being false when the background worker is actually running in the citus main database is that the background worker will create an unnecessary connection to the main database.
 * IsCitusMainDB returns true if this is the Citus main database.
 */
bool
IsCitusMainDB(void)
I don't think this function has a reason to exist anymore. We can use IsMainDB directly everywhere we use this function.
{
	if (!CitusHasBeenLoaded())
	{
		return;
Quoting @onurctirtir from a previous comment that got outdated:
Should we instead throw an error if we don't expect this function to be called when isCitusManagementDatabase() holds?
@@ -444,7 +444,17 @@ GetEffectiveConnKey(ConnectionHashKey *key)
	return key;
}

if (!CitusHasBeenLoaded())
The comment of this function is now even more outdated than it already was. Let's remove the "task tracker" mention from the function comment and replace it with something about the Citus non-main database.
{
	if (strlen(SuperuserRole) == 0)
	{
		ereport(ERROR, (errmsg("No superuser role is given for Citus main "
This is reported as not covered by codecov. I think we should be able to easily cover this case.
@@ -5722,6 +5723,10 @@ GetPoolinfoViaCatalog(int32 nodeId)
char *
GetAuthinfoViaCatalog(const char *roleName, int64 nodeId)
{
	if (!CitusHasBeenLoaded())
Could still use a comment, e.g.: "This happens when doing global DDL from a Citus non-main database."
#include "udfs/start_management_transaction/12.1-1.sql"
#include "udfs/execute_command_on_remote_nodes_as_user/12.1-1.sql"
#include "udfs/mark_object_distributed/12.1-1.sql"
#include "udfs/commit_management_command_2pc/12.1-1.sql"

ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8;
Citus 12.1 has already been released. These should not be included in this file but in the one for citus 12.2: src/backend/distributed/sql/citus--12.1-1--12.2-1.sql
Same for all the included migration files and the downgrade script, they are currently using files with wrong version numbers.
@@ -0,0 +1,88 @@
CREATE SCHEMA other_databases;
As discussed over chat. We should add mx tests for this, i.e. CREATE/DROP USER should work from non citus main databases on workers too.
with c.cur(dbname="db1") as cur1:
    with c.cur() as cur2:
nit: you can combine these two into one with statement, which reduces the indentation of the following code:
Suggested change:

with c.cur(dbname="db1") as cur1, \
    c.cur() as cur2:
P.S. I learned of this only a few months ago
cur2.execute(
    "SELECT citus_internal.start_management_transaction('"
    + str(txid[0][0])
    + "')"
This is both easier to read (imho) and better practice, because it avoids sql injection (not that that really matters here, but good practice in general not to interpolate sql strings manually when not needed).
Suggested change:

cur2.execute(
    "SELECT citus_internal.start_management_transaction(%s)",
    str(txid[0][0]))
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')" | ||
) | ||
cur2.execute( | ||
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)" |
Is this actually removed from pg_dist_object on the coordinator now? As far as I can tell this leaks outside of the test.
Maybe the easiest solution is just to not call mark_object_distributed. Let's manually remove it at the end of the test.
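For instance, a minimal cleanup sketch at the end of the test, assuming the same c coordinator handle and the objid::regrole predicate used in the checks below; whether the workers need the same statements depends on what mark_object_distributed ended up propagating:

# illustrative cleanup against the coordinator's main database
with c.cur() as cur:
    cur.execute(
        "DELETE FROM pg_catalog.pg_dist_object WHERE objid::regrole::text = 'u1'"
    )
    cur.execute("DROP USER IF EXISTS u1")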
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')" | ||
) | ||
cur2.execute( | ||
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)" |
We should check that this is not visible on the coordinator yet until the commit actually happened. So similar logic like you have now for checking if roles exist.
before_commit = c.sql_value(
    """
    SELECT result
    FROM run_command_on_workers($$
        SELECT count(*) FROM pg_roles
        WHERE rolname = 'u1'
    $$) LIMIT 1
    """
)
nit: if you're doing the limit one to only check a single worker anyway, I think it's easier to simply connect straight to the worker.
Suggested change:

before_commit = cluster.workers[0].sql_value(
    "SELECT count(*) FROM pg_roles WHERE rolname = 'u1'"
)
pdo_before_commit = w0.sql_value(
    "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)

assert int(pdo_before_commit) == 0, "role is created despite not committing"
we should also check pg_dist_object on the coordinator, because mark_object_distributed should have an effect there too (as opposed to execute_command_on_remote_nodes_as_user). So just copy-paste these lines and replace w0 with c, same for other places where we check pg_dist_object in this test file:
Suggested change (also check the coordinator):

pdo_before_commit = w0.sql_value(
    "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)
assert int(pdo_before_commit) == 0, "role is created despite not committing"

pdo_before_commit = c.sql_value(
    "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)
assert int(pdo_before_commit) == 0, "role is created despite not committing"
Left one final comment about also testing pg_dist_object on the coordinator. Once you do that and get tests to pass in CI we can merge this.
I noticed while reviewing #7203 that there was no example of executing sql on a worker in the pytest README. Since this is a pretty common thing that people want to do, this PR adds that.
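For reference, a minimal sketch of what such an example could look like with these test helpers; the test name is made up and only the sql_value() helper already shown in this review is used:

def test_run_sql_on_worker(cluster):
    # pick the first worker and run a query directly against it
    w0 = cluster.workers[0]
    role_count = w0.sql_value("SELECT count(*) FROM pg_roles")
    assert int(role_count) > 0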
 */
if (!IsMainDB && MainDBConnection != NULL)
{
	RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC);
Invoking citus_internal.commit_management_command_2pc() here, in the XACT_EVENT_COMMIT handler, makes the server much more susceptible to crashes, because every error thrown here will lead to a process termination. I faced it when introducing a timeout for the maintenance connection pool in #7286.
It would be better to signal the maintenance worker to trigger a recovery somehow...
cc @JelteF
DESCRIPTION: The PR disables the non-main db related features. The non-main db related features were introduced in #7203.
DESCRIPTION: Adds support for 2PC from non-Citus main databases

This PR only adds support for CREATE USER queries; other queries need to be added. But it should be simple because this PR creates the underlying structure.

The Citus main database is the database where the Citus extension is created. A non-main database is any other database that is on the same node as a Citus main database.

When a CREATE USER query is run on a non-main database we:
- Run start_management_transaction on the main database. This function saves the outer transaction's xid (the non-main database query's transaction id) and marks the current query as a main db command.
- Run execute_command_on_remote_nodes_as_user("CREATE USER <username>", <username to run the command>) on the main database. This function creates the users in the rest of the cluster by running the query on the other nodes. The user on the current node is created by the outer, non-main db query, to make sure subsequent commands in the same transaction can see this user.
- Run mark_object_distributed on the main database. This function adds the user to pg_dist_object on all of the nodes, including the current one.

This PR also implements transaction recovery for the queries from non-main databases.
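To make the flow above concrete, a hedged end-to-end sketch from the test framework's point of view; the db2 database and example_user role are made up, and it assumes citus.main_db and citus.superuser are configured as this PR requires (plus autocommit connections, as the explicit BEGIN/COMMIT usage in the tests suggests):

def test_create_user_from_non_main_db(cluster):
    c = cluster.coordinator
    w0 = cluster.workers[0]

    with c.cur(dbname="db2") as cur:
        # runs in a non-main database; Citus internally drives
        # start_management_transaction, execute_command_on_remote_nodes_as_user
        # and mark_object_distributed on its main database connection
        cur.execute("CREATE USER example_user")

    # the role exists on the workers ...
    assert int(w0.sql_value(
        "SELECT count(*) FROM pg_roles WHERE rolname = 'example_user'"
    )) == 1
    # ... and is recorded in pg_dist_object on the coordinator's main database
    assert int(c.sql_value(
        "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'example_user'"
    )) == 1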