Added shard name to replication origin for remote sessions, added

replication origin reset during aborts.
pull/6453/head
Rajesh Kumar Thandapani 2022-12-13 16:22:47 +05:30
parent 8b857d484f
commit f6c9383e53
7 changed files with 121 additions and 53 deletions

View File

@ -403,9 +403,9 @@ UndistributeTable(TableConversionParameters *params)
params->conversionType = UNDISTRIBUTE_TABLE;
params->shardCountIsNull = true;
TableConversionState *con = CreateTableConversion(params);
ReplicationOriginSessionSetup(NULL);
SetupReplicationOriginLocalSession();
TableConversionReturn *conv = ConvertTable(con);
ReplicationOriginSessionReset(NULL);
ResetReplicationOriginLocalSession();
return conv;
}
@ -445,9 +445,9 @@ AlterDistributedTable(TableConversionParameters *params)
ereport(DEBUG1, (errmsg("setting multi shard modify mode to sequential")));
SetLocalMultiShardModifyModeToSequential();
}
ReplicationOriginSessionSetup(NULL);
SetupReplicationOriginLocalSession();
TableConversionReturn *conv = ConvertTable(con);
ReplicationOriginSessionReset(NULL);
ResetReplicationOriginLocalSession();
return conv;
}

View File

@ -36,6 +36,7 @@
#include "distributed/local_multi_copy.h"
#include "distributed/shard_utils.h"
#include "distributed/version_compat.h"
#include "distributed/replication_origin_session_utils.h"
/* managed via GUC, default is 512 kB */
int LocalCopyFlushThresholdByte = 512 * 1024;
@ -206,6 +207,8 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat
*/
LocalCopyBuffer = buffer;
SetupReplicationOriginLocalSession();
Oid shardOid = GetTableLocalShardOid(relationId, shardId);
Relation shard = table_open(shardOid, RowExclusiveLock);
ParseState *pState = make_parsestate(NULL);
@ -219,6 +222,7 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat
EndCopyFrom(cstate);
table_close(shard, NoLock);
ResetReplicationOriginLocalSession();
free_parsestate(pState);
}

View File

@ -26,6 +26,7 @@
#include "distributed/multi_executor.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/tuplestore.h"
#include "distributed/utils/array_type.h"
#include "distributed/utils/function.h"
@ -540,6 +541,12 @@ PartitionedResultDestReceiverShutdown(DestReceiver *dest)
DestReceiver *partitionDest = self->partitionDestReceivers[i];
partitionDest->rShutdown(partitionDest);
}
i = -1;
while ((i = bms_next_member(self->startedDestReceivers, i)) >= 0)
{
DestReceiver *partitionDest = self->partitionDestReceivers[i];
partitionDest->rDestroy(partitionDest);
}
/* empty the set of started receivers which allows them to be restarted again */
bms_free(self->startedDestReceivers);

View File

@ -105,7 +105,12 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);
ReplicationOriginSessionSetup(copyDest->connection);
StringInfo fullShardNameString = makeStringInfo();
appendStringInfo(fullShardNameString, "%s.%s",
(char *) linitial(copyDest->destinationShardFullyQualifiedName),
(char *) lsecond(copyDest->destinationShardFullyQualifiedName));
SetupReplicationOriginRemoteSession(copyDest->connection, fullShardNameString->data);
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
@ -151,7 +156,6 @@ CreateShardCopyDestReceiver(EState *executorState,
copyDest->tuplesSent = 0;
copyDest->connection = NULL;
copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId);
elog(LOG, "using local copy: %d", copyDest->useLocalCopy);
return (DestReceiver *) copyDest;
}
@ -190,7 +194,6 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
if (copyDest->useLocalCopy)
{
/* Setup replication origin session for local copy*/
ReplicationOriginSessionSetup(NULL);
WriteLocalTuple(slot, copyDest);
if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte)
@ -267,6 +270,11 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
copyDest->copyOutState = copyOutState;
if (copyDest->useLocalCopy)
{
/* Set up replication origin session for local copy */
SetupReplicationOriginLocalSession();
}
}
@ -288,7 +296,6 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
/* end the COPY input */
LocalCopyToShard(copyDest, copyDest->copyOutState);
}
ReplicationOriginSessionReset(NULL);
}
else if (copyDest->connection != NULL)
{
@ -326,7 +333,13 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
PQclear(result);
ForgetResults(copyDest->connection);
ReplicationOriginSessionReset(copyDest->connection);
StringInfo fullShardNameString = makeStringInfo();
appendStringInfo(fullShardNameString, "%s.%s",
(char *) linitial(copyDest->destinationShardFullyQualifiedName),
(char *) lsecond(copyDest->destinationShardFullyQualifiedName));
ResetReplicationOriginRemoteSession(copyDest->connection,
fullShardNameString->data);
CloseConnection(copyDest->connection);
}
@ -340,6 +353,10 @@ static void
ShardCopyDestReceiverDestroy(DestReceiver *dest)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest;
if (copyDest->useLocalCopy)
{
ResetReplicationOriginLocalSession();
}
if (copyDest->copyOutState)
{

View File

@ -34,6 +34,7 @@
#include "distributed/multi_logical_replication.h"
#include "distributed/multi_explain.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
@ -381,6 +382,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
ResetGlobalVariables();
ResetRelationAccessHash();
/* Reset any local replication origin session since transaction has been aborted.*/
ResetReplicationOriginLocalSession();
/*
* Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus
* so that any created OIDs from the table are cleared and invalidated. We
@ -684,6 +688,9 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
InvalidateMetadataSystemCache();
SetCreateCitusTransactionLevel(0);
}
/* Reset any local replication origin session since subtransaction has been aborted.*/
ResetReplicationOriginLocalSession();
break;
}

View File

@ -11,63 +11,89 @@
#include "distributed/replication_origin_session_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/metadata_cache.h"
#include "utils/builtins.h"
static bool isReplicationOriginSessionSetup(MultiConnection *connection);
static bool isReplicationOriginCreated(MultiConnection *connection, char *originName,
RepOriginId *originId);
static RepOriginId ReplicationOriginSessionCreate(MultiConnection *connection,
static RepOriginId CreateReplicationOriginSession(MultiConnection *connection,
char *originName);
static void ReplicationOriginSessionSetupHelper(MultiConnection *connection,
RepOriginId originId, char *originName);
char *originName);
static bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command,
char *expected);
/* ReplicationOriginSessionSetup sets up a new replication origin session in a
* local or remote session depending on the useLocalCopy flag. If useLocalCopy
* is set, a local replication origin session is setup, otherwise a remote
* replication origin session is setup to the destination node.
/* SetupReplicationOriginRemoteSession sets up a new replication origin session in a
* remote session. The identifier is used to create a unique replication origin name
* for the session in the remote node.
*/
void
ReplicationOriginSessionSetup(MultiConnection *connection)
SetupReplicationOriginRemoteSession(MultiConnection *connection, char *identifier)
{
if (!isReplicationOriginSessionSetup(connection))
{
int localid = GetLocalNodeId();
RepOriginId originId = InvalidRepOriginId;
StringInfo originNameString = makeStringInfo();
appendStringInfo(originNameString, "citus_internal_%d", localid);
if (!isReplicationOriginCreated(connection, originNameString->data, &originId))
{
originId = ReplicationOriginSessionCreate(connection, originNameString->data);
}
ReplicationOriginSessionSetupHelper(connection, originId, originNameString->data);
appendStringInfo(originNameString, "citus_internal_%d_%s", GetLocalNodeId(),
identifier);
char *originName = quote_literal_cstr(originNameString->data);
ReplicationOriginSessionSetupHelper(connection, originName);
}
}
/* ReplicationOriginSessionReset resets the replication origin session in a
* local or remote session depending on the useLocalCopy flag.
/* ResetReplicationOriginRemoteSession resets the replication origin session in a
* remote node.
*/
void
ReplicationOriginSessionReset(MultiConnection *connection)
ResetReplicationOriginRemoteSession(MultiConnection *connection, char *identifier)
{
if (connection == NULL)
/* Reset the replication origin in remote session*/
StringInfo replicationOriginSessionResetQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionResetQuery,
"select pg_catalog.pg_replication_origin_session_reset();");
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionResetQuery->data);
/* Drop the replication origin entry created in remote session.*/
StringInfo originNameString = makeStringInfo();
appendStringInfo(originNameString, "citus_internal_%d_%s", GetLocalNodeId(),
identifier);
StringInfo replicationOriginSessionDropQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionDropQuery,
"select pg_catalog.pg_replication_origin_drop(%s);",
quote_literal_cstr(originNameString->data));
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionDropQuery->data);
}
/* SetupReplicationOriginLocalSession sets up a new replication origin session in a
* local session.
*/
void
SetupReplicationOriginLocalSession(void)
{
if (!isReplicationOriginSessionSetup(NULL))
{
/*Reset Replication Origin in local session */
if (replorigin_session_origin != InvalidRepOriginId)
{
replorigin_session_reset();
replorigin_session_origin = InvalidRepOriginId;
}
StringInfo originNameString = makeStringInfo();
appendStringInfo(originNameString, "citus_internal_%d", GetLocalNodeId());
ReplicationOriginSessionSetupHelper(NULL, originNameString->data);
}
else
}
/* ResetReplicationOriginLocalSession resets the replication origin session in a
* local node.
*/
void
ResetReplicationOriginLocalSession(void)
{
/*Reset Replication Origin in local session */
if (replorigin_session_origin != InvalidRepOriginId)
{
/*Reset Replication Origin in remote session */
StringInfo replicationOriginSessionResetQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionResetQuery,
"select pg_catalog.pg_replication_origin_session_reset()");
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionResetQuery->data);
replorigin_session_reset();
replorigin_session_origin = InvalidRepOriginId;
}
}
@ -81,7 +107,7 @@ isReplicationOriginSessionSetup(MultiConnection *connection)
bool result = false;
if (connection == NULL)
{
return replorigin_session_origin != InvalidRepOriginId;
result = (replorigin_session_origin != InvalidRepOriginId);
}
else
{
@ -115,7 +141,7 @@ isReplicationOriginCreated(MultiConnection *connection, char *originName,
/*Setup Replication Origin in remote session */
StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo();
appendStringInfo(isReplicationOriginSessionSetupQuery,
"SELECT pg_catalog.pg_replication_origin_oid('%s');",
"SELECT pg_catalog.pg_replication_origin_oid(%s);",
originName);
/* If the replication origin was already created the above command
@ -130,11 +156,11 @@ isReplicationOriginCreated(MultiConnection *connection, char *originName,
}
/* ReplicationOriginSessionCreate creates a new replication origin if it does
/* CreateReplicationOriginSession creates a new replication origin if it does
* not already exist. To make the replication origin name unique
* for different nodes, origin node's id is appended to the prefix citus_internal_.*/
static RepOriginId
ReplicationOriginSessionCreate(MultiConnection *connection, char *originName)
CreateReplicationOriginSession(MultiConnection *connection, char *originName)
{
RepOriginId originId = InvalidRepOriginId;
if (connection == NULL)
@ -145,7 +171,7 @@ ReplicationOriginSessionCreate(MultiConnection *connection, char *originName)
{
StringInfo replicationOriginCreateQuery = makeStringInfo();
appendStringInfo(replicationOriginCreateQuery,
"select pg_catalog.pg_replication_origin_create('%s')",
"select pg_catalog.pg_replication_origin_create(%s);",
originName);
ExecuteCriticalRemoteCommand(connection, replicationOriginCreateQuery->data);
}
@ -154,14 +180,18 @@ ReplicationOriginSessionCreate(MultiConnection *connection, char *originName)
/* ReplicationOriginSessionSetupHelper sets up a new replication origin session in a
* local or remote session depending on the useLocalCopy flag. If useLocalCopy
* is set, a local replication origin session is setup, otherwise a remote
* replication origin session is setup to the destination node.
* local or remote session.
*/
static void
ReplicationOriginSessionSetupHelper(MultiConnection *connection,
RepOriginId originId, char *originName)
char *originName)
{
RepOriginId originId = InvalidRepOriginId;
if (!isReplicationOriginCreated(connection, originName, &originId))
{
originId = CreateReplicationOriginSession(connection, originName);
}
if (connection == NULL)
{
/*Setup Replication Origin in local session */
@ -173,7 +203,7 @@ ReplicationOriginSessionSetupHelper(MultiConnection *connection,
/*Setup Replication Origin in remote session */
StringInfo replicationOriginSessionSetupQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionSetupQuery,
"select pg_catalog.pg_replication_origin_session_setup('%s')",
"select pg_catalog.pg_replication_origin_session_setup(%s);",
originName);
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionSetupQuery->data);

View File

@ -15,8 +15,11 @@
#include "replication/origin.h"
#include "distributed/connection_management.h"
void ReplicationOriginSessionSetup(MultiConnection *connection);
void ReplicationOriginSessionReset(MultiConnection *connection);
void SetupReplicationOriginRemoteSession(MultiConnection *connection, char *identifier);
void ResetReplicationOriginRemoteSession(MultiConnection *connection, char *identifier);
void SetupReplicationOriginLocalSession(void);
void ResetReplicationOriginLocalSession(void);
#endif /* REPLICATION_ORIGIN_SESSION_UTILS_H */