Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

7244/multi db connection improvements #7286

Open
wants to merge 44 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
b2051bf
Move databaseOid from SharedConnStatsHashKey to SharedConnStatsHashEntry
ivyazmitinov Oct 5, 2023
23e6b36
- Separate HTAB for connection management and statistics.
ivyazmitinov Oct 13, 2023
d44b1dd
Small fixes
ivyazmitinov Oct 13, 2023
0bb5876
Small fixes
ivyazmitinov Oct 13, 2023
4d775ab
Support for maintenance quota
ivyazmitinov Oct 13, 2023
76d10cc
Implementation of a dedicated maintenance quota
ivyazmitinov Oct 17, 2023
19681ca
Introducing new backend type and disabling caching for daemon backends
ivyazmitinov Oct 18, 2023
5880ecc
Introduce new GUC
ivyazmitinov Oct 18, 2023
9f26f74
Implementation of a dedicated maintenance database
ivyazmitinov Oct 19, 2023
c8ec1b6
Style fixes
ivyazmitinov Oct 19, 2023
481aa99
Changes:
ivyazmitinov Oct 27, 2023
b3bfca9
Adapt implementation to changing MaintenanceQuota on a fly
ivyazmitinov Oct 28, 2023
f447b39
- Synced with main
ivyazmitinov Nov 28, 2023
09917f8
- Change the implementation from quota of the shared pool to a separa…
ivyazmitinov Nov 29, 2023
115ed00
- Adapt locally_reserved_shared_connections to maintenance connection…
ivyazmitinov Nov 30, 2023
6171486
Run make reindent
JelteF Jan 18, 2024
728ce2b
Fix tests
ivyazmitinov Jan 30, 2024
5b8dcb1
Fix failure_single_select
ivyazmitinov Feb 1, 2024
f1b5509
Fix pg16 build
ivyazmitinov Feb 1, 2024
d3f358e
- Address review
ivyazmitinov Feb 1, 2024
62c5090
Fix gucs order
ivyazmitinov Feb 7, 2024
3719836
Run citus_indent
ivyazmitinov Feb 7, 2024
5b2c297
Fix flakiness in multi_maintenance_multiple_databases.sql
ivyazmitinov Feb 12, 2024
10f4777
Fix flakiness in multi_maintenance_multiple_databases.sql x2
ivyazmitinov Feb 12, 2024
a48f274
Address review
ivyazmitinov Feb 28, 2024
46ec3fe
Merge branch 'main' into 7244/multi_db_connection_improvements
JelteF Mar 21, 2024
725bdce
Preliminary add "citus.maintenance_connection_pool_timeout"
ivyazmitinov May 9, 2024
02e7191
Merge branch 'citusdata:main' into 7244/multi_db_connection_improvements
ivyazmitinov May 9, 2024
53869e0
Delete maintenance_connection_timeout.sql temporarily to pass the tests
ivyazmitinov Jun 12, 2024
bcd1bb3
Merge branch 'citusdata:main' into 7244/multi_db_connection_improvements
ivyazmitinov Jun 12, 2024
9f63b5a
Fix StopMaintenanceDaemon by introduction of dbData.daemonShuttingDow…
ivyazmitinov Jun 18, 2024
b1ca008
Workaround for -Werror=unused-variable
ivyazmitinov Jun 18, 2024
028702e
Fix formatting
ivyazmitinov Jun 18, 2024
f584ecc
Fix flaky test and checkstyle
ivyazmitinov Jun 19, 2024
bdc7bea
test_multiple_databases_distributed_deadlock_detection WIP
ivyazmitinov Jun 26, 2024
4312b06
- Fix limits check for local nodes
ivyazmitinov Jun 27, 2024
76a2776
Fix checkstyle
ivyazmitinov Jun 27, 2024
3517850
WIP test_multiple_databases_distributed_deadlock_detection
ivyazmitinov Jun 27, 2024
f25b4b2
Fix checkstyle
ivyazmitinov Jun 27, 2024
1c219fe
Done test_multiple_databases_distributed_deadlock_detection
ivyazmitinov Jun 28, 2024
f7516d7
Fix tests
ivyazmitinov Jun 28, 2024
4d54ee8
Fix tests
ivyazmitinov Jun 28, 2024
a227bcc
- Fix tests
ivyazmitinov Jun 28, 2024
bf2572a
Fix citus_local_tables
ivyazmitinov Jun 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2160,6 +2160,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext);

RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML);
uint32 connectionFlags = 0;

/*
* Colocated intermediate results do not honor citus.max_shared_pool_size,
Expand All @@ -2181,7 +2182,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
* and cannot switch to local execution (e.g., disabled by user),
* COPY would fail hinting the user to change the relevant settiing.
*/
EnsureConnectionPossibilityForRemotePrimaryNodes();
EnsureConnectionPossibilityForRemotePrimaryNodes(connectionFlags);
}

LocalCopyStatus localCopyStatus = GetLocalCopyStatus();
Expand Down Expand Up @@ -2211,7 +2212,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
*/
if (ShardIntervalListHasLocalPlacements(shardIntervalList))
{
bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode();
bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(
connectionFlags);
copyDest->shouldUseLocalCopy = !reservedConnection;
}
}
Expand Down Expand Up @@ -3634,7 +3636,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
return connection;
}

if (IsReservationPossible())
if (IsReservationPossible(connectionFlags))
{
/*
* Enforce the requirements for adaptive connection management
Expand Down
38 changes: 30 additions & 8 deletions src/backend/distributed/connection/connection_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32
static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
cachedConnectionCount);
static bool ShouldShutdownConnection(MultiConnection *connection,
const int cachedConnectionCount);
static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections);

Expand Down Expand Up @@ -354,6 +354,11 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
if (connection)
{
if (flags & REQUIRE_MAINTENANCE_CONNECTION)
{
/* Maintenance database may have changed, so cached connection should be closed */
connection->forceCloseAtTransactionEnd = true;
}
return connection;
}
}
Expand All @@ -377,9 +382,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
/* these two flags are by nature cannot happen at the same time */
Assert(!((flags & WAIT_FOR_CONNECTION) && (flags & OPTIONAL_CONNECTION)));

int sharedCounterFlags = (flags & REQUIRE_MAINTENANCE_CONNECTION)
? MAINTENANCE_CONNECTION
: 0;
if (flags & WAIT_FOR_CONNECTION)
{
WaitLoopForSharedConnection(hostname, port);
WaitLoopForSharedConnection(sharedCounterFlags, hostname, port);
}
else if (flags & OPTIONAL_CONNECTION)
{
Expand All @@ -389,7 +397,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* cannot reserve the right to establish a connection, we prefer to
* error out.
*/
if (!TryToIncrementSharedConnectionCounter(hostname, port))
if (!TryToIncrementSharedConnectionCounter(sharedCounterFlags, hostname, port))
{
/* do not track the connection anymore */
dlist_delete(&connection->connectionNode);
Expand All @@ -409,7 +417,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
*
* Still, we keep track of the connection counter.
*/
IncrementSharedConnectionCounter(hostname, port);
IncrementSharedConnectionCounter(sharedCounterFlags, hostname, port);
}


Expand All @@ -423,11 +431,15 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,

ResetShardPlacementAssociation(connection);


if ((flags & REQUIRE_METADATA_CONNECTION))
if (flags & REQUIRE_METADATA_CONNECTION)
{
connection->useForMetadataOperations = true;
}
else if (flags & REQUIRE_MAINTENANCE_CONNECTION)
{
connection->useForMaintenanceOperations = true;
connection->forceCloseAtTransactionEnd = true;
}

/* fully initialized the connection, record it */
connection->initializationState = POOL_STATE_INITIALIZED;
Expand Down Expand Up @@ -495,6 +507,12 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
continue;
}

if ((flags & REQUIRE_MAINTENANCE_CONNECTION) &&
!connection->useForMaintenanceOperations)
{
continue;
}

if ((flags & REQUIRE_METADATA_CONNECTION) &&
!connection->useForMetadataOperations)
{
Expand Down Expand Up @@ -1191,7 +1209,11 @@ CitusPQFinish(MultiConnection *connection)
/* behave idempotently, there is no gurantee that CitusPQFinish() is called once */
if (connection->initializationState >= POOL_STATE_COUNTER_INCREMENTED)
{
DecrementSharedConnectionCounter(connection->hostname, connection->port);
int sharedCounterFlags = (connection->useForMaintenanceOperations)
? MAINTENANCE_CONNECTION
: 0;
DecrementSharedConnectionCounter(sharedCounterFlags, connection->hostname,
connection->port);
connection->initializationState = POOL_STATE_NOT_INITIALIZED;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,11 @@ static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char *
userId, Oid
databaseOid,
bool *found);
static void EnsureConnectionPossibilityForNodeList(List *nodeList);
static void EnsureConnectionPossibilityForNodeList(List *nodeList, uint32
connectionFlags);
static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode,
bool waitForConnection);
bool waitForConnection,
uint32 connectionFlags);
static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize);
static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize);

Expand Down Expand Up @@ -240,7 +242,9 @@ DeallocateReservedConnections(void)
* We have not used this reservation, make sure to clean-up from
* the shared memory as well.
*/
DecrementSharedConnectionCounter(entry->key.hostname, entry->key.port);
int sharedCounterFlags = 0;
DecrementSharedConnectionCounter(sharedCounterFlags, entry->key.hostname,
entry->key.port);

/* for completeness, set it to true */
entry->usedReservation = true;
Expand Down Expand Up @@ -295,7 +299,7 @@ MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId,
* EnsureConnectionPossibilityForNodeList.
*/
void
EnsureConnectionPossibilityForRemotePrimaryNodes(void)
EnsureConnectionPossibilityForRemotePrimaryNodes(uint32 connectionFlags)
{
/*
* By using NoLock there is a tiny risk of that we miss to reserve a
Expand All @@ -304,7 +308,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void)
* going to access would be on the new node.
*/
List *remoteNodeList = ActivePrimaryRemoteNodeList(NoLock);
EnsureConnectionPossibilityForNodeList(remoteNodeList);
EnsureConnectionPossibilityForNodeList(remoteNodeList, connectionFlags);
}


Expand All @@ -314,7 +318,7 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void)
* If not, the function returns false.
*/
bool
TryConnectionPossibilityForLocalPrimaryNode(void)
TryConnectionPossibilityForLocalPrimaryNode(uint32 connectionFlags)
{
bool nodeIsInMetadata = false;
WorkerNode *localNode =
Expand All @@ -330,7 +334,8 @@ TryConnectionPossibilityForLocalPrimaryNode(void)
}

bool waitForConnection = false;
return EnsureConnectionPossibilityForNode(localNode, waitForConnection);
return EnsureConnectionPossibilityForNode(localNode, waitForConnection,
connectionFlags);
}


Expand All @@ -344,7 +349,7 @@ TryConnectionPossibilityForLocalPrimaryNode(void)
* single reservation per backend)
*/
static void
EnsureConnectionPossibilityForNodeList(List *nodeList)
EnsureConnectionPossibilityForNodeList(List *nodeList, uint32 connectionFlags)
{
/*
* We sort the workerList because adaptive connection management
Expand All @@ -363,7 +368,8 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
foreach_ptr(workerNode, nodeList)
{
bool waitForConnection = true;
EnsureConnectionPossibilityForNode(workerNode, waitForConnection);
EnsureConnectionPossibilityForNode(workerNode, waitForConnection,
connectionFlags);
}
}

Expand All @@ -382,9 +388,10 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
* return false.
*/
static bool
EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection)
EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection, uint32
connectionFlags)
{
if (!IsReservationPossible())
if (!IsReservationPossible(connectionFlags))
{
return false;
}
Expand Down Expand Up @@ -439,13 +446,17 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio
* Increment the shared counter, we may need to wait if there are
* no space left.
*/
WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort);
int sharedCounterFlags = 0;
WaitLoopForSharedConnection(sharedCounterFlags, workerNode->workerName,
workerNode->workerPort);
}
else
{
bool incremented =
TryToIncrementSharedConnectionCounter(workerNode->workerName,
workerNode->workerPort);
int sharedCounterFlags = 0;
bool incremented = TryToIncrementSharedConnectionCounter(
sharedCounterFlags,
workerNode->workerName,
workerNode->workerPort);
if (!incremented)
{
/*
Expand Down Expand Up @@ -475,9 +486,13 @@ EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnectio
* session is eligible for shared connection reservation.
*/
bool
IsReservationPossible(void)
IsReservationPossible(uint32 connectionFlags)
{
if (GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING)
bool connectionThrottlingDisabled =
connectionFlags & REQUIRE_MAINTENANCE_CONNECTION
? GetMaxMaintenanceSharedPoolSize() <= 0
: GetMaxSharedPoolSize() == DISABLE_CONNECTION_THROTTLING;
if (connectionThrottlingDisabled)
{
/* connection throttling disabled */
return false;
Expand Down
Loading
Loading