mirror of https://github.com/citusdata/citus.git
Added formatting changes by citus_indent utility.
parent
827393ab41
commit
4635c1533f
|
@ -103,11 +103,14 @@ CanUseLocalCopy(uint32_t destinationNodeId)
|
|||
return GetLocalNodeId() == (int32) destinationNodeId;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetupReplicationOriginSessionIfNotSetupAlready sets up the replication origin session
|
||||
* if it is not already setup.
|
||||
*/
|
||||
static void SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection) {
|
||||
static void
|
||||
SetupReplicationOriginSessionIfNotSetupAlready(MultiConnection *connection)
|
||||
{
|
||||
/* Setup replication Origin if not setup already */
|
||||
if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_SETUP_CMD))
|
||||
{
|
||||
|
@ -116,11 +119,14 @@ CanUseLocalCopy(uint32_t destinationNodeId)
|
|||
ForgetResults(connection);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ResetReplicationOriginSessionIfSetupAlready resets the replication origin session
|
||||
* if it has been setup currently.
|
||||
*/
|
||||
static void ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection) {
|
||||
static void
|
||||
ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connection)
|
||||
{
|
||||
if (!SendRemoteCommand(connection, CDC_REPLICATION_ORIGIN_SESION_RESET_CMD))
|
||||
{
|
||||
ReportConnectionError(connection, ERROR);
|
||||
|
@ -128,6 +134,7 @@ static void ResetReplicationOriginSessionIfSetupAlready(MultiConnection *connect
|
|||
ForgetResults(connection);
|
||||
}
|
||||
|
||||
|
||||
/* Connect to node with source shard and trigger copy start. */
|
||||
static void
|
||||
ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
|
||||
|
@ -143,8 +150,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
|
|||
NULL /* database (current) */);
|
||||
ClaimConnectionExclusively(copyDest->connection);
|
||||
|
||||
/* Setup Replication Origin Session if not setup already for
|
||||
avoiding publication of events more than once. */
|
||||
/* Setup Replication Origin Session if not setup already for
|
||||
* avoiding publication of events more than once. */
|
||||
SetupReplicationOriginSessionIfNotSetupAlready(copyDest->connection);
|
||||
|
||||
StringInfo copyStatement = ConstructShardCopyStatement(
|
||||
|
|
|
@ -40,10 +40,11 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
|
|||
static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
|
||||
TupleDesc sourceTupleDesc,
|
||||
TupleDesc targetTupleDesc);
|
||||
static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id);
|
||||
static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId
|
||||
origin_id);
|
||||
|
||||
static bool PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change);
|
||||
Relation relation, ReorderBufferChange *change);
|
||||
|
||||
/* used in the replication_origin_filter_cb function. */
|
||||
#define InvalidRepOriginId 0
|
||||
|
@ -84,7 +85,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
|
|||
* identified by the "origin_id" of the changes. The origin_id is set to
|
||||
* a non-zero value in the origin node as part of WAL replication.
|
||||
*/
|
||||
static bool
|
||||
static bool
|
||||
replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
||||
{
|
||||
if (origin_id != InvalidRepOriginId)
|
||||
|
@ -94,25 +95,29 @@ replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id)
|
|||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
/*
|
||||
* PublishChangesIfCdcSlot checks if the current slot is a CDC slot. If so, it publishes
|
||||
* the changes as the change for the distributed table instead of shard.
|
||||
* the changes as the change for the distributed table instead of shard.
|
||||
* If not, it returns false. It also skips the Citus metadata tables.
|
||||
*/
|
||||
static bool
|
||||
PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
Relation relation, ReorderBufferChange *change)
|
||||
Relation relation, ReorderBufferChange *change)
|
||||
{
|
||||
char *replicationSlotName = ctx->slot->data.name.data;
|
||||
|
||||
/* Check if the replication slot is CITUS_CDC_SLOT*/
|
||||
if (replicationSlotName != NULL && strcmp(replicationSlotName, CITUS_CDC_SLOT_NAME) == 0)
|
||||
if (replicationSlotName != NULL && strcmp(replicationSlotName, CITUS_CDC_SLOT_NAME) ==
|
||||
0)
|
||||
{
|
||||
/* Skip publishing changes for Citus metadata tables*/
|
||||
ObjectAddress objectAdress = {RelationRelationId, relation->rd_id, 0};
|
||||
if (IsObjectAddressOwnedByCitus(&objectAdress))
|
||||
ObjectAddress objectAdress = { RelationRelationId, relation->rd_id, 0 };
|
||||
if (IsObjectAddressOwnedByCitus(&objectAdress))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/* Check if this change is for a shard in distributed table. */
|
||||
if (RelationIsAKnownShard(relation->rd_id))
|
||||
{
|
||||
|
@ -130,7 +135,7 @@ PublishChangesIfCdcSlot(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|||
}
|
||||
pgoutputChangeCB(ctx, txn, relation, change);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue