Refactored replication origin functions to utils. Added replication

origin to undistribute table and alter distributed table.
pull/6453/head
Rajesh Kumar Thandapani 2022-12-12 12:30:11 +05:30
parent af68b32ccb
commit 8b857d484f
4 changed files with 260 additions and 123 deletions

View File

@ -57,6 +57,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/shared_library_init.h"
#include "distributed/shard_utils.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "executor/spi.h"
@ -402,7 +403,10 @@ UndistributeTable(TableConversionParameters *params)
params->conversionType = UNDISTRIBUTE_TABLE;
params->shardCountIsNull = true;
TableConversionState *con = CreateTableConversion(params);
return ConvertTable(con);
ReplicationOriginSessionSetup(NULL);
TableConversionReturn *conv = ConvertTable(con);
ReplicationOriginSessionReset(NULL);
return conv;
}
@ -441,7 +445,11 @@ AlterDistributedTable(TableConversionParameters *params)
ereport(DEBUG1, (errmsg("setting multi shard modify mode to sequential")));
SetLocalMultiShardModifyModeToSequential();
}
return ConvertTable(con);
ReplicationOriginSessionSetup(NULL);
TableConversionReturn *conv = ConvertTable(con);
ReplicationOriginSessionReset(NULL);
return conv;
}

View File

@ -24,7 +24,7 @@
#include "distributed/relation_utils.h"
#include "distributed/version_compat.h"
#include "distributed/local_executor.h"
#include "replication/origin.h"
#include "distributed/replication_origin_session_utils.h"
/*
* LocalCopyBuffer is used in copy callback to return the copied rows.
@ -32,17 +32,6 @@
* argument to the copy callback.
*/
static StringInfo LocalCopyBuffer;
#define CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD \
"SELECT pg_catalog.pg_replication_origin_create('citus_internal_%d') \
where (select pg_catalog.pg_replication_origin_oid('citus_internal_%d')) IS NULL;"
#define CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD \
"SELECT pg_catalog.pg_replication_origin_session_setup('citus_internal_%d') \
where pg_catalog.pg_replication_origin_session_is_setup()='f';"
#define CDC_REPLICATION_ORIGIN_SESION_RESET_CMD \
"SELECT pg_catalog.pg_replication_origin_session_reset() \
where pg_catalog.pg_replication_origin_session_is_setup()='t';"
typedef struct ShardCopyDestReceiver
{
@ -68,9 +57,6 @@ typedef struct ShardCopyDestReceiver
/* local copy if destination shard in same node */
bool useLocalCopy;
/* Replication Origin Id for local copy*/
RepOriginId originId;
/* EState for per-tuple memory allocation */
EState *executorState;
@ -95,9 +81,6 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
static void ReplicationOriginSessionCreate(ShardCopyDestReceiver *dest);
static void ReplicationOriginSessionSetup(ShardCopyDestReceiver *dest);
static void ReplicationOriginSessionReset(ShardCopyDestReceiver *dest);
static bool
CanUseLocalCopy(uint32_t destinationNodeId)
@ -107,8 +90,6 @@ CanUseLocalCopy(uint32_t destinationNodeId)
}
#define REPLICATION_ORIGIN_CMD_BUFFER_SIZE 1024
/* Connect to node with source shard and trigger copy start. */
static void
ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
@ -124,7 +105,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);
ReplicationOriginSessionSetup(copyDest);
ReplicationOriginSessionSetup(copyDest->connection);
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
@ -170,7 +151,7 @@ CreateShardCopyDestReceiver(EState *executorState,
copyDest->tuplesSent = 0;
copyDest->connection = NULL;
copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId);
elog(LOG, "using local copy: %d", copyDest->useLocalCopy);
return (DestReceiver *) copyDest;
}
@ -209,7 +190,7 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
if (copyDest->useLocalCopy)
{
/* Setup replication origin session for local copy*/
ReplicationOriginSessionSetup(copyDest);
ReplicationOriginSessionSetup(NULL);
WriteLocalTuple(slot, copyDest);
if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte)
@ -286,102 +267,6 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
copyDest->copyOutState = copyOutState;
ReplicationOriginSessionCreate(copyDest);
}
/*
 * ReplicationOriginSessionCreate makes sure that the replication origin
 * named "citus_internal_<local node id>" exists, creating it when it is
 * missing. Using the local node's id in the name keeps the origin name
 * distinct per originating node.
 *
 * For a local copy (dest->useLocalCopy) the origin is looked up / created
 * in the current backend and its id is stored in dest->originId. Otherwise a
 * separate connection is opened to the destination node, the
 * create-if-not-exists command is executed there, the connection is closed
 * and dest->originId is set to InvalidRepOriginId.
 */
static void
ReplicationOriginSessionCreate(ShardCopyDestReceiver *dest)
{
int localid = GetLocalNodeId();
if (dest->useLocalCopy)
{
char originName[64];
snprintf(originName, sizeof(originName), "citus_internal_%d", localid);
/* second argument "true": a missing origin is not treated as an error here */
RepOriginId originId = replorigin_by_name(originName, true);
if (originId == InvalidRepOriginId)
{
originId = replorigin_create(originName);
}
dest->originId = originId;
}
else
{
/* run the creation on its own connection, opened with OUTSIDE_TRANSACTION */
int connectionFlags = OUTSIDE_TRANSACTION;
char *currentUser = CurrentUserName();
WorkerNode *workerNode = FindNodeWithNodeId(dest->destinationNodeId,
false /* missingOk */);
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName,
workerNode->workerPort,
currentUser,
NULL /* database (current) */);
char replicationOrginCreateCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE];
/* the command template takes the local node id twice: once to create, once to check */
snprintf(replicationOrginCreateCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE,
CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD, localid, localid);
ExecuteCriticalRemoteCommand(connection, replicationOrginCreateCommand);
CloseConnection(connection);
/* the remote origin id is not retrieved, so no local id is recorded */
dest->originId = InvalidRepOriginId;
}
}
/*
 * ReplicationOriginSessionSetup sets up a replication origin session, either
 * in the current backend or on the destination node, depending on
 * dest->useLocalCopy.
 *
 * Local copy: if the backend has no session origin yet, the session is set
 * up with dest->originId. Otherwise: the session-setup command for
 * "citus_internal_<local node id>" is executed over dest->connection.
 */
void
ReplicationOriginSessionSetup(ShardCopyDestReceiver *dest)
{
if (dest->useLocalCopy)
{
/* Set up the replication origin in the local session, only if none is active */
if (replorigin_session_origin == InvalidRepOriginId)
{
replorigin_session_setup(dest->originId);
replorigin_session_origin = dest->originId;
}
}
else
{
/* Set up the replication origin in the remote session */
char replicationOrginSetupCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE];
int localId = GetLocalNodeId();
snprintf(replicationOrginSetupCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE,
CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD, localId);
ExecuteCriticalRemoteCommand(dest->connection, replicationOrginSetupCommand);
}
}
/*
 * ReplicationOriginSessionReset resets the replication origin session, either
 * in the current backend or on the destination node, depending on
 * dest->useLocalCopy.
 *
 * Local copy: an active session origin is reset and dest->originId is
 * cleared. Otherwise: the session-reset command is executed over
 * dest->connection.
 */
void
ReplicationOriginSessionReset(ShardCopyDestReceiver *dest)
{
if (dest->useLocalCopy)
{
/* Reset the replication origin in the local session, only if one is active */
if (replorigin_session_origin != InvalidRepOriginId)
{
replorigin_session_reset();
replorigin_session_origin = InvalidRepOriginId;
}
dest->originId = InvalidRepOriginId;
}
else
{
/* Reset the replication origin in the remote session */
ExecuteCriticalRemoteCommand(dest->connection,
CDC_REPLICATION_ORIGIN_SESION_RESET_CMD);
}
}
@ -403,7 +288,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
/* end the COPY input */
LocalCopyToShard(copyDest, copyDest->copyOutState);
}
ReplicationOriginSessionReset(copyDest);
ReplicationOriginSessionReset(NULL);
}
else if (copyDest->connection != NULL)
{
@ -441,7 +326,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
PQclear(result);
ForgetResults(copyDest->connection);
ReplicationOriginSessionReset(copyDest);
ReplicationOriginSessionReset(copyDest->connection);
CloseConnection(copyDest->connection);
}

View File

@ -0,0 +1,222 @@
/*-------------------------------------------------------------------------
*
* replication_origin_session_utils.c
* Functions for managing replication origin session.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "distributed/replication_origin_session_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/metadata_cache.h"
static bool isReplicationOriginSessionSetup(MultiConnection *connection);
static bool isReplicationOriginCreated(MultiConnection *connection, char *originName,
RepOriginId *originId);
static RepOriginId ReplicationOriginSessionCreate(MultiConnection *connection,
char *originName);
static void ReplicationOriginSessionSetupHelper(MultiConnection *connection,
RepOriginId originId, char *originName);
static bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command,
char *expected);
/*
 * ReplicationOriginSessionSetup makes sure a replication origin session is
 * set up for the given connection. A NULL connection denotes the current
 * (local) backend session; any other value denotes the session at the remote
 * end of that connection.
 *
 * If a session is already set up nothing is done. Otherwise the origin named
 * "citus_internal_<local node id>" is looked up, created when it does not
 * exist, and then attached to the session.
 */
void
ReplicationOriginSessionSetup(MultiConnection *connection)
{
	if (isReplicationOriginSessionSetup(connection))
	{
		return;
	}

	int localNodeId = GetLocalNodeId();
	StringInfo originName = makeStringInfo();
	appendStringInfo(originName, "citus_internal_%d", localNodeId);

	RepOriginId originId = InvalidRepOriginId;
	bool originExists = isReplicationOriginCreated(connection, originName->data,
												   &originId);
	if (!originExists)
	{
		originId = ReplicationOriginSessionCreate(connection, originName->data);
	}

	ReplicationOriginSessionSetupHelper(connection, originId, originName->data);
}
/*
 * ReplicationOriginSessionReset resets the replication origin session for the
 * given connection. A NULL connection denotes the current (local) backend
 * session; any other value denotes the session at the remote end of that
 * connection.
 *
 * In both cases the reset is only performed when a session is actually set
 * up, so calling this function without a prior setup is a no-op instead of
 * an error.
 */
void
ReplicationOriginSessionReset(MultiConnection *connection)
{
	if (connection == NULL)
	{
		/* Reset the replication origin in the local session */
		if (replorigin_session_origin != InvalidRepOriginId)
		{
			replorigin_session_reset();
			replorigin_session_origin = InvalidRepOriginId;
		}

		return;
	}

	/*
	 * Reset the replication origin in the remote session. The WHERE clause
	 * skips the reset call when no origin is configured on that session;
	 * pg_replication_origin_session_reset() raises an error in that case,
	 * which ExecuteCriticalRemoteCommand would propagate.
	 */
	ExecuteCriticalRemoteCommand(connection,
								 "select pg_catalog.pg_replication_origin_session_reset() "
								 "where pg_catalog.pg_replication_origin_session_is_setup()");
}
/*
 * isReplicationOriginSessionSetup reports whether a replication origin
 * session is already set up. With a NULL connection the current backend's
 * replorigin_session_origin is inspected; otherwise the remote session is
 * asked via pg_replication_origin_session_is_setup().
 */
static bool
isReplicationOriginSessionSetup(MultiConnection *connection)
{
	if (connection == NULL)
	{
		return replorigin_session_origin != InvalidRepOriginId;
	}

	/* Ask the remote session; it answers "t" when a session is set up */
	StringInfo sessionCheckQuery = makeStringInfo();
	appendStringInfo(sessionCheckQuery,
					 "SELECT pg_catalog.pg_replication_origin_session_is_setup()");

	return ExecuteRemoteCommandAndCheckResult(connection, sessionCheckQuery->data,
											  "t");
}
/*
 * isReplicationOriginCreated checks whether a replication origin with the
 * given name exists, in the local node (connection == NULL) or in the node at
 * the remote end of the connection.
 *
 * originName: name of the replication origin to look up.
 * originId:   out parameter. In the local case it receives the id of the
 *             origin, or InvalidRepOriginId when there is none. In the remote
 *             case the id is not fetched and InvalidRepOriginId is stored.
 *
 * Returns true when the origin exists. For the remote case, a command that
 * could not be sent or evaluated is reported as "not created".
 */
static bool
isReplicationOriginCreated(MultiConnection *connection, char *originName,
						   RepOriginId *originId)
{
	if (connection == NULL)
	{
		/* second argument "true": a missing origin is not an error here */
		*originId = replorigin_by_name(originName, true);
		return *originId != InvalidRepOriginId;
	}

	*originId = InvalidRepOriginId;

	/*
	 * pg_replication_origin_oid() yields NULL for an unknown origin. Let the
	 * remote node turn that into a boolean and compare against "t", so that
	 * only a positive answer counts as "exists". Negating a comparison with
	 * an empty result would also treat a failed command as "exists".
	 */
	StringInfo originExistsQuery = makeStringInfo();
	appendStringInfo(originExistsQuery,
					 "SELECT pg_catalog.pg_replication_origin_oid('%s') IS NOT NULL;",
					 originName);

	return ExecuteRemoteCommandAndCheckResult(connection, originExistsQuery->data,
											  "t");
}
/*
 * ReplicationOriginSessionCreate creates a replication origin with the given
 * name, in the local node (connection == NULL) or in the node at the remote
 * end of the connection. It does not check for existence itself; the caller
 * is expected to do that first.
 *
 * Returns the id of the new origin in the local case. In the remote case the
 * id is not fetched and InvalidRepOriginId is returned.
 */
static RepOriginId
ReplicationOriginSessionCreate(MultiConnection *connection, char *originName)
{
	if (connection == NULL)
	{
		return replorigin_create(originName);
	}

	StringInfo createOriginQuery = makeStringInfo();
	appendStringInfo(createOriginQuery,
					 "select pg_catalog.pg_replication_origin_create('%s')",
					 originName);
	ExecuteCriticalRemoteCommand(connection, createOriginQuery->data);

	return InvalidRepOriginId;
}
/*
 * ReplicationOriginSessionSetupHelper attaches a replication origin to a
 * session. With a NULL connection the current backend session is set up
 * using originId; otherwise the remote session is set up by name using
 * originName.
 */
static void
ReplicationOriginSessionSetupHelper(MultiConnection *connection,
									RepOriginId originId, char *originName)
{
	if (connection == NULL)
	{
		/* Local session: set it up and remember the origin for this backend */
		replorigin_session_setup(originId);
		replorigin_session_origin = originId;
		return;
	}

	/* Remote session: the origin is identified by its name */
	StringInfo sessionSetupQuery = makeStringInfo();
	appendStringInfo(sessionSetupQuery,
					 "select pg_catalog.pg_replication_origin_session_setup('%s')",
					 originName);
	ExecuteCriticalRemoteCommand(connection, sessionSetupQuery->data);
}
/*
 * ExecuteRemoteCommandAndCheckResult runs the given command over the
 * connection and compares its single result value with the expected string.
 *
 * Returns true only when the result could be evaluated and is equal to
 * expected. When the command cannot be sent, a connection error is reported
 * at WARNING level and false is returned. When the response is not OK, the
 * result error is reported at ERROR level.
 */
static bool
ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command,
								   char *expected)
{
	if (!SendRemoteCommand(connection, command))
	{
		/* sending failed: warn and treat it as "does not match" */
		ReportConnectionError(connection, WARNING);
		return false;
	}

	bool raiseInterrupts = true;
	PGresult *remoteResult = GetRemoteCommandResult(connection, raiseInterrupts);

	/* an error response from the remote node is reported at ERROR level */
	if (!IsResponseOK(remoteResult))
	{
		ReportResultError(connection, remoteResult, ERROR);
	}

	/* collect the textual value of the result and compare it */
	StringInfo resultText = makeStringInfo();
	bool evaluated = EvaluateSingleQueryResult(connection, remoteResult, resultText);
	bool matchesExpected = evaluated && strcmp(resultText->data, expected) == 0;

	PQclear(remoteResult);

	bool raiseErrors = false;
	ClearResults(connection, raiseErrors);

	return matchesExpected;
}

View File

@ -0,0 +1,22 @@
/*-------------------------------------------------------------------------
*
* replication_origin_session_utils.h
* Utilities related to replication origin.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef REPLICATION_ORIGIN_SESSION_UTILS_H
#define REPLICATION_ORIGIN_SESSION_UTILS_H
#include "postgres.h"
#include "replication/origin.h"
#include "distributed/connection_management.h"
/*
 * Sets up a replication origin session unless one is already set up.
 * A NULL connection denotes the current (local) session; otherwise the
 * session at the remote end of the connection is used.
 */
void ReplicationOriginSessionSetup(MultiConnection *connection);
/*
 * Resets the replication origin session. A NULL connection denotes the
 * current (local) session; otherwise the session at the remote end of the
 * connection is used.
 */
void ReplicationOriginSessionReset(MultiConnection *connection);
#endif /* REPLICATION_ORIGIN_SESSION_UTILS_H */