Added replication origin session for local shard split and move operations.
pull/6453/head
Rajesh Kumar Thandapani 2022-12-08 11:09:20 +05:30
parent b8539cb67e
commit af68b32ccb
1 changed files with 105 additions and 35 deletions

View File

@ -33,11 +33,11 @@
*/
static StringInfo LocalCopyBuffer;
#define CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD \
"SELECT pg_catalog.pg_replication_origin_create('citus_cdc_%d') \
where (select pg_catalog.pg_replication_origin_oid('citus_cdc_%d')) IS NULL;"
"SELECT pg_catalog.pg_replication_origin_create('citus_internal_%d') \
where (select pg_catalog.pg_replication_origin_oid('citus_internal_%d')) IS NULL;"
#define CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD \
"SELECT pg_catalog.pg_replication_origin_session_setup('citus_cdc_%d') \
"SELECT pg_catalog.pg_replication_origin_session_setup('citus_internal_%d') \
where pg_catalog.pg_replication_origin_session_is_setup()='f';"
#define CDC_REPLICATION_ORIGIN_SESION_RESET_CMD \
@ -68,6 +68,9 @@ typedef struct ShardCopyDestReceiver
/* local copy if destination shard in same node */
bool useLocalCopy;
/* Replication Origin Id for local copy*/
RepOriginId originId;
/* EState for per-tuple memory allocation */
EState *executorState;
@ -92,7 +95,9 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
static void CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest);
static void ReplicationOriginSessionCreate(ShardCopyDestReceiver *dest);
static void ReplicationOriginSessionSetup(ShardCopyDestReceiver *dest);
static void ReplicationOriginSessionReset(ShardCopyDestReceiver *dest);
static bool
CanUseLocalCopy(uint32_t destinationNodeId)
@ -119,13 +124,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);
/* Setup Replication Origin Session if not setup already for
* avoiding publication of events more than once. */
char replicationOrginSetupCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE];
int originId = GetLocalNodeId();
snprintf(replicationOrginSetupCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE,
CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD, originId);
ExecuteCriticalRemoteCommand(copyDest->connection, replicationOrginSetupCommand);
ReplicationOriginSessionSetup(copyDest);
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
@ -209,6 +208,9 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
CopyOutState copyOutState = copyDest->copyOutState;
if (copyDest->useLocalCopy)
{
/* Setup replication origin session for local copy*/
ReplicationOriginSessionSetup(copyDest);
WriteLocalTuple(slot, copyDest);
if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte)
{
@ -284,32 +286,102 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
copyDest->copyOutState = copyOutState;
CreateReplicationOriginIfNotExists(copyDest);
ReplicationOriginSessionCreate(copyDest);
}
/* CreateReplicationOriginIfNotExists creates a replication origin if it does
 * not already exist. To make the origin id unique for different nodes, the
 * origin node's id is appended to the prefix citus_cdc_. */
/* ReplicationOriginSessionCreate creates a new replication origin if it does
 * not already exist. To make the replication origin name unique for different
 * nodes, the origin node's id is appended to the prefix citus_internal_. */
static void
CreateReplicationOriginIfNotExists(ShardCopyDestReceiver *dest)
ReplicationOriginSessionCreate(ShardCopyDestReceiver *dest)
{
int connectionFlags = OUTSIDE_TRANSACTION;
char *currentUser = CurrentUserName();
WorkerNode *workerNode = FindNodeWithNodeId(dest->destinationNodeId,
false /* missingOk */);
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName,
workerNode->workerPort,
currentUser,
NULL /* database (current) */);
char replicationOrginCreateCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE];
int originId = GetLocalNodeId();
snprintf(replicationOrginCreateCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE,
CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD, originId, originId);
int localid = GetLocalNodeId();
if (dest->useLocalCopy)
{
char originName[64];
snprintf(originName, sizeof(originName), "citus_internal_%d", localid);
RepOriginId originId = replorigin_by_name(originName, true);
if (originId == InvalidRepOriginId)
{
originId = replorigin_create(originName);
}
dest->originId = originId;
}
else
{
int connectionFlags = OUTSIDE_TRANSACTION;
char *currentUser = CurrentUserName();
WorkerNode *workerNode = FindNodeWithNodeId(dest->destinationNodeId,
false /* missingOk */);
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName,
workerNode->workerPort,
currentUser,
NULL /* database (current) */);
ExecuteCriticalRemoteCommand(connection, replicationOrginCreateCommand);
CloseConnection(connection);
char replicationOrginCreateCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE];
snprintf(replicationOrginCreateCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE,
CDC_REPLICATION_ORIGIN_CREATE_IF_NOT_EXISTS_CMD, localid, localid);
ExecuteCriticalRemoteCommand(connection, replicationOrginCreateCommand);
CloseConnection(connection);
dest->originId = InvalidRepOriginId;
}
}
/*
 * ReplicationOriginSessionSetup sets up a replication origin session for the
 * shard copy described by dest.
 *
 * When dest->useLocalCopy is set, the session is set up in the current
 * backend using dest->originId, and only if no replication origin is attached
 * to the session yet (replorigin_session_origin is still InvalidRepOriginId).
 *
 * Otherwise the setup command is sent over dest->connection to the destination
 * node. The origin name in that command is built from the local node's id; the
 * SQL command itself is conditioned on the remote session not having an origin
 * set up already.
 *
 * The function is file-local; the definition carries "static" so that it
 * matches its forward declaration.
 */
static void
ReplicationOriginSessionSetup(ShardCopyDestReceiver *dest)
{
	if (dest->useLocalCopy)
	{
		/* set up the replication origin in the local session, at most once */
		if (replorigin_session_origin == InvalidRepOriginId)
		{
			replorigin_session_setup(dest->originId);
			replorigin_session_origin = dest->originId;
		}

		return;
	}

	/* set up the replication origin in the remote session */
	char replicationOriginSetupCommand[REPLICATION_ORIGIN_CMD_BUFFER_SIZE];
	int localNodeId = GetLocalNodeId();

	snprintf(replicationOriginSetupCommand, REPLICATION_ORIGIN_CMD_BUFFER_SIZE,
			 CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD, localNodeId);
	ExecuteCriticalRemoteCommand(dest->connection, replicationOriginSetupCommand);
}
/*
 * ReplicationOriginSessionReset undoes ReplicationOriginSessionSetup for the
 * shard copy described by dest.
 *
 * When dest->useLocalCopy is set, the replication origin of the current
 * backend session is reset (only if one is currently attached) and
 * dest->originId is cleared to InvalidRepOriginId.
 *
 * Otherwise the reset command is sent over dest->connection to the
 * destination node.
 *
 * The function is file-local; the definition carries "static" so that it
 * matches its forward declaration.
 */
static void
ReplicationOriginSessionReset(ShardCopyDestReceiver *dest)
{
	if (!dest->useLocalCopy)
	{
		/* reset the replication origin in the remote session */
		ExecuteCriticalRemoteCommand(dest->connection,
									 CDC_REPLICATION_ORIGIN_SESION_RESET_CMD);
		return;
	}

	/* reset the replication origin in the local session, if one is attached */
	if (replorigin_session_origin != InvalidRepOriginId)
	{
		replorigin_session_reset();
		replorigin_session_origin = InvalidRepOriginId;
	}

	/* the receiver no longer refers to a replication origin */
	dest->originId = InvalidRepOriginId;
}
@ -331,6 +403,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
/* end the COPY input */
LocalCopyToShard(copyDest, copyDest->copyOutState);
}
ReplicationOriginSessionReset(copyDest);
}
else if (copyDest->connection != NULL)
{
@ -368,10 +441,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
PQclear(result);
ForgetResults(copyDest->connection);
/*Reset Replication Origin. */
ExecuteCriticalRemoteCommand(copyDest->connection,
CDC_REPLICATION_ORIGIN_SESION_RESET_CMD);
ReplicationOriginSessionReset(copyDest);
CloseConnection(copyDest->connection);
}