CDC changes with DonotReplicateId

pull/6635/head
Rajesh Kumar 2023-01-20 16:40:02 +05:30
parent 58368b7783
commit 2aa5877a1d
20 changed files with 570 additions and 24 deletions

View File

@ -55,6 +55,7 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/shared_library_init.h"
#include "distributed/shard_utils.h"
#include "distributed/worker_protocol.h"
@ -402,7 +403,11 @@ UndistributeTable(TableConversionParameters *params)
params->conversionType = UNDISTRIBUTE_TABLE;
params->shardCountIsNull = true;
TableConversionState *con = CreateTableConversion(params);
return ConvertTable(con);
SetupReplicationOriginLocalSession();
TableConversionReturn *conv = ConvertTable(con);
ResetReplicationOriginLocalSession();
return conv;
}

View File

@ -2224,7 +2224,7 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList,
partitionColumnIndex,
estate, NULL);
estate, NULL, false);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor);

View File

@ -36,6 +36,7 @@
#include "distributed/local_multi_copy.h"
#include "distributed/shard_utils.h"
#include "distributed/version_compat.h"
#include "distributed/replication_origin_session_utils.h"
/* managed via GUC, default is 512 kB */
int LocalCopyFlushThresholdByte = 512 * 1024;
@ -46,7 +47,7 @@ static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDes
static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
static bool ShouldSendCopyNow(StringInfo buffer);
static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
CopyStmt *copyStatement, bool isEndOfCopy);
CopyStmt *copyStatement, bool isEndOfCopy, bool isPublishable);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
@ -94,7 +95,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in
bool isEndOfCopy = false;
DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId,
shardId,
copyDest->copyStatement, isEndOfCopy);
copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable);
resetStringInfo(localCopyOutState->fe_msgbuf);
}
}
@ -133,7 +134,7 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
}
bool isEndOfCopy = true;
DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId,
copyDest->copyStatement, isEndOfCopy);
copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable);
}
@ -197,7 +198,7 @@ ShouldSendCopyNow(StringInfo buffer)
*/
static void
DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement,
bool isEndOfCopy)
bool isEndOfCopy, bool isPublishable)
{
/*
* Set the buffer as a global variable to allow ReadFromLocalBufferCallback
@ -205,6 +206,10 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat
* ReadFromLocalBufferCallback.
*/
LocalCopyBuffer = buffer;
if (!isPublishable)
{
SetupReplicationOriginLocalSession();
}
Oid shardOid = GetTableLocalShardOid(relationId, shardId);
Relation shard = table_open(shardOid, RowExclusiveLock);
@ -219,6 +224,10 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat
EndCopyFrom(cstate);
table_close(shard, NoLock);
if (!isPublishable)
{
ResetReplicationOriginLocalSession();
}
free_parsestate(pState);
}

View File

@ -85,6 +85,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_pruning.h"
#include "distributed/shared_connection_stats.h"
@ -270,7 +271,8 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash,
bool *found, bool shouldUseLocalCopy, CopyOutState
copyOutState, bool isColocatedIntermediateResult);
copyOutState, bool isColocatedIntermediateResult,
bool isPublishable);
static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
ShardPlacement *placement,
bool colocatedIntermediateResult);
@ -285,7 +287,8 @@ static void InitializeCopyShardState(CopyShardState *shardState,
uint64 shardId,
bool canUseLocalCopy,
CopyOutState copyOutState,
bool colocatedIntermediateResult);
bool colocatedIntermediateResult, bool
isPublishable);
static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
CopyStmt *copyStatement,
CopyOutState copyOutState);
@ -494,7 +497,8 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletion *completionTag)
/* set up the destination for the COPY */
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList,
partitionColumnIndex,
executorState, NULL);
executorState, NULL,
true);
/* if the user specified an explicit append-to_shard option, write to it */
uint64 appendShardId = ProcessAppendToShardOption(tableId, copyStatement);
@ -1934,7 +1938,7 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
CitusCopyDestReceiver *
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
EState *executorState,
char *intermediateResultIdPrefix)
char *intermediateResultIdPrefix, bool isPublishable)
{
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0(
sizeof(CitusCopyDestReceiver));
@ -1953,6 +1957,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
copyDest->executorState = executorState;
copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
copyDest->memoryContext = CurrentMemoryContext;
copyDest->isPublishable = isPublishable;
return copyDest;
}
@ -2318,7 +2323,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
&cachedShardStateFound,
copyDest->shouldUseLocalCopy,
copyDest->copyOutState,
isColocatedIntermediateResult);
isColocatedIntermediateResult,
copyDest->isPublishable);
if (!cachedShardStateFound)
{
firstTupleInShard = true;
@ -2751,6 +2758,11 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState,
if (activePlacementState != NULL)
{
EndPlacementStateCopyCommand(activePlacementState, copyOutState);
if (!copyDest->isPublishable)
{
ResetReplicationOriginRemoteSession(
activePlacementState->connectionState->connection);
}
}
dlist_foreach(iter, &connectionState->bufferedPlacementList)
@ -2764,6 +2776,10 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState,
SendCopyDataToPlacement(placementState->data, shardId,
connectionState->connection);
EndPlacementStateCopyCommand(placementState, copyOutState);
if (!copyDest->isPublishable)
{
ResetReplicationOriginRemoteSession(connectionState->connection);
}
}
}
@ -3436,7 +3452,7 @@ static CopyShardState *
GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool *found, bool
shouldUseLocalCopy, CopyOutState copyOutState,
bool isColocatedIntermediateResult)
bool isColocatedIntermediateResult, bool isPublishable)
{
CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
HASH_ENTER, found);
@ -3444,7 +3460,8 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
{
InitializeCopyShardState(shardState, connectionStateHash,
shardId, shouldUseLocalCopy,
copyOutState, isColocatedIntermediateResult);
copyOutState, isColocatedIntermediateResult,
isPublishable);
}
return shardState;
@ -3461,7 +3478,8 @@ InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash, uint64 shardId,
bool shouldUseLocalCopy,
CopyOutState copyOutState,
bool colocatedIntermediateResult)
bool colocatedIntermediateResult,
bool isPublishable)
{
ListCell *placementCell = NULL;
int failedPlacementCount = 0;
@ -3532,6 +3550,12 @@ InitializeCopyShardState(CopyShardState *shardState,
RemoteTransactionBeginIfNecessary(connection);
}
if (!isPublishable)
{
/*elog(LOG,"InitializeCopyShardState: calling SetupReplicationOriginRemoteSession conn id: %lu", connection->connectionId); */
SetupReplicationOriginRemoteSession(connection);
}
CopyPlacementState *placementState = palloc0(sizeof(CopyPlacementState));
placementState->shardState = shardState;
placementState->data = makeStringInfo();

View File

@ -1481,6 +1481,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
* - Current cached connections is already at MaxCachedConnectionsPerWorker
* - Connection is forced to close at the end of transaction
* - Connection is not in OK state
* - Connection has a replication origin setup
* - A transaction is still in progress (usually because we are cancelling a distributed transaction)
* - A connection reached its maximum lifetime
*/
@ -1500,6 +1501,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
PQstatus(connection->pgConn) != CONNECTION_OK ||
!RemoteTransactionIdle(connection) ||
connection->requiresReplication ||
connection->isReplicationOriginSessionSetup ||
(MaxCachedConnectionLifetime >= 0 &&
MillisecondsToTimeout(connection->connectionEstablishmentStart,
MaxCachedConnectionLifetime) <= 0);

View File

@ -413,7 +413,8 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
columnNameList,
partitionColumnIndex,
executorState,
intermediateResultIdPrefix);
intermediateResultIdPrefix,
true);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);
@ -446,7 +447,8 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
columnNameList,
partitionColumnIndex,
executorState, NULL);
executorState, NULL,
true);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

View File

@ -24,6 +24,7 @@
#include "distributed/relation_utils.h"
#include "distributed/version_compat.h"
#include "distributed/local_executor.h"
#include "distributed/replication_origin_session_utils.h"
/*
* LocalCopyBuffer is used in copy callback to return the copied rows.
@ -80,6 +81,7 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);
static bool
CanUseLocalCopy(uint32_t destinationNodeId)
{
@ -103,6 +105,12 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);
RemoteTransactionBeginIfNecessary(copyDest->connection);
SetupReplicationOriginRemoteSession(copyDest->connection);
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary);
@ -184,6 +192,8 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
CopyOutState copyOutState = copyDest->copyOutState;
if (copyDest->useLocalCopy)
{
/* Setup replication origin session for local copy*/
WriteLocalTuple(slot, copyDest);
if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte)
{
@ -259,6 +269,11 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
copyDest->copyOutState = copyOutState;
if (copyDest->useLocalCopy)
{
/* Setup replication origin session for local copy*/
SetupReplicationOriginLocalSession();
}
}
@ -317,6 +332,9 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
PQclear(result);
ForgetResults(copyDest->connection);
ResetReplicationOriginRemoteSession(copyDest->connection);
CloseConnection(copyDest->connection);
}
}
@ -329,6 +347,10 @@ static void
ShardCopyDestReceiverDestroy(DestReceiver *dest)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest;
if (copyDest->useLocalCopy)
{
ResetReplicationOriginLocalSession();
}
if (copyDest->copyOutState)
{

View File

@ -10,10 +10,14 @@
#include "postgres.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_protocol.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "replication/logical.h"
#include "utils/typcache.h"
#include "utils/lsyscache.h"
#include "catalog/pg_namespace.h"
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB pgoutputChangeCB;
@ -37,6 +41,16 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
TupleDesc sourceTupleDesc,
TupleDesc targetTupleDesc);
static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId
origin_id);
static bool PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
/* used in the replication_origin_filter_cb function. */
#define InvalidRepOriginId 0
#define CITUS_SHARD_PREFIX_SLOT "citus_shard_"
/*
* Postgres uses 'pgoutput' as default plugin for logical replication.
@ -47,9 +61,10 @@ void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
LogicalOutputPluginInit plugin_init =
(LogicalOutputPluginInit) (void *) load_external_function("pgoutput",
"_PG_output_plugin_init",
false, NULL);
(LogicalOutputPluginInit) (void *)
load_external_function("pgoutput",
"_PG_output_plugin_init",
false, NULL);
if (plugin_init == NULL)
{
@ -62,6 +77,80 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
/* actual pgoutput callback will be called with the appropriate destination shard */
pgoutputChangeCB = cb->change_cb;
cb->change_cb = split_change_cb;
cb->filter_by_origin_cb = replication_origin_filter_cb;
}
/*
* replication_origin_filter_cb call back function filters out publication of changes
* originated from any other node other than the current node. This is
* identified by the "origin_id" of the changes. The origin_id is set to
* a non-zero value in the origin node as part of WAL replication.
*/
static bool
replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
if (origin_id != InvalidRepOriginId)
{
return true;
}
return false;
}
/*
* PublishChangesIfCdcSlot checks if the current slot is a CDC slot. If so, it publishes
* the changes as the change for the distributed table instead of shard.
* If not, it returns false. It also skips the Citus metadata tables.
*/
static bool
PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
char *replicationSlotName = ctx->slot->data.name.data;
/* Check if the replication slot is CITUS_CDC_SLOT*/
if (replicationSlotName != NULL &&
strncmp(replicationSlotName, CITUS_SHARD_PREFIX_SLOT, strlen(
CITUS_SHARD_PREFIX_SLOT)) != 0)
{
/* Skip publishing changes for system relations in pg_catalog*/
if (relation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
{
return true;
}
char *shardRelationName = RelationGetRelationName(relation);
uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true);
if (shardId != INVALID_SHARD_ID && ShardExists(shardId))
{
if (ReferenceTableShardId(shardId))
{
/*For reference tables, publish the changes only from the coordinator node. */
if (!IsCoordinator())
{
int nodeID = GetLocalNodeId();
elog(LOG,
"PublishChangesIfCdcSlot: Skipping changes for reference table %s in node %d",
shardRelationName, nodeID);
return true;
}
}
else
{
/* try to get the distributed relation id for the shard */
Oid distributedRelationId = RelationIdForShard(shardId);
if (OidIsValid(distributedRelationId))
{
relation = RelationIdGetRelation(distributedRelationId);
}
}
}
pgoutputChangeCB(ctx, txn, relation, change);
return true;
}
return false;
}
@ -73,10 +162,20 @@ static void
split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
/*check if Citus extension is loaded. */
if (!CitusHasBeenLoaded())
{
return;
}
if (!is_publishable_relation(relation))
{
return;
}
if (PublishChangesIfCdcSlot(ctx, txn, relation, change))
{
return;
}
char *replicationSlotName = ctx->slot->data.name.data;

View File

@ -74,6 +74,7 @@
#include "distributed/recursive_planning.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/run_from_same_connection.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_transfer.h"
@ -1288,6 +1289,18 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_replication_origin_session",
gettext_noop("Enable replication origin session for avoiding publication of WAL "
"records for shard splits,moves and "
"create_distributed_table/undistribute_table operations."),
NULL,
&isReplicationOriginSessionFeatureEnabled,
false,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_router_execution",
gettext_noop("Enables router execution"),
@ -2406,7 +2419,6 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
/* warn about config items in the citus namespace that are not registered above */
EmitWarningsOnPlaceholders("citus");

View File

@ -28,3 +28,4 @@ INSERT INTO pg_dist_cleanup
WHERE plc.shardstate = 4;
DELETE FROM pg_dist_placement WHERE shardstate = 4;
#include "udfs/repl_origin_helper/11.2-1.sql"

View File

@ -0,0 +1,22 @@
CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_start_no_publish()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$replication_origin_session_start_no_publish$$;
COMMENT ON FUNCTION pg_catalog.replication_origin_session_start_no_publish()
IS 'To start Replication origin session for skipping publishing WAL records';
CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_end_no_publish()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$replication_origin_session_end_no_publish$$;
COMMENT ON FUNCTION pg_catalog.replication_origin_session_end_no_publish()
IS 'To finish Replication origin session for skipping publishing WAL records';
CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_is_no_publish()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$replication_origin_session_is_no_publish$$;
COMMENT ON FUNCTION pg_catalog.replication_origin_session_is_no_publish()
IS 'To check if Replication origin session is currently active for skipping publishing WAL records';

View File

@ -0,0 +1,22 @@
CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_start_no_publish()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$replication_origin_session_start_no_publish$$;
COMMENT ON FUNCTION pg_catalog.replication_origin_session_start_no_publish()
IS 'To start Replication origin session for skipping publishing WAL records';
CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_end_no_publish()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$replication_origin_session_end_no_publish$$;
COMMENT ON FUNCTION pg_catalog.replication_origin_session_end_no_publish()
IS 'To finish Replication origin session for skipping publishing WAL records';
CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_is_no_publish()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$replication_origin_session_is_no_publish$$;
COMMENT ON FUNCTION pg_catalog.replication_origin_session_is_no_publish()
IS 'To check if Replication origin session is currently active for skipping publishing WAL records';

View File

@ -34,6 +34,7 @@
#include "distributed/multi_logical_replication.h"
#include "distributed/multi_explain.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
@ -391,6 +392,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
ResetGlobalVariables();
ResetRelationAccessHash();
/* Reset any local replication origin session since transaction has been aborted.*/
ResetReplicationOriginLocalSession();
/* empty the CitusXactCallbackContext to ensure we're not leaking memory */
MemoryContextReset(CitusXactCallbackContext);
@ -715,6 +719,8 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
SetCreateCitusTransactionLevel(0);
}
/* Reset any local replication origin session since subtransaction has been aborted.*/
ResetReplicationOriginLocalSession();
MemoryContextSwitchTo(previousContext);
break;

View File

@ -0,0 +1,275 @@
/*-------------------------------------------------------------------------
*
* replication_origin_session_utils.c
* Functions for managing replication origin session.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "distributed/replication_origin_session_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/metadata_cache.h"
#include "utils/builtins.h"
#include "miscadmin.h"
static bool IsRemoteReplicationOriginSessionSetup(MultiConnection *connection);
static bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command,
char *expected);
static inline bool IsLocalReplicationOriginSessionActive(void);
PG_FUNCTION_INFO_V1(replication_origin_session_start_no_publish);
PG_FUNCTION_INFO_V1(replication_origin_session_end_no_publish);
PG_FUNCTION_INFO_V1(replication_origin_session_is_no_publish);
/*
* This variable is used to remember the replication origin id of the current session
* before resetting it to DoNotReplicateId in SetupReplicationOriginLocalSession.
*/
static RepOriginId originalOriginId = InvalidRepOriginId;
/*
* Setting that controls whether replication origin sessions are enabled.
*/
bool isReplicationOriginSessionFeatureEnabled = false;
/* replication_origin_session_start_no_publish starts a new replication origin session
* in the local node. This function is used to avoid publishing the WAL records to the
* replication slot by setting replication origin to DoNotReplicateId in WAL records.
* It remembers the previous replication origin for the current session which will be
* used to reset the replication origin to the previous value when the session ends.
*/
Datum
replication_origin_session_start_no_publish(PG_FUNCTION_ARGS)
{
if (!isReplicationOriginSessionFeatureEnabled)
{
PG_RETURN_VOID();
}
SetupReplicationOriginLocalSession();
PG_RETURN_VOID();
}
/* replication_origin_session_end_no_publish ends the current replication origin session
* in the local node. This function is used to reset the replication origin to the
* earlier value of replication origin.
*/
Datum
replication_origin_session_end_no_publish(PG_FUNCTION_ARGS)
{
if (!isReplicationOriginSessionFeatureEnabled)
{
PG_RETURN_VOID();
}
ResetReplicationOriginLocalSession();
PG_RETURN_VOID();
}
/* replication_origin_session_is_no_publish checks if the current replication origin
* session is active in the local node.
*/
Datum
replication_origin_session_is_no_publish(PG_FUNCTION_ARGS)
{
if (!isReplicationOriginSessionFeatureEnabled)
{
PG_RETURN_BOOL(false);
}
bool result = IsLocalReplicationOriginSessionActive();
PG_RETURN_BOOL(result);
}
/* IsLocalReplicationOriginSessionActive checks if the current replication origin
* session is active in the local node.
*/
inline bool
IsLocalReplicationOriginSessionActive(void)
{
return (replorigin_session_origin != InvalidRepOriginId);
}
/*
* SetupReplicationOriginLocalSession sets up a new replication origin session in a
* local session.
*/
void
SetupReplicationOriginLocalSession(void)
{
if (!isReplicationOriginSessionFeatureEnabled)
{
return;
}
/*elog(LOG, "Setting up local replication origin session"); */
if (!IsLocalReplicationOriginSessionActive())
{
originalOriginId = replorigin_session_origin;
replorigin_session_origin = DoNotReplicateId;
/* Register a call back for ResetReplicationOriginLocalSession function for error cases */
MemoryContextCallback *replicationOriginResetCallback = palloc0(
sizeof(MemoryContextCallback));
replicationOriginResetCallback->func =
ResetReplicationOriginLocalSessionCallbackHandler;
replicationOriginResetCallback->arg = NULL;
MemoryContextRegisterResetCallback(CurrentMemoryContext,
replicationOriginResetCallback);
}
}
/*
* ResetReplicationOriginLocalSession resets the replication origin session in a
* local node.
*/
void
ResetReplicationOriginLocalSession(void)
{
/*elog(LOG, "Resetting local replication origin session"); */
if (!isReplicationOriginSessionFeatureEnabled)
{
return;
}
if (IsLocalReplicationOriginSessionActive())
{
replorigin_session_origin = originalOriginId;
}
}
/*
* ResetReplicationOriginLocalSessionCallbackHandler is a callback function that
* resets the replication origin session in a local node. This is used to register
* with MemoryContextRegisterResetCallback to reset the replication origin session
* in case of any error for the given memory context.
*/
void
ResetReplicationOriginLocalSessionCallbackHandler(void *arg)
{
ResetReplicationOriginLocalSession();
}
/*
* SetupReplicationOriginRemoteSession sets up a new replication origin session in a
* remote session. The identifier is used to create a unique replication origin name
* for the session in the remote node.
*/
void
SetupReplicationOriginRemoteSession(MultiConnection *connection)
{
if (!isReplicationOriginSessionFeatureEnabled)
{
return;
}
if (connection != NULL && !IsRemoteReplicationOriginSessionSetup(connection))
{
/*elog(LOG, "After IsReplicationOriginSessionSetup session %s,%d", connection->hostname, connection->port); */
StringInfo replicationOriginSessionSetupQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionSetupQuery,
"select pg_catalog.replication_origin_session_start_no_publish();");
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionSetupQuery->data);
connection->isReplicationOriginSessionSetup = true;
}
}
/*
* ResetReplicationOriginRemoteSession resets the replication origin session in a
* remote node.
*/
void
ResetReplicationOriginRemoteSession(MultiConnection *connection)
{
if (!isReplicationOriginSessionFeatureEnabled)
{
return;
}
if (connection != NULL && connection->isReplicationOriginSessionSetup)
{
/*elog(LOG, "Resetting remote replication origin session %s,%d", connection->hostname, connection->port); */
StringInfo replicationOriginSessionResetQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionResetQuery,
"select pg_catalog.replication_origin_session_end_no_publish();");
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionResetQuery->data);
connection->isReplicationOriginSessionSetup = false;
}
}
/*
* IsRemoteReplicationOriginSessionSetup(MultiConnection *connection) checks if the replication origin is setup
* already in the local or remote session.
*/
static bool
IsRemoteReplicationOriginSessionSetup(MultiConnection *connection)
{
/*elog(LOG, "IsReplicationOriginSessionSetup: %s,%d", connection->hostname, connection->port); */
if (connection->isReplicationOriginSessionSetup)
{
return true;
}
StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo();
appendStringInfo(isReplicationOriginSessionSetupQuery,
"SELECT pg_catalog.replication_origin_session_is_no_publish()");
bool result =
ExecuteRemoteCommandAndCheckResult(connection,
isReplicationOriginSessionSetupQuery->data,
"t");
connection->isReplicationOriginSessionSetup = result;
return result;
}
/*
* ExecuteRemoteCommandAndCheckResult executes the given command in the remote node and
* checks if the result is equal to the expected result. If the result is equal to the
* expected result, the function returns true, otherwise it returns false.
*/
static bool
ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command,
char *expected)
{
if (!SendRemoteCommand(connection, command))
{
/* if we cannot connect, we warn and report false */
ReportConnectionError(connection, WARNING);
return false;
}
bool raiseInterrupts = true;
PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts);
/* if remote node throws an error, we also throw an error */
if (!IsResponseOK(queryResult))
{
ReportResultError(connection, queryResult, ERROR);
}
StringInfo queryResultString = makeStringInfo();
/* Evaluate the queryResult and store it into the queryResultString */
bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString);
bool result = false;
if (success && strcmp(queryResultString->data, expected) == 0)
{
result = true;
}
PQclear(queryResult);
ForgetResults(connection);
return result;
}

View File

@ -133,6 +133,9 @@ typedef struct CitusCopyDestReceiver
/* if true, should copy to local placements in the current session */
bool shouldUseLocalCopy;
/* if true, the operations in the receiver can be published.*/
bool isPublishable;
/*
* Copy into colocated intermediate result. When this is set, the
* COPY assumes there are hypothetical colocated shards to the
@ -161,7 +164,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
List *columnNameList,
int partitionColumnIndex,
EState *executorState,
char *intermediateResultPrefix);
char *intermediateResultPrefix,
bool isPublishable);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList);

View File

@ -173,6 +173,9 @@ typedef struct MultiConnection
/* is the connection currently in use, and shouldn't be used by anything else */
bool claimedExclusively;
/* is the replication origin session has already been setup for this connection. */
bool isReplicationOriginSessionSetup;
/*
* Should be used to access/modify metadata. See REQUIRE_METADATA_CONNECTION for
* the details.

View File

@ -0,0 +1,31 @@
/*-------------------------------------------------------------------------
*
* replication_origin_utils.h
* Utilities related to replication origin.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef REPLICATION_ORIGIN_SESSION_UTILS_H
#define REPLICATION_ORIGIN_SESSION_UTILS_H
#include "postgres.h"
#include "replication/origin.h"
#include "distributed/connection_management.h"
extern void InitializeReplicationOriginSessionUtils(void);
extern void SetupReplicationOriginRemoteSession(MultiConnection *connection);
extern void ResetReplicationOriginRemoteSession(MultiConnection *connection);
extern void SetupReplicationOriginLocalSession(void);
extern void ResetReplicationOriginLocalSession(void);
extern void ResetReplicationOriginLocalSessionCallbackHandler(void *arg);
extern bool isReplicationOriginSessionFeatureEnabled;
#endif /* REPLICATION_ORIGIN_SESSION_UTILS_H */

View File

@ -1231,7 +1231,10 @@ SELECT * FROM pg_dist_cleanup;
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
| function replication_origin_session_end_no_publish() void
| function replication_origin_session_is_no_publish() void
| function replication_origin_session_start_no_publish() void
(3 rows)
-- Snapshot of state at 11.2-1
ALTER EXTENSION citus UPDATE TO '11.2-1';

View File

@ -215,6 +215,9 @@ ORDER BY 1;
function remove_local_tables_from_metadata()
function replicate_reference_tables(citus.shard_transfer_mode)
function replicate_table_shards(regclass,integer,integer,bigint[],citus.shard_transfer_mode)
function replication_origin_session_end_no_publish()
function replication_origin_session_is_no_publish()
function replication_origin_session_start_no_publish()
function role_exists(name)
function run_command_on_all_nodes(text,boolean,boolean)
function run_command_on_colocated_placements(regclass,regclass,text,boolean)
@ -318,5 +321,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(310 rows)
(313 rows)

View File

@ -485,6 +485,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.enable_replication_origin_session=on");
# Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");