Addressing PR comments

pull/6346/head
Nitish Upreti 2022-09-14 17:44:14 -07:00
parent da527951ca
commit 1d16a7ae62
10 changed files with 142 additions and 53 deletions

View File

@ -16,7 +16,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "safe_lib.h" #include "safe_lib.h"
#include "postmaster/postmaster.h"
#include "access/hash.h" #include "access/hash.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/backend_data.h" #include "distributed/backend_data.h"
@ -244,6 +244,28 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
} }
/*
* GetLocalConnectionForSubtransaction returns a localhost connection for subtransaction.
* To avoid creating excessive connections, we reuse an existing connection.
*/
MultiConnection *
GetLocalConnectionForSubtransactionAsUser(char *userName)
{
int connectionFlag = OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag,
LocalHostName,
PostPortNumber,
userName,
get_database_name(
MyDatabaseId));
/* Don't cache connection for the lifetime of the entire session. */
connection->forceCloseAtTransactionEnd = true;
return connection;
}
/* /*
* StartNodeUserDatabaseConnection() initiates a connection to a remote node. * StartNodeUserDatabaseConnection() initiates a connection to a remote node.
* *

View File

@ -670,10 +670,10 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType,
nodeGroupId, nodeGroupId,
policy); policy);
SendCommandListToWorkerOutsideTransaction(LocalHostName, MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(
PostPortNumber, CitusExtensionOwnerName());
CitusExtensionOwnerName(), SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(command->data)); list_make1(command->data));
} }
@ -691,10 +691,10 @@ DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId)
PG_DIST_CLEANUP, PG_DIST_CLEANUP,
recordId); recordId);
SendCommandListToWorkerOutsideTransaction(LocalHostName, MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(
PostPortNumber, CitusExtensionOwnerName());
CitusExtensionOwnerName(), SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(command->data)); list_make1(command->data));
} }
@ -756,10 +756,16 @@ TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName
dropQuery->data); dropQuery->data);
/* remove the shard from the node */ /* remove the shard from the node */
bool success = SendOptionalCommandListToWorkerOutsideTransaction(nodeName, int connectionFlags = OUTSIDE_TRANSACTION;
nodePort, MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
NULL, nodeName, nodePort,
dropCommandList); CurrentUserName(),
NULL);
workerConnection->forceCloseAtTransactionEnd = true;
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
workerConnection,
dropCommandList);
return success; return success;
} }

View File

@ -2214,14 +2214,8 @@ GetNextShardIdForSplitChild()
appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr( appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr(
"pg_catalog.pg_dist_shardid_seq")); "pg_catalog.pg_dist_shardid_seq"));
int connectionFlag = FORCE_NEW_CONNECTION; MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, CitusExtensionOwnerName());
LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
PGresult *result = NULL; PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data, int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data,
&result); &result);
@ -2238,7 +2232,8 @@ GetNextShardIdForSplitChild()
} }
shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nodeId column*/)); shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nodeId column*/));
CloseConnection(connection); PQclear(result);
ForgetResults(connection);
return shardId; return shardId;
} }

View File

@ -1203,10 +1203,8 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList)
list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"), list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"),
commandList); commandList);
SendCommandListToWorkerOutsideTransaction( SendCommandListToWorkerOutsideTransactionWithConnection(
target->superuserConnection->hostname, target->superuserConnection,
target->superuserConnection->port,
target->superuserConnection->user,
commandList); commandList);
MemoryContextReset(localContext); MemoryContextReset(localContext);
@ -1585,8 +1583,8 @@ DropUser(MultiConnection *connection, char *username)
* The DROP USER command should not propagate, so we temporarily disable * The DROP USER command should not propagate, so we temporarily disable
* DDL propagation. * DDL propagation.
*/ */
SendCommandListToWorkerOutsideTransaction( SendCommandListToWorkerOutsideTransactionWithConnection(
connection->hostname, connection->port, connection->user, connection,
list_make2( list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;", "SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf("DROP USER IF EXISTS %s", psprintf("DROP USER IF EXISTS %s",
@ -1771,10 +1769,8 @@ CreateSubscriptions(MultiConnection *sourceConnection,
* create a user with SUPERUSER permissions and then alter it to NOSUPERUSER. * create a user with SUPERUSER permissions and then alter it to NOSUPERUSER.
* This prevents permission escalations. * This prevents permission escalations.
*/ */
SendCommandListToWorkerOutsideTransaction( SendCommandListToWorkerOutsideTransactionWithConnection(
target->superuserConnection->hostname, target->superuserConnection,
target->superuserConnection->port,
target->superuserConnection->user,
list_make2( list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;", "SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf( psprintf(
@ -1832,10 +1828,8 @@ CreateSubscriptions(MultiConnection *sourceConnection,
* The ALTER ROLE command should not propagate, so we temporarily * The ALTER ROLE command should not propagate, so we temporarily
* disable DDL propagation. * disable DDL propagation.
*/ */
SendCommandListToWorkerOutsideTransaction( SendCommandListToWorkerOutsideTransactionWithConnection(
target->superuserConnection->hostname, target->superuserConnection,
target->superuserConnection->port,
target->superuserConnection->user,
list_make2( list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;", "SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf( psprintf(

View File

@ -766,6 +766,17 @@ CloseRemoteTransaction(struct MultiConnection *connection)
/* XXX: Should we error out for a critical transaction? */ /* XXX: Should we error out for a critical transaction? */
dlist_delete(&connection->transactionNode); dlist_delete(&connection->transactionNode);
/*
* If the transaction was completed, we have now cleaned it up, so we
* can reset the state to REMOTE_TRANS_NOT_STARTED. This allows us to
* start a new transaction without running into errors.
*/
if (transaction->transactionState == REMOTE_TRANS_ABORTED ||
transaction->transactionState == REMOTE_TRANS_COMMITTED)
{
transaction->transactionState = REMOTE_TRANS_NOT_STARTED;
}
} }
} }

View File

@ -340,6 +340,25 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
nodeName, nodePort, nodeName, nodePort,
nodeUser, NULL); nodeUser, NULL);
SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection,
commandList);
CloseConnection(workerConnection);
}
/*
* SendCommandListToWorkerOutsideTransactionWithConnection sends the command list
* over the specified connection. This opens a new transaction on the
* connection, thus it's important that no transaction is currently open.
* This function is mainly useful to avoid opening an closing
* connections excessively by allowing reusing a single connection to send
* multiple separately committing transactions. The function raises an error if
* any of the queries fail.
*/
void
SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection,
List *commandList)
{
MarkRemoteTransactionCritical(workerConnection); MarkRemoteTransactionCritical(workerConnection);
RemoteTransactionBegin(workerConnection); RemoteTransactionBegin(workerConnection);
@ -351,7 +370,7 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
} }
RemoteTransactionCommit(workerConnection); RemoteTransactionCommit(workerConnection);
CloseConnection(workerConnection); CloseRemoteTransaction(workerConnection);
} }
@ -430,21 +449,18 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList
/* /*
* SendOptionalCommandListToWorkerOutsideTransaction sends the given command * SendOptionalCommandListToWorkerOutsideTransactionWithConnection sends the
* list to the given worker in a single transaction that is outside of the * given command list over a specified connection in a single transaction that
* coordinated tranaction. If any of the commands fail, it rollbacks the * is outside of the coordinated tranaction.
* transaction, and otherwise commits. *
* If any of the commands fail, it rollbacks the transaction, and otherwise commits.
* A successful commit is indicated by returning true, and a failed commit by returning
* false.
*/ */
bool bool
SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
const char *nodeUser, List *commandList) MultiConnection *workerConnection, List *commandList)
{ {
int connectionFlags = FORCE_NEW_CONNECTION;
bool failed = false;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
nodeUser, NULL);
if (PQstatus(workerConnection->pgConn) != CONNECTION_OK) if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
{ {
return false; return false;
@ -452,6 +468,7 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
RemoteTransactionBegin(workerConnection); RemoteTransactionBegin(workerConnection);
/* iterate over the commands and execute them in the same connection */ /* iterate over the commands and execute them in the same connection */
bool failed = false;
const char *commandString = NULL; const char *commandString = NULL;
foreach_ptr(commandString, commandList) foreach_ptr(commandString, commandList)
{ {
@ -471,6 +488,30 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
RemoteTransactionCommit(workerConnection); RemoteTransactionCommit(workerConnection);
} }
CloseRemoteTransaction(workerConnection);
return !failed;
}
/*
* SendOptionalCommandListToWorkerOutsideTransaction sends the given command
* list to the given worker in a single transaction that is outside of the
* coordinated tranaction. If any of the commands fail, it rollbacks the
* transaction, and otherwise commits.
*/
bool
SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
const char *nodeUser, List *commandList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
nodeUser, NULL);
bool failed = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
workerConnection,
commandList);
CloseConnection(workerConnection); CloseConnection(workerConnection);
return !failed; return !failed;

View File

@ -289,6 +289,7 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
int32 port, const char *user, int32 port, const char *user,
const char *database); const char *database);
extern MultiConnection * GetLocalConnectionForSubtransactionAsUser(char *userName);
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
const char *hostname, const char *hostname,
int32 port, int32 port,

View File

@ -12,6 +12,7 @@
#ifndef WORKER_TRANSACTION_H #ifndef WORKER_TRANSACTION_H
#define WORKER_TRANSACTION_H #define WORKER_TRANSACTION_H
#include "distributed/connection_management.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "storage/lockdefs.h" #include "storage/lockdefs.h"
@ -59,6 +60,10 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa
int32 nodePort, int32 nodePort,
const char *nodeUser, const char *nodeUser,
List *commandList); List *commandList);
extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection,
List *
commandList);
extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const
char *nodeName, char *nodeName,
int32 nodePort, int32 nodePort,
@ -74,6 +79,9 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
int32 nodePort, int32 nodePort,
const char *nodeUser, const char *nodeUser,
List *commandList); List *commandList);
extern void SendCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection,
List *commandList);
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction( extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
List *workerNodeList, List *workerNodeList,
const char * const char *

View File

@ -41,8 +41,13 @@ SELECT * FROM shards_in_workers;
103 | worker1 103 | worker1
(4 rows) (4 rows)
-- failure on creating the subscription -- Failure on creating the subscription
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()'); -- Failing exactly on CREATE SUBSCRIPTION is causing flaky test where we fail with either:
-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist
-- another command is already in progress
-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
-- Instead fail on the next step (ALTER SUBSCRIPTION) instead which is also required logically as part of uber CREATE SUBSCRIPTION operation.
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -32,8 +32,14 @@ INSERT INTO t SELECT x, x+1, MD5(random()::text) FROM generate_series(1,100000)
-- Initial shard placements -- Initial shard placements
SELECT * FROM shards_in_workers; SELECT * FROM shards_in_workers;
-- failure on creating the subscription -- Failure on creating the subscription
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()'); -- Failing exactly on CREATE SUBSCRIPTION is causing flaky test where we fail with either:
-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist
-- another command is already in progress
-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
-- Instead fail on the next step (ALTER SUBSCRIPTION) instead which is also required logically as part of uber CREATE SUBSCRIPTION operation.
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- Verify that the shard is not moved and the number of rows are still 100k -- Verify that the shard is not moved and the number of rows are still 100k