From 8b857d484f5932a362c24f18130f3c85bbcecd1b Mon Sep 17 00:00:00 2001 From: Rajesh Kumar Thandapani Date: Mon, 12 Dec 2022 12:30:11 +0530 Subject: [PATCH] Refactored replication origin functions to utils. added replication orgin to undistribute table and alter distribure table. --- .../distributed/commands/alter_table.c | 12 +- .../operations/worker_shard_copy.c | 127 +--------- .../utils/replication_origin_session_utils.c | 222 ++++++++++++++++++ .../replication_origin_session_utils.h | 22 ++ 4 files changed, 260 insertions(+), 123 deletions(-) create mode 100644 src/backend/distributed/utils/replication_origin_session_utils.c create mode 100644 src/include/distributed/replication_origin_session_utils.h diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 90b847b28..301c28435 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -57,6 +57,7 @@ #include "distributed/relation_access_tracking.h" #include "distributed/shared_library_init.h" #include "distributed/shard_utils.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" #include "executor/spi.h" @@ -402,7 +403,10 @@ UndistributeTable(TableConversionParameters *params) params->conversionType = UNDISTRIBUTE_TABLE; params->shardCountIsNull = true; TableConversionState *con = CreateTableConversion(params); - return ConvertTable(con); + ReplicationOriginSessionSetup(NULL); + TableConversionReturn *conv = ConvertTable(con); + ReplicationOriginSessionReset(NULL); + return conv; } @@ -441,7 +445,11 @@ AlterDistributedTable(TableConversionParameters *params) ereport(DEBUG1, (errmsg("setting multi shard modify mode to sequential"))); SetLocalMultiShardModifyModeToSequential(); } - return ConvertTable(con); + ReplicationOriginSessionSetup(NULL); + TableConversionReturn *conv = ConvertTable(con); + ReplicationOriginSessionReset(NULL); + + return conv; } diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 17272f048..46e6e0fc9 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -24,7 +24,7 @@ #include "distributed/relation_utils.h" #include "distributed/version_compat.h" #include "distributed/local_executor.h" -#include "replication/origin.h" +#include "distributed/replication_origin_session_utils.h" /* * LocalCopyBuffer is used in copy callback to return the copied rows. @@ -32,17 +32,6 @@ * argument to the copy callback. */ static StringInfo LocalCopyBuffer; -#define CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD \ - "SELECT pg_catalog.pg_replication_origin_create('citus_internal_%d') \ - where (select pg_catalog.pg_replication_origin_oid('citus_internal_%d')) IS NULL;" - -#define CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD \ - "SELECT pg_catalog.pg_replication_origin_session_setup('citus_internal_%d') \ - where pg_catalog.pg_replication_origin_session_is_setup()='f';" - -#define CDC_REPLICATION_ORIGIN_SESION_RESET_CMD \ - "SELECT pg_catalog.pg_replication_origin_session_reset() \ - where pg_catalog.pg_replication_origin_session_is_setup()='t';" typedef struct ShardCopyDestReceiver { @@ -68,9 +57,6 @@ typedef struct ShardCopyDestReceiver /* local copy if destination shard in same node */ bool useLocalCopy; - /* Replication Origin Id for local copy*/ - RepOriginId originId; - /* EState for per-tuple memory allocation */ EState *executorState; @@ -95,9 +81,6 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); -static void ReplicationOriginSessionCreate(ShardCopyDestReceiver *dest); -static void ReplicationOriginSessionSetup(ShardCopyDestReceiver *dest); -static void ReplicationOriginSessionReset(ShardCopyDestReceiver *dest); static bool CanUseLocalCopy(uint32_t destinationNodeId) @@ -107,8 +90,6 @@ CanUseLocalCopy(uint32_t destinationNodeId) } -#define REPLICATION_ORIGIN_CMD_BUFFER_SIZE 1024 - /* Connect to node with source shard and trigger copy start. */ static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) @@ -124,7 +105,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - ReplicationOriginSessionSetup(copyDest); + ReplicationOriginSessionSetup(copyDest->connection); StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, @@ -170,7 +151,7 @@ CreateShardCopyDestReceiver(EState *executorState, copyDest->tuplesSent = 0; copyDest->connection = NULL; copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId); - + elog(LOG, "using local copy: %d", copyDest->useLocalCopy); return (DestReceiver *) copyDest; } @@ -209,7 +190,7 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) if (copyDest->useLocalCopy) { /* Setup replication origin session for local copy*/ - ReplicationOriginSessionSetup(copyDest); + ReplicationOriginSessionSetup(NULL); WriteLocalTuple(slot, copyDest); if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte) @@ -286,102 +267,6 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); copyDest->copyOutState = copyOutState; - ReplicationOriginSessionCreate(copyDest); -} - - -/* ReplicationOriginSessionCreate creates a new replication origin if it does - * not already exist already. To make the replication origin name unique - * for different nodes, origin node's id is appended to the prefix citus_internal_.*/ -static void -ReplicationOriginSessionCreate(ShardCopyDestReceiver *dest) -{ - int localid = GetLocalNodeId(); - if (dest->useLocalCopy) - { - char originName[64]; - snprintf(originName, sizeof(originName), "citus_internal_%d", localid); - RepOriginId originId = replorigin_by_name(originName, true); - if (originId == InvalidRepOriginId) - { - originId = replorigin_create(originName); - } - dest->originId = originId; - } - else - { - int connectionFlags = OUTSIDE_TRANSACTION; - char *currentUser = CurrentUserName(); - WorkerNode *workerNode = FindNodeWithNodeId(dest->destinationNodeId, - false /* missingOk */); - MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, - currentUser, - NULL /* database (current) */); - - char replicationOrginCreateCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE]; - snprintf(replicationOrginCreateCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE, - CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD, localid, localid); - - ExecuteCriticalRemoteCommand(connection, replicationOrginCreateCommand); - CloseConnection(connection); - dest->originId = InvalidRepOriginId; - } -} - - -/* ReplicationOriginSessionSetup sets up a new replication origin session in a - * local or remote session depending on the useLocalCopy flag. If useLocalCopy - * is set, a local replication origin session is setup, otherwise a remote - * replication origin session is setup to the destination node. - */ -void -ReplicationOriginSessionSetup(ShardCopyDestReceiver *dest) -{ - if (dest->useLocalCopy) - { - /*Setup Replication Origin in local session */ - if (replorigin_session_origin == InvalidRepOriginId) - { - replorigin_session_setup(dest->originId); - replorigin_session_origin = dest->originId; - } - } - else - { - /*Setup Replication Origin in remote session */ - char replicationOrginSetupCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE]; - int localId = GetLocalNodeId(); - snprintf(replicationOrginSetupCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE, - CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD, localId); - ExecuteCriticalRemoteCommand(dest->connection, replicationOrginSetupCommand); - } -} - - -/* ReplicationOriginSessionReset resets the replication origin session in a - * local or remote session depending on the useLocalCopy flag. - */ -void -ReplicationOriginSessionReset(ShardCopyDestReceiver *dest) -{ - if (dest->useLocalCopy) - { - /*Reset Replication Origin in local session */ - if (replorigin_session_origin != InvalidRepOriginId) - { - replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - } - dest->originId = InvalidRepOriginId; - } - else - { - /*Reset Replication Origin in remote session */ - ExecuteCriticalRemoteCommand(dest->connection, - CDC_REPLICATION_ORIGIN_SESION_RESET_CMD); - } } @@ -403,7 +288,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) /* end the COPY input */ LocalCopyToShard(copyDest, copyDest->copyOutState); } - ReplicationOriginSessionReset(copyDest); + ReplicationOriginSessionReset(NULL); } else if (copyDest->connection != NULL) { @@ -441,7 +326,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) PQclear(result); ForgetResults(copyDest->connection); - ReplicationOriginSessionReset(copyDest); + ReplicationOriginSessionReset(copyDest->connection); CloseConnection(copyDest->connection); } diff --git a/src/backend/distributed/utils/replication_origin_session_utils.c b/src/backend/distributed/utils/replication_origin_session_utils.c new file mode 100644 index 000000000..ad0118cff --- /dev/null +++ b/src/backend/distributed/utils/replication_origin_session_utils.c @@ -0,0 +1,222 @@ +/*------------------------------------------------------------------------- + * + * replication_origin_session_utils.c + * Functions for managing replication origin session. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "distributed/replication_origin_session_utils.h" +#include "distributed/remote_commands.h" +#include "distributed/metadata_cache.h" + +static bool isReplicationOriginSessionSetup(MultiConnection *connection); +static bool isReplicationOriginCreated(MultiConnection *connection, char *originName, + RepOriginId *originId); +static RepOriginId ReplicationOriginSessionCreate(MultiConnection *connection, + char *originName); +static void ReplicationOriginSessionSetupHelper(MultiConnection *connection, + RepOriginId originId, char *originName); +static bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command, + char *expected); + +/* ReplicationOriginSessionSetup sets up a new replication origin session in a + * local or remote session depending on the useLocalCopy flag. If useLocalCopy + * is set, a local replication origin session is setup, otherwise a remote + * replication origin session is setup to the destination node. + */ +void +ReplicationOriginSessionSetup(MultiConnection *connection) +{ + if (!isReplicationOriginSessionSetup(connection)) + { + int localid = GetLocalNodeId(); + RepOriginId originId = InvalidRepOriginId; + StringInfo originNameString = makeStringInfo(); + appendStringInfo(originNameString, "citus_internal_%d", localid); + if (!isReplicationOriginCreated(connection, originNameString->data, &originId)) + { + originId = ReplicationOriginSessionCreate(connection, originNameString->data); + } + ReplicationOriginSessionSetupHelper(connection, originId, originNameString->data); + } +} + + +/* ReplicationOriginSessionReset resets the replication origin session in a + * local or remote session depending on the useLocalCopy flag. + */ +void +ReplicationOriginSessionReset(MultiConnection *connection) +{ + if (connection == NULL) + { + /*Reset Replication Origin in local session */ + if (replorigin_session_origin != InvalidRepOriginId) + { + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + } + } + else + { + /*Reset Replication Origin in remote session */ + StringInfo replicationOriginSessionResetQuery = makeStringInfo(); + appendStringInfo(replicationOriginSessionResetQuery, + "select pg_catalog.pg_replication_origin_session_reset()"); + ExecuteCriticalRemoteCommand(connection, + replicationOriginSessionResetQuery->data); + } +} + + +/* isReplicationOriginSessionSetup checks if the replication origin is setup + * already in the local or remote session. + */ +static bool +isReplicationOriginSessionSetup(MultiConnection *connection) +{ + bool result = false; + if (connection == NULL) + { + return replorigin_session_origin != InvalidRepOriginId; + } + else + { + /*Setup Replication Origin in remote session */ + StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo(); + appendStringInfo(isReplicationOriginSessionSetupQuery, + "SELECT pg_catalog.pg_replication_origin_session_is_setup()"); + result = + ExecuteRemoteCommandAndCheckResult(connection, + isReplicationOriginSessionSetupQuery->data, + "t"); + } + return result; +} + + +/* isReplicationOriginCreated checks if the replication origin is created + * in the local or remote session.*/ +static bool +isReplicationOriginCreated(MultiConnection *connection, char *originName, + RepOriginId *originId) +{ + bool result = false; + if (connection == NULL) + { + *originId = replorigin_by_name(originName, true); + result = (*originId != InvalidRepOriginId); + } + else + { + /*Setup Replication Origin in remote session */ + StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo(); + appendStringInfo(isReplicationOriginSessionSetupQuery, + "SELECT pg_catalog.pg_replication_origin_oid('%s');", + originName); + + /* If the replication origin was already created the above command + * will return the id of the entry in pg_replication_origin table. + * So to check if the entry is there alreay, the retuen value should + * be non-empty and the condition below checks that. */ + result = !ExecuteRemoteCommandAndCheckResult(connection, + isReplicationOriginSessionSetupQuery + ->data, ""); + } + return result; +} + + +/* ReplicationOriginSessionCreate creates a new replication origin if it does + * not already exist already. To make the replication origin name unique + * for different nodes, origin node's id is appended to the prefix citus_internal_.*/ +static RepOriginId +ReplicationOriginSessionCreate(MultiConnection *connection, char *originName) +{ + RepOriginId originId = InvalidRepOriginId; + if (connection == NULL) + { + originId = replorigin_create(originName); + } + else + { + StringInfo replicationOriginCreateQuery = makeStringInfo(); + appendStringInfo(replicationOriginCreateQuery, + "select pg_catalog.pg_replication_origin_create('%s')", + originName); + ExecuteCriticalRemoteCommand(connection, replicationOriginCreateQuery->data); + } + return originId; +} + + +/* ReplicationOriginSessionSetupHelper sets up a new replication origin session in a + * local or remote session depending on the useLocalCopy flag. If useLocalCopy + * is set, a local replication origin session is setup, otherwise a remote + * replication origin session is setup to the destination node. + */ +static void +ReplicationOriginSessionSetupHelper(MultiConnection *connection, + RepOriginId originId, char *originName) +{ + if (connection == NULL) + { + /*Setup Replication Origin in local session */ + replorigin_session_setup(originId); + replorigin_session_origin = originId; + } + else + { + /*Setup Replication Origin in remote session */ + StringInfo replicationOriginSessionSetupQuery = makeStringInfo(); + appendStringInfo(replicationOriginSessionSetupQuery, + "select pg_catalog.pg_replication_origin_session_setup('%s')", + originName); + ExecuteCriticalRemoteCommand(connection, + replicationOriginSessionSetupQuery->data); + } +} + + +/* ExecuteRemoteCommandAndCheckResult executes the given command in the remote node and + * checks if the result is equal to the expected result. If the result is equal to the + * expected result, the function returns true, otherwise it returns false. + */ +static bool +ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command, + char *expected) +{ + if (!SendRemoteCommand(connection, command)) + { + /* if we cannot connect, we warn and report false */ + ReportConnectionError(connection, WARNING); + return false; + } + bool raiseInterrupts = true; + PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts); + + /* if remote node throws an error, we also throw an error */ + if (!IsResponseOK(queryResult)) + { + ReportResultError(connection, queryResult, ERROR); + } + + StringInfo queryResultString = makeStringInfo(); + + /* Evaluate the queryResult and store it into the queryResultString */ + bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString); + bool result = false; + if (success && strcmp(queryResultString->data, expected) == 0) + { + result = true; + } + + PQclear(queryResult); + bool raiseErrors = false; + ClearResults(connection, raiseErrors); + + return result; +} diff --git a/src/include/distributed/replication_origin_session_utils.h b/src/include/distributed/replication_origin_session_utils.h new file mode 100644 index 000000000..61170d169 --- /dev/null +++ b/src/include/distributed/replication_origin_session_utils.h @@ -0,0 +1,22 @@ +/*------------------------------------------------------------------------- + * + * replication_origin_utils.h + * Utilities related to replication origin. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef REPLICATION_ORIGIN_SESSION_UTILS_H +#define REPLICATION_ORIGIN_SESSION_UTILS_H + +#include "postgres.h" +#include "replication/origin.h" +#include "distributed/connection_management.h" + +void ReplicationOriginSessionSetup(MultiConnection *connection); +void ReplicationOriginSessionReset(MultiConnection *connection); + + +#endif /* REPLICATION_ORIGIN_SESSION_UTILS_H */