diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 7f86509cc..0b39ed50e 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -55,6 +55,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/shared_library_init.h" #include "distributed/shard_utils.h" #include "distributed/worker_protocol.h" @@ -402,7 +403,11 @@ UndistributeTable(TableConversionParameters *params) params->conversionType = UNDISTRIBUTE_TABLE; params->shardCountIsNull = true; TableConversionState *con = CreateTableConversion(params); - return ConvertTable(con); + + SetupReplicationOriginLocalSession(); + TableConversionReturn *conv = ConvertTable(con); + ResetReplicationOriginLocalSession(); + return conv; } diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 2373e49a5..4d04aa4df 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -2224,7 +2224,7 @@ CopyLocalDataIntoShards(Oid distributedRelationId) (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, columnNameList, partitionColumnIndex, - estate, NULL); + estate, NULL, false); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index fbfce7119..5cf01baf4 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -36,6 +36,7 @@ #include "distributed/local_multi_copy.h" #include "distributed/shard_utils.h" #include "distributed/version_compat.h" +#include "distributed/replication_origin_session_utils.h" /* managed via GUC, default is 512 kB */ int LocalCopyFlushThresholdByte = 512 * 1024; @@ -46,7 +47,7 @@ static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDes static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary); static bool ShouldSendCopyNow(StringInfo buffer); static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, - CopyStmt *copyStatement, bool isEndOfCopy); + CopyStmt *copyStatement, bool isEndOfCopy, bool isPublishable); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); @@ -94,7 +95,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in bool isEndOfCopy = false; DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId, - copyDest->copyStatement, isEndOfCopy); + copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable); resetStringInfo(localCopyOutState->fe_msgbuf); } } @@ -133,7 +134,7 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, } bool isEndOfCopy = true; DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId, - copyDest->copyStatement, isEndOfCopy); + copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable); } @@ -197,7 +198,7 @@ ShouldSendCopyNow(StringInfo buffer) */ static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, - bool isEndOfCopy) + bool isEndOfCopy, bool isPublishable) { /* * Set the buffer as a global variable to allow ReadFromLocalBufferCallback @@ -205,6 +206,10 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat * ReadFromLocalBufferCallback. */ LocalCopyBuffer = buffer; + if (!isPublishable) + { + SetupReplicationOriginLocalSession(); + } Oid shardOid = GetTableLocalShardOid(relationId, shardId); Relation shard = table_open(shardOid, RowExclusiveLock); @@ -219,6 +224,10 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat EndCopyFrom(cstate); table_close(shard, NoLock); + if (!isPublishable) + { + ResetReplicationOriginLocalSession(); + } free_parsestate(pState); } diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index b5ac6a519..a267397f2 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -85,6 +85,7 @@ #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/resource_lock.h" #include "distributed/shard_pruning.h" #include "distributed/shared_connection_stats.h" @@ -270,7 +271,8 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool *found, bool shouldUseLocalCopy, CopyOutState - copyOutState, bool isColocatedIntermediateResult); + copyOutState, bool isColocatedIntermediateResult, + bool isPublishable); static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, bool colocatedIntermediateResult); @@ -285,7 +287,8 @@ static void InitializeCopyShardState(CopyShardState *shardState, uint64 shardId, bool canUseLocalCopy, CopyOutState copyOutState, - bool colocatedIntermediateResult); + bool colocatedIntermediateResult, bool + isPublishable); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -494,7 +497,8 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletion *completionTag) /* set up the destination for the COPY */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, - executorState, NULL); + executorState, NULL, + true); /* if the user specified an explicit append-to_shard option, write to it */ uint64 appendShardId = ProcessAppendToShardOption(tableId, copyStatement); @@ -1934,7 +1938,7 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, EState *executorState, - char *intermediateResultIdPrefix) + char *intermediateResultIdPrefix, bool isPublishable) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( sizeof(CitusCopyDestReceiver)); @@ -1953,6 +1957,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu copyDest->executorState = executorState; copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix; copyDest->memoryContext = CurrentMemoryContext; + copyDest->isPublishable = isPublishable; return copyDest; } @@ -2318,7 +2323,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest &cachedShardStateFound, copyDest->shouldUseLocalCopy, copyDest->copyOutState, - isColocatedIntermediateResult); + isColocatedIntermediateResult, + copyDest->isPublishable); + if (!cachedShardStateFound) { firstTupleInShard = true; @@ -2751,6 +2758,11 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState, if (activePlacementState != NULL) { EndPlacementStateCopyCommand(activePlacementState, copyOutState); + if (!copyDest->isPublishable) + { + ResetReplicationOriginRemoteSession( + activePlacementState->connectionState->connection); + } } dlist_foreach(iter, &connectionState->bufferedPlacementList) @@ -2764,6 +2776,10 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState, SendCopyDataToPlacement(placementState->data, shardId, connectionState->connection); EndPlacementStateCopyCommand(placementState, copyOutState); + if (!copyDest->isPublishable) + { + ResetReplicationOriginRemoteSession(connectionState->connection); + } } } @@ -3436,7 +3452,7 @@ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool *found, bool shouldUseLocalCopy, CopyOutState copyOutState, - bool isColocatedIntermediateResult) + bool isColocatedIntermediateResult, bool isPublishable) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); @@ -3444,7 +3460,8 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, { InitializeCopyShardState(shardState, connectionStateHash, shardId, shouldUseLocalCopy, - copyOutState, isColocatedIntermediateResult); + copyOutState, isColocatedIntermediateResult, + isPublishable); } return shardState; @@ -3461,7 +3478,8 @@ InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool shouldUseLocalCopy, CopyOutState copyOutState, - bool colocatedIntermediateResult) + bool colocatedIntermediateResult, + bool isPublishable) { ListCell *placementCell = NULL; int failedPlacementCount = 0; @@ -3532,6 +3550,12 @@ InitializeCopyShardState(CopyShardState *shardState, RemoteTransactionBeginIfNecessary(connection); } + if (!isPublishable) + { + /*elog(LOG,"InitializeCopyShardState: calling SetupReplicationOriginRemoteSession conn id: %lu", connection->connectionId); */ + SetupReplicationOriginRemoteSession(connection); + } + CopyPlacementState *placementState = palloc0(sizeof(CopyPlacementState)); placementState->shardState = shardState; placementState->data = makeStringInfo(); diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index c5b300fd4..8c538f709 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -1481,6 +1481,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) * - Current cached connections is already at MaxCachedConnectionsPerWorker * - Connection is forced to close at the end of transaction * - Connection is not in OK state + * - Connection has a replication origin setup * - A transaction is still in progress (usually because we are cancelling a distributed transaction) * - A connection reached its maximum lifetime */ @@ -1500,6 +1501,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection PQstatus(connection->pgConn) != CONNECTION_OK || !RemoteTransactionIdle(connection) || connection->requiresReplication || + connection->isReplicationOriginSessionSetup || (MaxCachedConnectionLifetime >= 0 && MillisecondsToTimeout(connection->connectionEstablishmentStart, MaxCachedConnectionLifetime) <= 0); diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 9549846d5..a7b8a3912 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -413,7 +413,8 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, columnNameList, partitionColumnIndex, executorState, - intermediateResultIdPrefix); + intermediateResultIdPrefix, + true); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); @@ -446,7 +447,8 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, - executorState, NULL); + executorState, NULL, + true); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 9239caffb..229bf9f3d 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -24,6 +24,7 @@ #include "distributed/relation_utils.h" #include "distributed/version_compat.h" #include "distributed/local_executor.h" +#include "distributed/replication_origin_session_utils.h" /* * LocalCopyBuffer is used in copy callback to return the copied rows. @@ -80,6 +81,7 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); + static bool CanUseLocalCopy(uint32_t destinationNodeId) { @@ -103,6 +105,12 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); + + RemoteTransactionBeginIfNecessary(copyDest->connection); + + SetupReplicationOriginRemoteSession(copyDest->connection); + + StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary); @@ -184,6 +192,8 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) CopyOutState copyOutState = copyDest->copyOutState; if (copyDest->useLocalCopy) { + /* Setup replication origin session for local copy*/ + WriteLocalTuple(slot, copyDest); if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte) { @@ -259,6 +269,11 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); copyDest->copyOutState = copyOutState; + if (copyDest->useLocalCopy) + { + /* Setup replication origin session for local copy*/ + SetupReplicationOriginLocalSession(); + } } @@ -317,6 +332,9 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) PQclear(result); ForgetResults(copyDest->connection); + + ResetReplicationOriginRemoteSession(copyDest->connection); + CloseConnection(copyDest->connection); } } @@ -329,6 +347,10 @@ static void ShardCopyDestReceiverDestroy(DestReceiver *dest) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; + if (copyDest->useLocalCopy) + { + ResetReplicationOriginLocalSession(); + } if (copyDest->copyOutState) { diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 51a56b36e..2a4befab4 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -10,10 +10,14 @@ #include "postgres.h" #include "distributed/shardinterval_utils.h" #include "distributed/shardsplit_shared_memory.h" +#include "distributed/worker_shard_visibility.h" +#include "distributed/worker_protocol.h" #include "distributed/listutils.h" +#include "distributed/metadata/distobject.h" #include "replication/logical.h" #include "utils/typcache.h" - +#include "utils/lsyscache.h" +#include "catalog/pg_namespace.h" extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); static LogicalDecodeChangeCB pgoutputChangeCB; @@ -37,6 +41,16 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation, static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple, TupleDesc sourceTupleDesc, TupleDesc targetTupleDesc); +static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId + origin_id); + +static bool PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); + +/* used in the replication_origin_filter_cb function. */ +#define InvalidRepOriginId 0 + +#define CITUS_SHARD_PREFIX_SLOT "citus_shard_" /* * Postgres uses 'pgoutput' as default plugin for logical replication. @@ -47,9 +61,10 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) { LogicalOutputPluginInit plugin_init = - (LogicalOutputPluginInit) (void *) load_external_function("pgoutput", - "_PG_output_plugin_init", - false, NULL); + (LogicalOutputPluginInit) (void *) + load_external_function("pgoutput", + "_PG_output_plugin_init", + false, NULL); if (plugin_init == NULL) { @@ -62,6 +77,80 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) /* actual pgoutput callback will be called with the appropriate destination shard */ pgoutputChangeCB = cb->change_cb; cb->change_cb = split_change_cb; + cb->filter_by_origin_cb = replication_origin_filter_cb; +} + + +/* + * replication_origin_filter_cb call back function filters out publication of changes + * originated from any other node other than the current node. This is + * identified by the "origin_id" of the changes. The origin_id is set to + * a non-zero value in the origin node as part of WAL replication. + */ +static bool +replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id) +{ + if (origin_id != InvalidRepOriginId) + { + return true; + } + return false; +} + + +/* + * PublishChangesIfCdcSlot checks if the current slot is a CDC slot. If so, it publishes + * the changes as the change for the distributed table instead of shard. + * If not, it returns false. It also skips the Citus metadata tables. + */ +static bool +PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + char *replicationSlotName = ctx->slot->data.name.data; + + /* Check if the replication slot is CITUS_CDC_SLOT*/ + if (replicationSlotName != NULL && + strncmp(replicationSlotName, CITUS_SHARD_PREFIX_SLOT, strlen( + CITUS_SHARD_PREFIX_SLOT)) != 0) + { + /* Skip publishing changes for system relations in pg_catalog*/ + if (relation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE) + { + return true; + } + + char *shardRelationName = RelationGetRelationName(relation); + uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true); + if (shardId != INVALID_SHARD_ID && ShardExists(shardId)) + { + if (ReferenceTableShardId(shardId)) + { + /*For reference tables, publish the changes only from the coordinator node. */ + if (!IsCoordinator()) + { + int nodeID = GetLocalNodeId(); + elog(LOG, + "PublishChangesIfCdcSlot: Skipping changes for reference table %s in node %d", + shardRelationName, nodeID); + return true; + } + } + else + { + /* try to get the distributed relation id for the shard */ + Oid distributedRelationId = RelationIdForShard(shardId); + if (OidIsValid(distributedRelationId)) + { + relation = RelationIdGetRelation(distributedRelationId); + } + } + } + + pgoutputChangeCB(ctx, txn, relation, change); + return true; + } + return false; } @@ -73,10 +162,20 @@ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { + /*check if Citus extension is loaded. */ + if (!CitusHasBeenLoaded()) + { + return; + } + if (!is_publishable_relation(relation)) { return; } + if (PublishChangesIfCdcSlot(ctx, txn, relation, change)) + { + return; + } char *replicationSlotName = ctx->slot->data.name.data; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 3c67d9b78..77d8e359a 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -74,6 +74,7 @@ #include "distributed/recursive_planning.h" #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/run_from_same_connection.h" #include "distributed/shard_cleaner.h" #include "distributed/shard_transfer.h" @@ -1288,6 +1289,18 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_replication_origin_session", + gettext_noop("Enable replication origin session for avoiding publication of WAL " + "records for shard splits,moves and " + "create_distributed_table/undistribute_table operations."), + NULL, + &isReplicationOriginSessionFeatureEnabled, + false, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_router_execution", gettext_noop("Enables router execution"), @@ -2406,7 +2419,6 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); - /* warn about config items in the citus namespace that are not registered above */ EmitWarningsOnPlaceholders("citus"); diff --git a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql index 96609e520..83f332bfb 100644 --- a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql +++ b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql @@ -28,3 +28,4 @@ INSERT INTO pg_dist_cleanup WHERE plc.shardstate = 4; DELETE FROM pg_dist_placement WHERE shardstate = 4; +#include "udfs/repl_origin_helper/11.2-1.sql" diff --git a/src/backend/distributed/sql/udfs/repl_origin_helper/11.2-1.sql b/src/backend/distributed/sql/udfs/repl_origin_helper/11.2-1.sql new file mode 100644 index 000000000..9df4b2830 --- /dev/null +++ b/src/backend/distributed/sql/udfs/repl_origin_helper/11.2-1.sql @@ -0,0 +1,22 @@ + +CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_start_no_publish() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$replication_origin_session_start_no_publish$$; +COMMENT ON FUNCTION pg_catalog.replication_origin_session_start_no_publish() + IS 'To start Replication origin session for skipping publishing WAL records'; + +CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_end_no_publish() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$replication_origin_session_end_no_publish$$; +COMMENT ON FUNCTION pg_catalog.replication_origin_session_end_no_publish() + IS 'To finish Replication origin session for skipping publishing WAL records'; + +CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_is_no_publish() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$replication_origin_session_is_no_publish$$; +COMMENT ON FUNCTION pg_catalog.replication_origin_session_is_no_publish() + IS 'To check if Replication origin session is currently active for skipping publishing WAL records'; + diff --git a/src/backend/distributed/sql/udfs/repl_origin_helper/latest.sql b/src/backend/distributed/sql/udfs/repl_origin_helper/latest.sql new file mode 100644 index 000000000..9df4b2830 --- /dev/null +++ b/src/backend/distributed/sql/udfs/repl_origin_helper/latest.sql @@ -0,0 +1,22 @@ + +CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_start_no_publish() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$replication_origin_session_start_no_publish$$; +COMMENT ON FUNCTION pg_catalog.replication_origin_session_start_no_publish() + IS 'To start Replication origin session for skipping publishing WAL records'; + +CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_end_no_publish() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$replication_origin_session_end_no_publish$$; +COMMENT ON FUNCTION pg_catalog.replication_origin_session_end_no_publish() + IS 'To finish Replication origin session for skipping publishing WAL records'; + +CREATE OR REPLACE FUNCTION pg_catalog.replication_origin_session_is_no_publish() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$replication_origin_session_is_no_publish$$; +COMMENT ON FUNCTION pg_catalog.replication_origin_session_is_no_publish() + IS 'To check if Replication origin session is currently active for skipping publishing WAL records'; + diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 0f4c3c80a..5add48009 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -34,6 +34,7 @@ #include "distributed/multi_logical_replication.h" #include "distributed/multi_explain.h" #include "distributed/repartition_join_execution.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" @@ -391,6 +392,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + /* Reset any local replication origin session since transaction has been aborted.*/ + ResetReplicationOriginLocalSession(); + /* empty the CitusXactCallbackContext to ensure we're not leaking memory */ MemoryContextReset(CitusXactCallbackContext); @@ -715,6 +719,8 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, SetCreateCitusTransactionLevel(0); } + /* Reset any local replication origin session since subtransaction has been aborted.*/ + ResetReplicationOriginLocalSession(); MemoryContextSwitchTo(previousContext); break; diff --git a/src/backend/distributed/utils/replication_origin_session_utils.c b/src/backend/distributed/utils/replication_origin_session_utils.c new file mode 100644 index 000000000..a545974a4 --- /dev/null +++ b/src/backend/distributed/utils/replication_origin_session_utils.c @@ -0,0 +1,275 @@ +/*------------------------------------------------------------------------- + * + * replication_origin_session_utils.c + * Functions for managing replication origin session. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "distributed/replication_origin_session_utils.h" +#include "distributed/remote_commands.h" +#include "distributed/metadata_cache.h" +#include "utils/builtins.h" +#include "miscadmin.h" + +static bool IsRemoteReplicationOriginSessionSetup(MultiConnection *connection); + +static bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command, + char *expected); + +static inline bool IsLocalReplicationOriginSessionActive(void); + +PG_FUNCTION_INFO_V1(replication_origin_session_start_no_publish); +PG_FUNCTION_INFO_V1(replication_origin_session_end_no_publish); +PG_FUNCTION_INFO_V1(replication_origin_session_is_no_publish); + +/* + * This variable is used to remember the replication origin id of the current session + * before resetting it to DoNotReplicateId in SetupReplicationOriginLocalSession. + */ +static RepOriginId originalOriginId = InvalidRepOriginId; + +/* + * Setting that controls whether replication origin sessions are enabled. + */ +bool isReplicationOriginSessionFeatureEnabled = false; + + +/* replication_origin_session_start_no_publish starts a new replication origin session + * in the local node. This function is used to avoid publishing the WAL records to the + * replication slot by setting replication origin to DoNotReplicateId in WAL records. + * It remembers the previous replication origin for the current session which will be + * used to reset the replication origin to the previous value when the session ends. + */ +Datum +replication_origin_session_start_no_publish(PG_FUNCTION_ARGS) +{ + if (!isReplicationOriginSessionFeatureEnabled) + { + PG_RETURN_VOID(); + } + SetupReplicationOriginLocalSession(); + PG_RETURN_VOID(); +} + + +/* replication_origin_session_end_no_publish ends the current replication origin session + * in the local node. This function is used to reset the replication origin to the + * earlier value of replication origin. + */ +Datum +replication_origin_session_end_no_publish(PG_FUNCTION_ARGS) +{ + if (!isReplicationOriginSessionFeatureEnabled) + { + PG_RETURN_VOID(); + } + ResetReplicationOriginLocalSession(); + PG_RETURN_VOID(); +} + + +/* replication_origin_session_is_no_publish checks if the current replication origin + * session is active in the local node. + */ +Datum +replication_origin_session_is_no_publish(PG_FUNCTION_ARGS) +{ + if (!isReplicationOriginSessionFeatureEnabled) + { + PG_RETURN_BOOL(false); + } + bool result = IsLocalReplicationOriginSessionActive(); + PG_RETURN_BOOL(result); +} + + +/* IsLocalReplicationOriginSessionActive checks if the current replication origin + * session is active in the local node. + */ +inline bool +IsLocalReplicationOriginSessionActive(void) +{ + return (replorigin_session_origin != InvalidRepOriginId); +} + + +/* + * SetupReplicationOriginLocalSession sets up a new replication origin session in a + * local session. + */ +void +SetupReplicationOriginLocalSession(void) +{ + if (!isReplicationOriginSessionFeatureEnabled) + { + return; + } + + /*elog(LOG, "Setting up local replication origin session"); */ + if (!IsLocalReplicationOriginSessionActive()) + { + originalOriginId = replorigin_session_origin; + replorigin_session_origin = DoNotReplicateId; + + /* Register a call back for ResetReplicationOriginLocalSession function for error cases */ + MemoryContextCallback *replicationOriginResetCallback = palloc0( + sizeof(MemoryContextCallback)); + replicationOriginResetCallback->func = + ResetReplicationOriginLocalSessionCallbackHandler; + replicationOriginResetCallback->arg = NULL; + MemoryContextRegisterResetCallback(CurrentMemoryContext, + replicationOriginResetCallback); + } +} + + +/* + * ResetReplicationOriginLocalSession resets the replication origin session in a + * local node. + */ +void +ResetReplicationOriginLocalSession(void) +{ + /*elog(LOG, "Resetting local replication origin session"); */ + if (!isReplicationOriginSessionFeatureEnabled) + { + return; + } + + if (IsLocalReplicationOriginSessionActive()) + { + replorigin_session_origin = originalOriginId; + } +} + + +/* + * ResetReplicationOriginLocalSessionCallbackHandler is a callback function that + * resets the replication origin session in a local node. This is used to register + * with MemoryContextRegisterResetCallback to reset the replication origin session + * in case of any error for the given memory context. + */ +void +ResetReplicationOriginLocalSessionCallbackHandler(void *arg) +{ + ResetReplicationOriginLocalSession(); +} + + +/* + * SetupReplicationOriginRemoteSession sets up a new replication origin session in a + * remote session. The identifier is used to create a unique replication origin name + * for the session in the remote node. + */ +void +SetupReplicationOriginRemoteSession(MultiConnection *connection) +{ + if (!isReplicationOriginSessionFeatureEnabled) + { + return; + } + if (connection != NULL && !IsRemoteReplicationOriginSessionSetup(connection)) + { + /*elog(LOG, "After IsReplicationOriginSessionSetup session %s,%d", connection->hostname, connection->port); */ + StringInfo replicationOriginSessionSetupQuery = makeStringInfo(); + appendStringInfo(replicationOriginSessionSetupQuery, + "select pg_catalog.replication_origin_session_start_no_publish();"); + ExecuteCriticalRemoteCommand(connection, + replicationOriginSessionSetupQuery->data); + connection->isReplicationOriginSessionSetup = true; + } +} + + +/* + * ResetReplicationOriginRemoteSession resets the replication origin session in a + * remote node. + */ +void +ResetReplicationOriginRemoteSession(MultiConnection *connection) +{ + if (!isReplicationOriginSessionFeatureEnabled) + { + return; + } + if (connection != NULL && connection->isReplicationOriginSessionSetup) + { + /*elog(LOG, "Resetting remote replication origin session %s,%d", connection->hostname, connection->port); */ + StringInfo replicationOriginSessionResetQuery = makeStringInfo(); + appendStringInfo(replicationOriginSessionResetQuery, + "select pg_catalog.replication_origin_session_end_no_publish();"); + ExecuteCriticalRemoteCommand(connection, + replicationOriginSessionResetQuery->data); + connection->isReplicationOriginSessionSetup = false; + } +} + + +/* + * IsRemoteReplicationOriginSessionSetup(MultiConnection *connection) checks if the replication origin is setup + * already in the local or remote session. + */ +static bool +IsRemoteReplicationOriginSessionSetup(MultiConnection *connection) +{ + /*elog(LOG, "IsReplicationOriginSessionSetup: %s,%d", connection->hostname, connection->port); */ + if (connection->isReplicationOriginSessionSetup) + { + return true; + } + + StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo(); + appendStringInfo(isReplicationOriginSessionSetupQuery, + "SELECT pg_catalog.replication_origin_session_is_no_publish()"); + bool result = + ExecuteRemoteCommandAndCheckResult(connection, + isReplicationOriginSessionSetupQuery->data, + "t"); + + connection->isReplicationOriginSessionSetup = result; + return result; +} + + +/* + * ExecuteRemoteCommandAndCheckResult executes the given command in the remote node and + * checks if the result is equal to the expected result. If the result is equal to the + * expected result, the function returns true, otherwise it returns false. + */ +static bool +ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command, + char *expected) +{ + if (!SendRemoteCommand(connection, command)) + { + /* if we cannot connect, we warn and report false */ + ReportConnectionError(connection, WARNING); + return false; + } + bool raiseInterrupts = true; + PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts); + + /* if remote node throws an error, we also throw an error */ + if (!IsResponseOK(queryResult)) + { + ReportResultError(connection, queryResult, ERROR); + } + + StringInfo queryResultString = makeStringInfo(); + + /* Evaluate the queryResult and store it into the queryResultString */ + bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString); + bool result = false; + if (success && strcmp(queryResultString->data, expected) == 0) + { + result = true; + } + + PQclear(queryResult); + ForgetResults(connection); + + return result; +} diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index 13d589a3a..cecd6329d 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -133,6 +133,9 @@ typedef struct CitusCopyDestReceiver /* if true, should copy to local placements in the current session */ bool shouldUseLocalCopy; + /* if true, the operations in the receiver can be published.*/ + bool isPublishable; + /* * Copy into colocated intermediate result. When this is set, the * COPY assumes there are hypothetical colocated shards to the @@ -161,7 +164,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, List *columnNameList, int partitionColumnIndex, EState *executorState, - char *intermediateResultPrefix); + char *intermediateResultPrefix, + bool isPublishable); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index f95fb612d..f743c4fa2 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -173,6 +173,9 @@ typedef struct MultiConnection /* is the connection currently in use, and shouldn't be used by anything else */ bool claimedExclusively; + /* is the replication origin session has already been setup for this connection. */ + bool isReplicationOriginSessionSetup; + /* * Should be used to access/modify metadata. See REQUIRE_METADATA_CONNECTION for * the details. diff --git a/src/include/distributed/replication_origin_session_utils.h b/src/include/distributed/replication_origin_session_utils.h new file mode 100644 index 000000000..d64d2eef2 --- /dev/null +++ b/src/include/distributed/replication_origin_session_utils.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * + * replication_origin_utils.h + * Utilities related to replication origin. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef REPLICATION_ORIGIN_SESSION_UTILS_H +#define REPLICATION_ORIGIN_SESSION_UTILS_H + +#include "postgres.h" +#include "replication/origin.h" +#include "distributed/connection_management.h" + +extern void InitializeReplicationOriginSessionUtils(void); + +extern void SetupReplicationOriginRemoteSession(MultiConnection *connection); +extern void ResetReplicationOriginRemoteSession(MultiConnection *connection); + +extern void SetupReplicationOriginLocalSession(void); +extern void ResetReplicationOriginLocalSession(void); +extern void ResetReplicationOriginLocalSessionCallbackHandler(void *arg); + + +extern bool isReplicationOriginSessionFeatureEnabled; + + +#endif /* REPLICATION_ORIGIN_SESSION_UTILS_H */ \ No newline at end of file diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 7ba049c6c..89ce50521 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1231,7 +1231,10 @@ SELECT * FROM pg_dist_cleanup; SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- -(0 rows) + | function replication_origin_session_end_no_publish() void + | function replication_origin_session_is_no_publish() void + | function replication_origin_session_start_no_publish() void +(3 rows) -- Snapshot of state at 11.2-1 ALTER EXTENSION citus UPDATE TO '11.2-1'; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 7cd2f63c8..0f458ad99 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -215,6 +215,9 @@ ORDER BY 1; function remove_local_tables_from_metadata() function replicate_reference_tables(citus.shard_transfer_mode) function replicate_table_shards(regclass,integer,integer,bigint[],citus.shard_transfer_mode) + function replication_origin_session_end_no_publish() + function replication_origin_session_is_no_publish() + function replication_origin_session_start_no_publish() function role_exists(name) function run_command_on_all_nodes(text,boolean,boolean) function run_command_on_colocated_placements(regclass,regclass,text,boolean) @@ -318,5 +321,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(310 rows) +(313 rows) diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 25615b87b..34186a398 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -485,6 +485,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'"); push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on"); push(@pgOptions, "citus.stat_statements_track = 'all'"); +push(@pgOptions, "citus.enable_replication_origin_session=on"); # Some tests look at shards in pg_class, make sure we can usually see them: push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");