diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 301c28435..ec7c71134 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -403,9 +403,9 @@ UndistributeTable(TableConversionParameters *params) params->conversionType = UNDISTRIBUTE_TABLE; params->shardCountIsNull = true; TableConversionState *con = CreateTableConversion(params); - ReplicationOriginSessionSetup(NULL); + SetupReplicationOriginLocalSession(); TableConversionReturn *conv = ConvertTable(con); - ReplicationOriginSessionReset(NULL); + ResetReplicationOriginLocalSession(); return conv; } @@ -445,9 +445,9 @@ AlterDistributedTable(TableConversionParameters *params) ereport(DEBUG1, (errmsg("setting multi shard modify mode to sequential"))); SetLocalMultiShardModifyModeToSequential(); } - ReplicationOriginSessionSetup(NULL); + SetupReplicationOriginLocalSession(); TableConversionReturn *conv = ConvertTable(con); - ReplicationOriginSessionReset(NULL); + ResetReplicationOriginLocalSession(); return conv; } diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index fbfce7119..612b9bc72 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -36,6 +36,7 @@ #include "distributed/local_multi_copy.h" #include "distributed/shard_utils.h" #include "distributed/version_compat.h" +#include "distributed/replication_origin_session_utils.h" /* managed via GUC, default is 512 kB */ int LocalCopyFlushThresholdByte = 512 * 1024; @@ -206,6 +207,8 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat */ LocalCopyBuffer = buffer; + SetupReplicationOriginLocalSession(); + Oid shardOid = GetTableLocalShardOid(relationId, shardId); Relation shard = table_open(shardOid, RowExclusiveLock); ParseState *pState = make_parsestate(NULL); @@ -219,6 +222,7 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat EndCopyFrom(cstate); table_close(shard, NoLock); + ResetReplicationOriginLocalSession(); free_parsestate(pState); } diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index 752552343..8b2e5c648 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -26,6 +26,7 @@ #include "distributed/multi_executor.h" #include "distributed/pg_dist_shard.h" #include "distributed/remote_commands.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/tuplestore.h" #include "distributed/utils/array_type.h" #include "distributed/utils/function.h" @@ -540,6 +541,12 @@ PartitionedResultDestReceiverShutdown(DestReceiver *dest) DestReceiver *partitionDest = self->partitionDestReceivers[i]; partitionDest->rShutdown(partitionDest); } + i = -1; + while ((i = bms_next_member(self->startedDestReceivers, i)) >= 0) + { + DestReceiver *partitionDest = self->partitionDestReceivers[i]; + partitionDest->rDestroy(partitionDest); + } /* empty the set of started receivers which allows them to be restarted again */ bms_free(self->startedDestReceivers); diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 46e6e0fc9..104beb206 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -105,7 +105,12 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - ReplicationOriginSessionSetup(copyDest->connection); + StringInfo fullShardNameString = makeStringInfo(); + appendStringInfo(fullShardNameString, "%s.%s", + (char *) linitial(copyDest->destinationShardFullyQualifiedName), + (char *) lsecond(copyDest->destinationShardFullyQualifiedName)); + + SetupReplicationOriginRemoteSession(copyDest->connection, fullShardNameString->data); StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, @@ -151,7 +156,6 @@ CreateShardCopyDestReceiver(EState *executorState, copyDest->tuplesSent = 0; copyDest->connection = NULL; copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId); - elog(LOG, "using local copy: %d", copyDest->useLocalCopy); return (DestReceiver *) copyDest; } @@ -190,7 +194,6 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) if (copyDest->useLocalCopy) { /* Setup replication origin session for local copy*/ - ReplicationOriginSessionSetup(NULL); WriteLocalTuple(slot, copyDest); if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte) @@ -267,6 +270,11 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); copyDest->copyOutState = copyOutState; + if (copyDest->useLocalCopy) + { + /* Setup replication origin session for local copy*/ + SetupReplicationOriginLocalSession(); + } } @@ -288,7 +296,6 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) /* end the COPY input */ LocalCopyToShard(copyDest, copyDest->copyOutState); } - ReplicationOriginSessionReset(NULL); } else if (copyDest->connection != NULL) { @@ -326,7 +333,13 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) PQclear(result); ForgetResults(copyDest->connection); - ReplicationOriginSessionReset(copyDest->connection); + StringInfo fullShardNameString = makeStringInfo(); + appendStringInfo(fullShardNameString, "%s.%s", + (char *) linitial(copyDest->destinationShardFullyQualifiedName), + (char *) lsecond(copyDest->destinationShardFullyQualifiedName)); + + ResetReplicationOriginRemoteSession(copyDest->connection, + fullShardNameString->data); CloseConnection(copyDest->connection); } @@ -340,6 +353,10 @@ static void ShardCopyDestReceiverDestroy(DestReceiver *dest) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; + if (copyDest->useLocalCopy) + { + ResetReplicationOriginLocalSession(); + } if (copyDest->copyOutState) { diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 82c3fb5f7..9c8f02d24 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -34,6 +34,7 @@ #include "distributed/multi_logical_replication.h" #include "distributed/multi_explain.h" #include "distributed/repartition_join_execution.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" @@ -381,6 +382,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + /* Reset any local replication origin session since transaction has been aborted.*/ + ResetReplicationOriginLocalSession(); + /* * Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus * so that any created OIDs from the table are cleared and invalidated. We @@ -684,6 +688,9 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, InvalidateMetadataSystemCache(); SetCreateCitusTransactionLevel(0); } + + /* Reset any local replication origin session since subtransaction has been aborted.*/ + ResetReplicationOriginLocalSession(); break; } diff --git a/src/backend/distributed/utils/replication_origin_session_utils.c b/src/backend/distributed/utils/replication_origin_session_utils.c index ad0118cff..fa43cb69d 100644 --- a/src/backend/distributed/utils/replication_origin_session_utils.c +++ b/src/backend/distributed/utils/replication_origin_session_utils.c @@ -11,63 +11,89 @@ #include "distributed/replication_origin_session_utils.h" #include "distributed/remote_commands.h" #include "distributed/metadata_cache.h" +#include "utils/builtins.h" static bool isReplicationOriginSessionSetup(MultiConnection *connection); static bool isReplicationOriginCreated(MultiConnection *connection, char *originName, RepOriginId *originId); -static RepOriginId ReplicationOriginSessionCreate(MultiConnection *connection, +static RepOriginId CreateReplicationOriginSession(MultiConnection *connection, char *originName); static void ReplicationOriginSessionSetupHelper(MultiConnection *connection, - RepOriginId originId, char *originName); + char *originName); static bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command, char *expected); -/* ReplicationOriginSessionSetup sets up a new replication origin session in a - * local or remote session depending on the useLocalCopy flag. If useLocalCopy - * is set, a local replication origin session is setup, otherwise a remote - * replication origin session is setup to the destination node. +/* SetupReplicationOriginRemoteSession sets up a new replication origin session in a + * remote session. The identifier is used to create a unique replication origin name + * for the session in the remote node. */ void -ReplicationOriginSessionSetup(MultiConnection *connection) +SetupReplicationOriginRemoteSession(MultiConnection *connection, char *identifier) { if (!isReplicationOriginSessionSetup(connection)) { - int localid = GetLocalNodeId(); - RepOriginId originId = InvalidRepOriginId; StringInfo originNameString = makeStringInfo(); - appendStringInfo(originNameString, "citus_internal_%d", localid); - if (!isReplicationOriginCreated(connection, originNameString->data, &originId)) - { - originId = ReplicationOriginSessionCreate(connection, originNameString->data); - } - ReplicationOriginSessionSetupHelper(connection, originId, originNameString->data); + appendStringInfo(originNameString, "citus_internal_%d_%s", GetLocalNodeId(), + identifier); + char *originName = quote_literal_cstr(originNameString->data); + ReplicationOriginSessionSetupHelper(connection, originName); } } -/* ReplicationOriginSessionReset resets the replication origin session in a - * local or remote session depending on the useLocalCopy flag. +/* ResetReplicationOriginRemoteSession resets the replication origin session in a + * remote node. */ void -ReplicationOriginSessionReset(MultiConnection *connection) +ResetReplicationOriginRemoteSession(MultiConnection *connection, char *identifier) { - if (connection == NULL) + /* Reset the replication origin in remote session*/ + StringInfo replicationOriginSessionResetQuery = makeStringInfo(); + appendStringInfo(replicationOriginSessionResetQuery, + "select pg_catalog.pg_replication_origin_session_reset();"); + ExecuteCriticalRemoteCommand(connection, + replicationOriginSessionResetQuery->data); + + /* Drop the replication origin entry created in remote session.*/ + StringInfo originNameString = makeStringInfo(); + appendStringInfo(originNameString, "citus_internal_%d_%s", GetLocalNodeId(), + identifier); + StringInfo replicationOriginSessionDropQuery = makeStringInfo(); + appendStringInfo(replicationOriginSessionDropQuery, + "select pg_catalog.pg_replication_origin_drop(%s);", + quote_literal_cstr(originNameString->data)); + + ExecuteCriticalRemoteCommand(connection, + replicationOriginSessionDropQuery->data); +} + + +/* SetupReplicationOriginLocalSession sets up a new replication origin session in a + * local session. + */ +void +SetupReplicationOriginLocalSession(void) +{ + if (!isReplicationOriginSessionSetup(NULL)) { - /*Reset Replication Origin in local session */ - if (replorigin_session_origin != InvalidRepOriginId) - { - replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - } + StringInfo originNameString = makeStringInfo(); + appendStringInfo(originNameString, "citus_internal_%d", GetLocalNodeId()); + ReplicationOriginSessionSetupHelper(NULL, originNameString->data); } - else +} + + +/* ResetReplicationOriginLocalSession resets the replication origin session in a + * local node. + */ +void +ResetReplicationOriginLocalSession(void) +{ + /*Reset Replication Origin in local session */ + if (replorigin_session_origin != InvalidRepOriginId) { - /*Reset Replication Origin in remote session */ - StringInfo replicationOriginSessionResetQuery = makeStringInfo(); - appendStringInfo(replicationOriginSessionResetQuery, - "select pg_catalog.pg_replication_origin_session_reset()"); - ExecuteCriticalRemoteCommand(connection, - replicationOriginSessionResetQuery->data); + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; } } @@ -81,7 +107,7 @@ isReplicationOriginSessionSetup(MultiConnection *connection) bool result = false; if (connection == NULL) { - return replorigin_session_origin != InvalidRepOriginId; + result = (replorigin_session_origin != InvalidRepOriginId); } else { @@ -115,7 +141,7 @@ isReplicationOriginCreated(MultiConnection *connection, char *originName, /*Setup Replication Origin in remote session */ StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo(); appendStringInfo(isReplicationOriginSessionSetupQuery, - "SELECT pg_catalog.pg_replication_origin_oid('%s');", + "SELECT pg_catalog.pg_replication_origin_oid(%s);", originName); /* If the replication origin was already created the above command @@ -130,11 +156,11 @@ isReplicationOriginCreated(MultiConnection *connection, char *originName, } -/* ReplicationOriginSessionCreate creates a new replication origin if it does +/* CreateReplicationOriginSession creates a new replication origin if it does * not already exist already. To make the replication origin name unique * for different nodes, origin node's id is appended to the prefix citus_internal_.*/ static RepOriginId -ReplicationOriginSessionCreate(MultiConnection *connection, char *originName) +CreateReplicationOriginSession(MultiConnection *connection, char *originName) { RepOriginId originId = InvalidRepOriginId; if (connection == NULL) @@ -145,7 +171,7 @@ ReplicationOriginSessionCreate(MultiConnection *connection, char *originName) { StringInfo replicationOriginCreateQuery = makeStringInfo(); appendStringInfo(replicationOriginCreateQuery, - "select pg_catalog.pg_replication_origin_create('%s')", + "select pg_catalog.pg_replication_origin_create(%s);", originName); ExecuteCriticalRemoteCommand(connection, replicationOriginCreateQuery->data); } @@ -154,14 +180,18 @@ ReplicationOriginSessionCreate(MultiConnection *connection, char *originName) /* ReplicationOriginSessionSetupHelper sets up a new replication origin session in a - * local or remote session depending on the useLocalCopy flag. If useLocalCopy - * is set, a local replication origin session is setup, otherwise a remote - * replication origin session is setup to the destination node. + * local or remote session. */ static void ReplicationOriginSessionSetupHelper(MultiConnection *connection, - RepOriginId originId, char *originName) + char *originName) { + RepOriginId originId = InvalidRepOriginId; + if (!isReplicationOriginCreated(connection, originName, &originId)) + { + originId = CreateReplicationOriginSession(connection, originName); + } + if (connection == NULL) { /*Setup Replication Origin in local session */ @@ -173,7 +203,7 @@ ReplicationOriginSessionSetupHelper(MultiConnection *connection, /*Setup Replication Origin in remote session */ StringInfo replicationOriginSessionSetupQuery = makeStringInfo(); appendStringInfo(replicationOriginSessionSetupQuery, - "select pg_catalog.pg_replication_origin_session_setup('%s')", + "select pg_catalog.pg_replication_origin_session_setup(%s);", originName); ExecuteCriticalRemoteCommand(connection, replicationOriginSessionSetupQuery->data); diff --git a/src/include/distributed/replication_origin_session_utils.h b/src/include/distributed/replication_origin_session_utils.h index 61170d169..66cf7917b 100644 --- a/src/include/distributed/replication_origin_session_utils.h +++ b/src/include/distributed/replication_origin_session_utils.h @@ -15,8 +15,11 @@ #include "replication/origin.h" #include "distributed/connection_management.h" -void ReplicationOriginSessionSetup(MultiConnection *connection); -void ReplicationOriginSessionReset(MultiConnection *connection); +void SetupReplicationOriginRemoteSession(MultiConnection *connection, char *identifier); +void ResetReplicationOriginRemoteSession(MultiConnection *connection, char *identifier); + +void SetupReplicationOriginLocalSession(void); +void ResetReplicationOriginLocalSession(void); #endif /* REPLICATION_ORIGIN_SESSION_UTILS_H */