mirror of https://github.com/citusdata/citus.git
Fix PR comments
parent
a5aaa08a20
commit
17ae71fa8f
|
@ -16,7 +16,7 @@
|
|||
#include "miscadmin.h"
|
||||
|
||||
#include "safe_lib.h"
|
||||
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "access/hash.h"
|
||||
#include "commands/dbcommands.h"
|
||||
#include "distributed/backend_data.h"
|
||||
|
@ -244,6 +244,27 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetLocalConnectionForSubtransaction establishes a localhost connection.
|
||||
* To avoid creating excessive connections, we try to reuse an existing connection.
|
||||
*/
|
||||
MultiConnection*
|
||||
GetLocalConnectionForSubtransactionAsUser(char *userName)
|
||||
{
|
||||
int connectionFlag = OUTSIDE_TRANSACTION;
|
||||
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag,
|
||||
LocalHostName,
|
||||
PostPortNumber,
|
||||
userName,
|
||||
get_database_name(
|
||||
MyDatabaseId));
|
||||
/* Don't cache connection for the lifetime of the entire session. */
|
||||
connection->forceCloseAtTransactionEnd = true;
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StartNodeUserDatabaseConnection() initiates a connection to a remote node.
|
||||
*
|
||||
|
|
|
@ -670,9 +670,8 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType,
|
|||
nodeGroupId,
|
||||
policy);
|
||||
|
||||
SendCommandListToWorkerOutsideTransaction(LocalHostName,
|
||||
PostPortNumber,
|
||||
CitusExtensionOwnerName(),
|
||||
MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName());
|
||||
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
|
||||
list_make1(command->data));
|
||||
}
|
||||
|
||||
|
@ -691,9 +690,8 @@ DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId)
|
|||
PG_DIST_CLEANUP,
|
||||
recordId);
|
||||
|
||||
SendCommandListToWorkerOutsideTransaction(LocalHostName,
|
||||
PostPortNumber,
|
||||
CitusExtensionOwnerName(),
|
||||
MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName());
|
||||
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
|
||||
list_make1(command->data));
|
||||
}
|
||||
|
||||
|
@ -756,9 +754,8 @@ TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName
|
|||
dropQuery->data);
|
||||
|
||||
/* remove the shard from the node */
|
||||
bool success = SendOptionalCommandListToWorkerOutsideTransaction(nodeName,
|
||||
nodePort,
|
||||
NULL,
|
||||
MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName());
|
||||
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(connection,
|
||||
dropCommandList);
|
||||
|
||||
return success;
|
||||
|
|
|
@ -2245,14 +2245,7 @@ GetNextShardIdForSplitChild()
|
|||
appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr(
|
||||
"pg_catalog.pg_dist_shardid_seq"));
|
||||
|
||||
int connectionFlag = FORCE_NEW_CONNECTION;
|
||||
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag,
|
||||
LocalHostName,
|
||||
PostPortNumber,
|
||||
CitusExtensionOwnerName(),
|
||||
get_database_name(
|
||||
MyDatabaseId));
|
||||
|
||||
MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName());
|
||||
PGresult *result = NULL;
|
||||
int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data,
|
||||
&result);
|
||||
|
@ -2269,7 +2262,8 @@ GetNextShardIdForSplitChild()
|
|||
}
|
||||
|
||||
shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nodeId column*/));
|
||||
CloseConnection(connection);
|
||||
PQclear(result);
|
||||
ForgetResults(connection);
|
||||
|
||||
return shardId;
|
||||
}
|
||||
|
|
|
@ -1162,7 +1162,7 @@ CreateForeignKeyConstraints(List *logicalRepTargetList)
|
|||
list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"),
|
||||
commandList);
|
||||
|
||||
SendCommandListToWorkerInSeparateTransaction(
|
||||
SendCommandListToWorkerOutsideTransactionWithConnection(
|
||||
target->superuserConnection,
|
||||
commandList);
|
||||
|
||||
|
@ -1542,7 +1542,7 @@ DropUser(MultiConnection *connection, char *username)
|
|||
* The DROP USER command should not propagate, so we temporarily disable
|
||||
* DDL propagation.
|
||||
*/
|
||||
SendCommandListToWorkerInSeparateTransaction(
|
||||
SendCommandListToWorkerOutsideTransactionWithConnection(
|
||||
connection,
|
||||
list_make2(
|
||||
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
|
||||
|
@ -1728,7 +1728,7 @@ CreateSubscriptions(MultiConnection *sourceConnection,
|
|||
* create a user with SUPERUSER permissions and then alter it to NOSUPERUSER.
|
||||
* This prevents permission escalations.
|
||||
*/
|
||||
SendCommandListToWorkerInSeparateTransaction(
|
||||
SendCommandListToWorkerOutsideTransactionWithConnection(
|
||||
target->superuserConnection,
|
||||
list_make2(
|
||||
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
|
||||
|
@ -1787,7 +1787,7 @@ CreateSubscriptions(MultiConnection *sourceConnection,
|
|||
* The ALTER ROLE command should not propagate, so we temporarily
|
||||
* disable DDL propagation.
|
||||
*/
|
||||
SendCommandListToWorkerInSeparateTransaction(
|
||||
SendCommandListToWorkerOutsideTransactionWithConnection(
|
||||
target->superuserConnection,
|
||||
list_make2(
|
||||
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
|
||||
|
|
|
@ -340,24 +340,15 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
|
|||
nodeName, nodePort,
|
||||
nodeUser, NULL);
|
||||
|
||||
MarkRemoteTransactionCritical(workerConnection);
|
||||
RemoteTransactionBegin(workerConnection);
|
||||
|
||||
/* iterate over the commands and execute them in the same connection */
|
||||
const char *commandString = NULL;
|
||||
foreach_ptr(commandString, commandList)
|
||||
{
|
||||
ExecuteCriticalRemoteCommand(workerConnection, commandString);
|
||||
}
|
||||
|
||||
RemoteTransactionCommit(workerConnection);
|
||||
SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection,
|
||||
commandList);
|
||||
CloseConnection(workerConnection);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendCommandListToWorkerInSeparateTransaction sends the command list over the
|
||||
* connection in a separate connection. This opens a new transaction on the
|
||||
* SendCommandListToWorkerOutsideTransactionWithConnection sends the command list
|
||||
* over the connection in a separate connection. This opens a new transaction on the
|
||||
* connection, thus it's important that no transaction is currently open on the
|
||||
* given connection. This function is mainly useful to avoid opening an closing
|
||||
* connections excessively by allowing reusing a single connection to send
|
||||
|
@ -365,7 +356,7 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
|
|||
* any of the queries fail.
|
||||
*/
|
||||
void
|
||||
SendCommandListToWorkerInSeparateTransaction(MultiConnection *workerConnection,
|
||||
SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection,
|
||||
List *commandList)
|
||||
{
|
||||
MarkRemoteTransactionCritical(workerConnection);
|
||||
|
@ -458,21 +449,15 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList
|
|||
|
||||
|
||||
/*
|
||||
* SendOptionalCommandListToWorkerOutsideTransaction sends the given command
|
||||
* list to the given worker in a single transaction that is outside of the
|
||||
* coordinated tranaction. If any of the commands fail, it rollbacks the
|
||||
* SendOptionalCommandListToWorkerOutsideTransactionWithConnection sends the
|
||||
* given command list to the given worker in a single transaction that is outside
|
||||
* of the coordinated tranaction. If any of the commands fail, it rollbacks the
|
||||
* transaction, and otherwise commits.
|
||||
*
|
||||
*/
|
||||
bool
|
||||
SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
|
||||
const char *nodeUser, List *commandList)
|
||||
SendOptionalCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection, List *commandList)
|
||||
{
|
||||
int connectionFlags = FORCE_NEW_CONNECTION;
|
||||
bool failed = false;
|
||||
|
||||
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||
nodeName, nodePort,
|
||||
nodeUser, NULL);
|
||||
if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
|
||||
{
|
||||
return false;
|
||||
|
@ -480,6 +465,7 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
|
|||
RemoteTransactionBegin(workerConnection);
|
||||
|
||||
/* iterate over the commands and execute them in the same connection */
|
||||
bool failed = false;
|
||||
const char *commandString = NULL;
|
||||
foreach_ptr(commandString, commandList)
|
||||
{
|
||||
|
@ -499,6 +485,28 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
|
|||
RemoteTransactionCommit(workerConnection);
|
||||
}
|
||||
|
||||
CloseRemoteTransaction(workerConnection);
|
||||
|
||||
return !failed;
|
||||
}
|
||||
|
||||
/*
|
||||
* SendOptionalCommandListToWorkerOutsideTransaction sends the given command
|
||||
* list to the given worker in a single transaction that is outside of the
|
||||
* coordinated tranaction. If any of the commands fail, it rollbacks the
|
||||
* transaction, and otherwise commits.
|
||||
*/
|
||||
bool
|
||||
SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
|
||||
const char *nodeUser, List *commandList)
|
||||
{
|
||||
int connectionFlags = FORCE_NEW_CONNECTION;
|
||||
|
||||
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||
nodeName, nodePort,
|
||||
nodeUser, NULL);
|
||||
bool failed = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(workerConnection,
|
||||
commandList);
|
||||
CloseConnection(workerConnection);
|
||||
|
||||
return !failed;
|
||||
|
|
|
@ -289,6 +289,7 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
|
|||
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
|
||||
int32 port, const char *user,
|
||||
const char *database);
|
||||
extern MultiConnection* GetLocalConnectionForSubtransactionAsUser(char* userName);
|
||||
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
|
||||
const char *hostname,
|
||||
int32 port,
|
||||
|
|
|
@ -60,6 +60,9 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa
|
|||
int32 nodePort,
|
||||
const char *nodeUser,
|
||||
List *commandList);
|
||||
extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
|
||||
MultiConnection *workerConnection,
|
||||
List *commandList);
|
||||
extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const
|
||||
char *nodeName,
|
||||
int32 nodePort,
|
||||
|
@ -75,7 +78,7 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
|
|||
int32 nodePort,
|
||||
const char *nodeUser,
|
||||
List *commandList);
|
||||
extern void SendCommandListToWorkerInSeparateTransaction(
|
||||
extern void SendCommandListToWorkerOutsideTransactionWithConnection(
|
||||
MultiConnection *workerConnection,
|
||||
List *commandList);
|
||||
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
|
||||
|
|
Loading…
Reference in New Issue