
Adds 2PC distributed commands from other databases #7203

Merged
merged 4 commits into main from pools_2pc on Dec 22, 2023

Conversation

@halilozanakgul (Contributor) commented Sep 14, 2023

DESCRIPTION: Adds support for 2PC from non-Citus main databases

This PR only adds support for CREATE USER queries; other queries still need to be added. That should be simple, though, because this PR creates the underlying structure.

The Citus main database is the database where the Citus extension is created. Non-main databases are all the other databases on the same node as a Citus main database.

When a CREATE USER query is run on a non-main database we do the following (see the SQL sketch after this list):

  1. Run start_management_transaction on the main database. This function saves the outer transaction's xid (the transaction id of the non-main database query) and marks the current query as a main-db command.
  2. Run execute_command_on_remote_nodes_as_user("CREATE USER <username>", <username to run the command>) on the main database. This function creates the users in the rest of the cluster by running the query on the other nodes. The user on the current node is created by the outer, non-main-database query itself, to make sure subsequent commands in the same transaction can see this user.
  3. Run mark_object_distributed on the main database. This function adds the user to pg_dist_object on all of the nodes, including the current one.
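
For illustration, the main-database side of this flow boils down to SQL roughly like the following (a hedged sketch pieced together from the test code quoted later in this thread; the exact signatures, the pg_authid class OID 1260, and the placeholder role OID 123123 are assumptions taken from that test):

-- On the main database, while the CREATE USER in the non-main database
-- is still in an open transaction:
SELECT citus_internal.start_management_transaction('<outer xid8>');
SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres');
SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123);
COMMIT;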

This PR also implements transaction recovery for the queries from non-main databases.

@marcocitus changed the title from "Adds 2PC distributed commands to pools" to "Adds 2PC distributed commands from other databases" on Sep 19, 2023
@halilozanakgul force-pushed the pools_2pc branch 2 times, most recently from 7a33f0e to 60ec41d on September 20, 2023 13:23
@@ -5723,6 +5724,10 @@ GetPoolinfoViaCatalog(int32 nodeId)
char *
GetAuthinfoViaCatalog(const char *roleName, int64 nodeId)
{
if (!isCitusManagementDatabase())
{
return "";

Collaborator:

Could use a comment

Contributor:

This still applies

bool IsManagementCommand = false;
FullTransactionId outerXid;
static MultiConnection *managementCon = NULL;
bool CitusManagementDatabase = true;

Collaborator:

Do we need this boolean? Seems cheap enough to just check the database name in IsCitusManagementDatabase

Contributor (author):

I cannot use the CurrentDatabaseName() function in the post-commit hook, where I need the IsCitusMainDb() check, because it needs to be in a transaction to be able to access the database names.

Contributor:

One way to check whether the database you are connected to is the MainDb or not would be to add the following change:

diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 5b0208eca..bbf0d2bc7 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -3139,6 +3139,11 @@ CitusAuthHook(Port *port, int status)
InitializeBackendData(port->application_name);

+ bool isInMainDB = strcmp(port->database_name, MainDb) == 0;
/* let other authentication hooks to kick in first */
if (original_client_auth_hook)
{

Note that:

  • Need to make isInMainDB global
  • CitusAuthHook is run for all client connections. But it is not run for background workers (such as the autovacuum launcher or the Citus maintenance daemon). So you need to initialize isInMainDB with a default value such as isInMainDB = false;

CoordinatedTransactionCallback will be called in those background workers as well. There will be a maintenance daemon connected to MainDb. If we need to do 2PC in there, we should set isMainDB to true there.

if (IsManagementCommand)
{
names = namesArg;
objectType = "role";

Member:

To be on the safe side (and to make this function useful for further cases other than CREATE ROLE), should we use objectType = getObjectTypeDescription(address, false); here too?

Member:

(e.g., we will probably use this for CREATE DATABASE as well)

@onurctirtir (Member) commented Oct 17, 2023:

(e.g., we will probably use this for CREATE DATABASE as well)

(Maybe not for CREATE DATABASE because we may want to directly propagate CREATE DATABASE to all nodes as it cannot be executed in a xact block anyway.)
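
(For context, the restriction referenced here is standard PostgreSQL behavior; a minimal illustration, with db2 as an arbitrary name:)

BEGIN;
CREATE DATABASE db2;
-- ERROR:  CREATE DATABASE cannot run inside a transaction block
ROLLBACK;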

@JelteF (Contributor) left a comment:

Overall this looks like a good approach. I left some comments to improve this.

Comment on lines 72 to 75
PG_FUNCTION_INFO_V1(citus_internal_start_management_transaction);
PG_FUNCTION_INFO_V1(execute_command_on_other_nodes);
PG_FUNCTION_INFO_V1(citus_mark_object_distributed);
PG_FUNCTION_INFO_V1(commit_management_command_2PC);

Member:

Should all those functions start with citus_internal?

Member:

Also, should they perform our usual checks for citus_internal UDFs?

	CheckCitusVersion(ERROR);

	// also some null input checks go here, via PG_ENSURE_ARGNOTNULL()

	if (!ShouldSkipMetadataChecks())
	{
		EnsureCoordinatorInitiatedOperation();
	}

{
/* handles both already prepared and open transactions */
CoordinatedRemoteTransactionsCommit();
}

if (!IsCitusMainDB())

Member:

I'm still concerned about executing the below UDF even for regular Citus clusters, because the check (!IsCitusMainDb()) doesn't know whether this database is used for db sharding or for row-based sharding. Should we check for IsCitusInstalled() too? Or am I missing something?

@marcoslot (Collaborator) commented Nov 2, 2023:

I think if we're in a sharded not-main database the ideal behaviour would be to create the roles via the current database but add the pg_dist_object records via the main database. (Both as part of the same 2PC)

Until we implement that, we should make sure we keep whatever the current behaviour is in the not-main Citus databases.

Collaborator:

IsCitusInstalled()

Can use CitusHasBeenLoaded probably

Contributor (author):

create the roles via the current database but add the pg_dist_object records via the main database

I'm adding this, and the main database also runs the query on the workers.

I assumed we'll set up the parameters so that IsCitusMainDB will return true for a regular Citus database too. Maybe we can rename it so it makes more sense?

But sure, I can even add CitusHasBeenLoaded to the IsCitusMainDB function.

Comment on lines 48 to 63
/*
* All the active primary nodes in the metadata except the current node
*/
OTHER_NODES

Member:

Suggested change
/*
* All the active primary nodes in the metadata except the current node
*/
OTHER_NODES
/*
* All the active primary nodes in the metadata except the local node
*/
OTHER_NODES

Contributor (author):

We can discuss, but to me the word "local" means something inside this node, so "local node" doesn't sound right.

Contributor:

I don't think this needs to be introduced anymore. We already have REMOTE_NODES, which is the same afaict; it was introduced by @onurctirtir in #7278. Let's also change all other places where we use the term "other nodes" to say "remote nodes", e.g. the execute_command_on_other_nodes UDF.

@JelteF self-requested a review on October 30, 2023 10:08

codecov bot commented Nov 17, 2023

Codecov Report

Merging #7203 (06c2a66) into main (6801a1e) will increase coverage by 0.03%.
The diff coverage is 97.58%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #7203      +/-   ##
==========================================
+ Coverage   89.57%   89.61%   +0.03%     
==========================================
  Files         278      278              
  Lines       59984    60100     +116     
  Branches     7469     7487      +18     
==========================================
+ Hits        53732    53857     +125     
+ Misses       4105     4098       -7     
+ Partials     2147     2145       -2     

Comment on lines +282 to +301
{
/*
* The transaction is initiated from an outer transaction and the outer
* transaction is not yet committed, so we should not commit either.
* We remove this transaction from the pendingTransactionSet so it'll
* not be aborted by the loop below.
*/
hash_search(pendingTransactionSet, transactionName, HASH_REMOVE,
&foundPreparedTransactionBeforeCommit);
continue;
}
else if (!outerXactIsInProgress && !outerXactDidCommit)
{
/*
* Since outer transaction isn't in progress and did not commit we need to
* abort the prepared transaction too. We do this by simply doing the same
* thing we would do for transactions that are initiated from the main
* database.
*/
continue;

Contributor:

These two cases are not yet covered by tests. But it is important that they do the correct thing. They are hard to test, though, because they are about edge cases that can happen after the main database transaction has already successfully committed:

  1. the main db transaction is committed, but the outer has not committed yet
  2. the main db transaction is committed, but the outer transaction failed to commit

I think the easiest way to trigger these cases is by simulating them using the python test framework (a raw-SQL sketch of the flow follows the list):

  1. open two connections
  2. in connection A we open a transaction and get its transaction id (transaction A)
  3. in connection B we manually do the flow with all our new functions using the transaction id of transaction A: start_management_transaction->mark_object_distributed->execute_command_on_other_nodes->COMMIT
  4. And then for edge case 2 we abort transaction A (and for edge case 1 we do nothing)
  5. we run recover_prepared_transactions
  6. check if the changes made by connection B have been applied or not.
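
In raw SQL, that simulation looks roughly like this (a sketch only; pg_current_xact_id() and the literal arguments are assumptions modeled on the test snippets quoted elsewhere in this thread):

-- Session A (non-main database "db1"): the outer transaction whose fate we control.
BEGIN;
SELECT pg_current_xact_id();  -- note this xid8 and pass it to session B

-- Session B (main database): drive the management flow and commit it.
BEGIN;
SELECT citus_internal.start_management_transaction('<xid8 from session A>');
SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres');
SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123);
COMMIT;

-- Edge case 1: leave session A open. Edge case 2: ROLLBACK in session A.
SELECT recover_prepared_transactions();
-- Then check whether session B's changes were kept or rolled back.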

For some explanation of the python test suite you can take a look here:

  1. README: https://github.com/citusdata/citus/blob/main/src/test/regress/citus_tests/test/README.md
  2. Example written by Emel: https://github.com/citusdata/citus/blob/main/src/test/regress/citus_tests/test/test_extension.py

MultiConnection *MainDBConnection = NULL;
bool IsMainDBCommand = false;
FullTransactionId OuterXid;
bool IsMainDB = true;

Contributor:

There should be a code comment saying why we need this boolean (i.e. it makes sense to me after reading the discussion on the PR, but we should have the conclusion as a code comment). Also @emelsimsek said it does not get set automatically for background workers. So we should mention that and probably set it ourselves for the workers that we spawn:

  • CitusAuthHook is run for all client connections. But it is not run for background workers (such as the autovacuum launcher or the Citus maintenance daemon). So you need to initialize isInMainDB with a default value such as isInMainDB = false;

CoordinatedTransactionCallback will be called in those background workers as well. There will be a maintenance daemon connected to MainDb. If we need to do 2PC in there, we should set isMainDB to true there.

Contributor (author):

The default value is true, and when it is true we don't do any new logic. So, I think the default value should be fine for the background workers too. Do you think we should set it to false there?

@JelteF (Contributor) commented Dec 21, 2023:

Good question. I think false might be better. That way, if a background worker does some global DDL (very unlikely), it will always use the "connect to main database" logic. The only downside I can think of would be an unnecessary connection, in the case where the background worker is actually running in the main database.

While if the default is true, that means it will always be executed in the local database. Which would mean that if the background worker runs global DDL (again, very unlikely) while running in a non-main database then that DDL would not be propagated to other nodes.

So I think defaulting to false would be best. Well, the best would obviously be to automatically detect in background workers which database we are in, but I don't think that's worth the effort at the moment.

IMPORTANT NOTE: The reasoning above applies only when citus.main_db is actually set to something other than the empty string. If citus.main_db is not configured, I think the safest would be to keep the IsMainDB default of true. The easiest way to do that would be to set it to true/false from _PG_init based on the value of MainDb. Let's do that in this PR.

Finally, I do think we should add a code comment explaining that we made this decision not to worry about it for now, and why:

When citus.main_db is set, the IsMainDB variable is only ever set to true for connection backends, not for background workers. Background worker processes will always report false here, even when they are running in the citus.main_db database. This is currently fine in practice, since we only use this variable when propagating global DDL, which already seems very unlikely to happen from a background worker. And even if it did, the only downside of IsMainDB being false incorrectly, when the background worker is actually running in the Citus main database, is that the background worker will create an unnecessary connection to the main database.

* IsCitusMainDB returns true if this is the Citus main database.
*/
bool
IsCitusMainDB(void)

@JelteF (Contributor) commented Dec 18, 2023:

I don't think this function has a reason to exist anymore. We can use IsMainDB directly everywhere we use this function.

{
if (!CitusHasBeenLoaded())
{
return;

Contributor:

Quoting @onurctirtir from a previous comment that got outdated:

Should we instead throw an error if we don't expect this function to be called when isCitusManagementDatabase() holds?

@@ -444,7 +444,17 @@ GetEffectiveConnKey(ConnectionHashKey *key)
return key;
}

if (!CitusHasBeenLoaded())

Contributor:

The comment of this function is now even more outdated than it already was. Let's remove the "task tracker" mention from the function comment and replace it with something about the Citus non-main database.

{
if (strlen(SuperuserRole) == 0)
{
ereport(ERROR, (errmsg("No superuser role is given for Citus main "

@JelteF (Contributor) commented Dec 21, 2023:

This is reported as not covered by codecov. I think we should be able to easily cover this case.

@@ -5722,6 +5723,10 @@ GetPoolinfoViaCatalog(int32 nodeId)
char *
GetAuthinfoViaCatalog(const char *roleName, int64 nodeId)
{
if (!CitusHasBeenLoaded())

@JelteF (Contributor) commented Dec 21, 2023:

Could still use a comment, e.g.

This happens when doing global DDL from a Citus non-main database.

Comment on lines 15 to 20
#include "udfs/start_management_transaction/12.1-1.sql"
#include "udfs/execute_command_on_remote_nodes_as_user/12.1-1.sql"
#include "udfs/mark_object_distributed/12.1-1.sql"
#include "udfs/commit_management_command_2pc/12.1-1.sql"

ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8;

@JelteF (Contributor) commented Dec 21, 2023:

Citus 12.1 has already been released. These should not be included in this file but in the one for citus 12.2: src/backend/distributed/sql/citus--12.1-1--12.2-1.sql

Same for all the included migration files and the downgrade script; they currently use files with the wrong version numbers.
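
In other words, something like the following would go in the 12.2 migration instead (the 12.2-1 file names are assumptions):

-- src/backend/distributed/sql/citus--12.1-1--12.2-1.sql
#include "udfs/start_management_transaction/12.2-1.sql"
#include "udfs/execute_command_on_remote_nodes_as_user/12.2-1.sql"
#include "udfs/mark_object_distributed/12.2-1.sql"
#include "udfs/commit_management_command_2pc/12.2-1.sql"

ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8;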

@@ -0,0 +1,88 @@
CREATE SCHEMA other_databases;

Contributor:

As discussed over chat, we should add MX tests for this, i.e. CREATE/DROP USER should work from non-main Citus databases on workers too.

Comment on lines 9 to 10
with c.cur(dbname="db1") as cur1:
with c.cur() as cur2:

Contributor:

nit: you can combine these two into a single with statement, which reduces the indentation of the following code:

Suggested change
with c.cur(dbname="db1") as cur1:
with c.cur() as cur2:
with c.cur(dbname="db1") as cur1, \
c.cur() as cur2:

P.S. I learned of this only a few months ago

Comment on lines 19 to 22
cur2.execute(
"SELECT citus_internal.start_management_transaction('"
+ str(txid[0][0])
+ "')"

Contributor:

This is both easier to read (imho) and better practice, because it avoids SQL injection (not that that really matters here, but in general it is good practice not to interpolate SQL strings manually when not needed).

Suggested change
cur2.execute(
"SELECT citus_internal.start_management_transaction('"
+ str(txid[0][0])
+ "')"
cur2.execute(
"SELECT citus_internal.start_management_transaction(%s)",
(str(txid[0][0]),))

"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')"
)
cur2.execute(
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)"

Contributor:

Is this actually removed from pg_dist_object on the coordinator now? As far as I can tell this leaks outside of the test.

Maybe the easiest solution is just to not call mark_object_distributed. Let's manually remove it at the end of the test.

"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')"
)
cur2.execute(
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)"

Contributor:

We should check that this is not visible on the coordinator until the commit has actually happened, using logic similar to what you have now for checking whether the roles exist.
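
For example, from a separate connection to the coordinator while the management transaction is still open (a sketch reusing the pg_dist_object query from this test file):

SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1';
-- expected: 0 until the 2PC actually commits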

Comment on lines 36 to 44
before_commit = c.sql_value(
"""
SELECT result
FROM run_command_on_workers($$
SELECT count(*) FROM pg_roles
WHERE rolname = 'u1'
$$) LIMIT 1
"""
)

@JelteF (Contributor) commented Dec 22, 2023:

nit: if you're doing the LIMIT 1 just to check a single worker anyway, I think it's easier to simply connect straight to the worker.

Suggested change
before_commit = c.sql_value(
"""
SELECT result
FROM run_command_on_workers($$
SELECT count(*) FROM pg_roles
WHERE rolname = 'u1'
$$) LIMIT 1
"""
)
before_commit = cluster.workers[0].sql_value(
"SELECT count(*) FROM pg_roles WHERE rolname = 'u1'"
)

Comment on lines +42 to +46
pdo_before_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)

assert int(pdo_before_commit) == 0, "role is created despite not committing"

Contributor:

we should also check pg_dist_object on the coordinator, because mark_object_distributed should have an effect there too (as opposed to execute_command_on_remote_nodes_as_user). So just copy-paste these lines and replace w0 with c; same for the other places where we check pg_dist_object in this test file:

Suggested change
pdo_before_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)
assert int(pdo_before_commit) == 0, "role is created despite not committing"
pdo_before_commit = c.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
)
assert int(pdo_before_commit) == 0, "role is created despite not committing"

@JelteF (Contributor) left a comment:

Left one final comment about also testing pg_dist_object on the coordinator. Once you do that and get the tests to pass in CI, we can merge this.

@halilozanakgul merged commit b877d60 into main on Dec 22, 2023
156 checks passed
@halilozanakgul deleted the pools_2pc branch on December 22, 2023 16:19
JelteF added a commit that referenced this pull request Jan 18, 2024
I noticed while reviewing #7203 that there was no example in the pytest README of executing SQL on a worker. Since this is a pretty common thing that people want to do, this PR adds that.
*/
if (!IsMainDB && MainDBConnection != NULL)
{
RunCitusMainDBQuery(COMMIT_MANAGEMENT_COMMAND_2PC);

Contributor:

Invoking citus_internal.commit_management_command_2pc() here, in the XACT_EVENT_COMMIT handler, makes the server much more susceptible to crashes, because every error thrown here will lead to process termination. I faced this when introducing a timeout for the maintenance connection pool in #7286.
It would be better to signal the maintenance worker to trigger a recovery somehow...
cc @JelteF

eaydingol added a commit that referenced this pull request Feb 21, 2025
DESCRIPTION: The PR disables the non-main db related features. 

The non-main db related features were introduced in
#7203.