diff --git a/.circleci/config.yml b/.circleci/config.yml index d0db414ce..995832515 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -455,6 +455,10 @@ jobs: pg_major: << parameters.pg_major >> - configure - enable_core + - run: + name: 'Install DBI.pm' + command: | + apt-get update && apt-get install libdbi-perl && apt-get install libdbd-pg-perl - run: name: 'Run Test' command: | @@ -889,6 +893,10 @@ workflows: <<: *tap-test-citus-15 name: 'test-15_tap-columnar-freezing' suite: columnar_freezing + - tap-test-citus: + <<: *tap-test-citus-15 + name: 'test-15_tap-cdc' + suite: cdc - test-arbitrary-configs: name: 'test-13_check-arbitrary-configs' diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index a51c2a1f5..9c5218b35 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$( DATA_built = $(generated_sql_files) # directories with source files -SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock +SUBDIRS = . commands cdc connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock # enterprise modules SUBDIRS += replication diff --git a/src/backend/distributed/cdc/cdc_decoder.c b/src/backend/distributed/cdc/cdc_decoder.c new file mode 100644 index 000000000..5df4271e6 --- /dev/null +++ b/src/backend/distributed/cdc/cdc_decoder.c @@ -0,0 +1,388 @@ +/*------------------------------------------------------------------------- + * + * cdc_decoder.c + * CDC Decoder plugin for Citus + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "common/hashfn.h" +#include "utils/typcache.h" +#include "utils/lsyscache.h" +#include "catalog/pg_namespace.h" +#include "distributed/cdc_decoder.h" +#include "distributed/relay_utility.h" +#include "distributed/worker_protocol.h" +#include "distributed/metadata_cache.h" + +static LogicalDecodeChangeCB ouputPluginChangeCB; + + +static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId + origin_id); + +static void TranslateChangesIfSchemaChanged(Relation relation, Relation targetRelation, + ReorderBufferChange *change); + +static void TranslateAndPublishRelationForCDC(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change, Oid shardId, + Oid targetRelationid); + +typedef struct +{ + uint64 shardId; + Oid distributedTableId; + bool isReferenceTable; + bool isNull; +} ShardIdHashEntry; + +static HTAB *shardToDistributedTableMap = NULL; + + +/* + * InitShardToDistributedTableMap initializes the hash table that is used to + * translate the changes in the shard table to the changes in the distributed table. + */ +static void +InitShardToDistributedTableMap() +{ + HASHCTL info; + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(uint64); + info.entrysize = sizeof(ShardIdHashEntry); + info.hash = tag_hash; + info.hcxt = CurrentMemoryContext; + + int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION); + shardToDistributedTableMap = hash_create("CDC Decoder translation hash table", 1024, + &info, hashFlags); +} + + +/* + * AddShardIdToHashTable adds the shardId to the hash table. 
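+ * The entry itself is created by the caller (LookupDistributedTableIdForShardId)
+ * via hash_search(HASH_FIND | HASH_ENTER); this function only fills in the
+ * distributed table OID and the reference table flag for that entry and returns
+ * the distributed table OID.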
+ */ +static Oid +AddShardIdToHashTable(Oid shardId, ShardIdHashEntry *entry) +{ + entry->shardId = shardId; + entry->distributedTableId = LookupShardRelationFromCatalog(shardId, true); + entry->isReferenceTable = PartitionMethodViaCatalog(entry->distributedTableId) == 'n'; + return entry->distributedTableId; +} + + +static Oid +LookupDistributedTableIdForShardId(Oid shardId, bool *isReferenceTable) +{ + bool found; + Oid distributedTableId = InvalidOid; + ShardIdHashEntry *entry = (ShardIdHashEntry *) hash_search(shardToDistributedTableMap, + &shardId, + HASH_FIND | HASH_ENTER, + &found); + if (found) + { + distributedTableId = entry->distributedTableId; + } + else + { + distributedTableId = AddShardIdToHashTable(shardId, entry); + } + *isReferenceTable = entry->isReferenceTable; + return distributedTableId; +} + + +/* + * InitCDCDecoder is called by from the shard split decoder plugin's init function. + * It sets the call back function for filtering out changes originated from other nodes. + * It also sets the call back function for processing the changes in ouputPluginChangeCB. + * This function is common for both CDC and shard split decoder plugins. + */ +void +InitCDCDecoder(OutputPluginCallbacks *cb, LogicalDecodeChangeCB changeCB) +{ + elog(LOG, "Initializing CDC decoder"); + cb->filter_by_origin_cb = replication_origin_filter_cb; + ouputPluginChangeCB = changeCB; + + /* Initialize the hash table used for mapping shard to shell tables. */ + InitShardToDistributedTableMap(); +} + + +/* + * replication_origin_filter_cb call back function filters out publication of changes + * originated from any other node other than the current node. This is + * identified by the "origin_id" of the changes. The origin_id is set to + * a non-zero value in the origin node as part of WAL replication for internal + * operations like shard split/moves/create_distributed_table etc. + */ +static bool +replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id) +{ + return (origin_id != InvalidRepOriginId); +} + + +/* + * This function is responsible for translating the changes in the shard table to + * the changes in the shell table and publishing the changes as a change to the + * distributed table so that CDD clients are not aware of the shard tables. It also + * handles schema changes to the distributed table. + */ +static void +TranslateAndPublishRelationForCDC(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change, Oid + shardId, Oid targetRelationid) +{ + /* Get the distributed table's relation for this shard.*/ + Relation targetRelation = RelationIdGetRelation(targetRelationid); + + /* + * Check if there has been a schema change (such as a dropped column), by comparing + * the number of attributes in the shard table and the shell table. + */ + TranslateChangesIfSchemaChanged(relation, targetRelation, change); + + /* + * Publish the change to the shard table as the change in the distributed table, + * so that the CDC client can see the change in the distributed table, + * instead of the shard table, by calling the pgoutput's callback function. + */ + ouputPluginChangeCB(ctx, txn, targetRelation, change); + RelationClose(targetRelation); +} + + +/* + * PublishChangesIfCdcSlot checks if the current slot is a CDC slot. If so, it publishes + * the changes as the change for the distributed table instead of shard. + * If not, it returns false. It also skips the Citus metadata tables. 
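+ * Changes to relations in pg_catalog are skipped, changes to regular (non-shard)
+ * relations are passed through to the base output plugin unchanged, and changes
+ * to reference table shards are published only from the coordinator node.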
+ */ +void +PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + char *shardRelationName = RelationGetRelationName(relation); + + /* Skip publishing CDC changes for any system relations in pg_catalog*/ + if (relation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE) + { + return; + } + + /* Check if the relation is a distributed table by checking for shard name. */ + uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true); + + /* If this relation is not distributed, call the pgoutput's callback and return. */ + if (shardId == INVALID_SHARD_ID) + { + ouputPluginChangeCB(ctx, txn, relation, change); + return; + } + + bool isReferenceTable = false; + Oid distRelationId = LookupDistributedTableIdForShardId(shardId, &isReferenceTable); + if (distRelationId == InvalidOid) + { + ouputPluginChangeCB(ctx, txn, relation, change); + return; + } + + /* Publish changes for reference table only from the coordinator node. */ + if (isReferenceTable && !IsCoordinator()) + { + return; + } + + /* translate and publish from shard relation to distributed table relation for CDC. */ + TranslateAndPublishRelationForCDC(ctx, txn, relation, change, shardId, + distRelationId); +} + + +/* + * GetTupleForTargetSchemaForCdc returns a heap tuple with the data from sourceRelationTuple + * to match the schema in targetRelDesc. Either or both source and target relations may have + * dropped columns. This function handles it by adding NULL values for dropped columns in + * target relation and skipping dropped columns in source relation. It returns a heap tuple + * adjusted to the current schema of the target relation. + */ +static HeapTuple +GetTupleForTargetSchemaForCdc(HeapTuple sourceRelationTuple, + TupleDesc sourceRelDesc, + TupleDesc targetRelDesc) +{ + /* Allocate memory for sourceValues and sourceNulls arrays. */ + Datum *sourceValues = (Datum *) palloc0(sourceRelDesc->natts * sizeof(Datum)); + bool *sourceNulls = (bool *) palloc0(sourceRelDesc->natts * sizeof(bool)); + + /* Deform the source tuple to sourceValues and sourceNulls arrays. */ + heap_deform_tuple(sourceRelationTuple, sourceRelDesc, sourceValues, + sourceNulls); + + /* This is the next field to Read in the source relation */ + uint32 sourceIndex = 0; + uint32 targetIndex = 0; + + /* Allocate memory for sourceValues and sourceNulls arrays. */ + Datum *targetValues = (Datum *) palloc0(targetRelDesc->natts * sizeof(Datum)); + bool *targetNulls = (bool *) palloc0(targetRelDesc->natts * sizeof(bool)); + + /* Loop through all source and target attributes one by one and handle any dropped attributes.*/ + while (targetIndex < targetRelDesc->natts) + { + /* If this target attribute has been dropped, add a NULL attribute in targetValues and continue.*/ + if (TupleDescAttr(targetRelDesc, targetIndex)->attisdropped) + { + Datum nullDatum = (Datum) 0; + targetValues[targetIndex] = nullDatum; + targetNulls[targetIndex] = true; + targetIndex++; + } + + /* If this source attribute has been dropped, just skip this source attribute.*/ + else if (TupleDescAttr(sourceRelDesc, sourceIndex)->attisdropped) + { + sourceIndex++; + continue; + } + + /* If both source and target attributes are not dropped, add the attribute field to targetValues. 
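+ * The sourceIndex bound check in this branch also guards against reading past the
+ * end of the source descriptor when the target relation has more attributes.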
*/ + else if (sourceIndex < sourceRelDesc->natts) + { + targetValues[targetIndex] = sourceValues[sourceIndex]; + targetNulls[targetIndex] = sourceNulls[sourceIndex]; + sourceIndex++; + targetIndex++; + } + else + { + /* If there are no more source fields, add a NULL field in targetValues. */ + Datum nullDatum = (Datum) 0; + targetValues[targetIndex] = nullDatum; + targetNulls[targetIndex] = true; + targetIndex++; + } + } + + /* Form a new tuple from the target values created by the above loop. */ + HeapTuple targetRelationTuple = heap_form_tuple(targetRelDesc, targetValues, + targetNulls); + return targetRelationTuple; +} + + +/* HasSchemaChanged function returns if there any schema changes between source and target relations.*/ +static bool +HasSchemaChanged(TupleDesc sourceRelationDesc, TupleDesc targetRelationDesc) +{ + bool hasSchemaChanged = (sourceRelationDesc->natts != targetRelationDesc->natts); + if (hasSchemaChanged) + { + return true; + } + + for (uint32 i = 0; i < sourceRelationDesc->natts; i++) + { + if (TupleDescAttr(sourceRelationDesc, i)->attisdropped || + TupleDescAttr(targetRelationDesc, i)->attisdropped) + { + hasSchemaChanged = true; + break; + } + } + + return hasSchemaChanged; +} + + +/* + * TranslateChangesIfSchemaChanged translates the tuples ReorderBufferChange + * if there is a schema change between source and target relations. + */ +static void +TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation, + ReorderBufferChange *change) +{ + TupleDesc sourceRelationDesc = RelationGetDescr(sourceRelation); + TupleDesc targetRelationDesc = RelationGetDescr(targetRelation); + + /* if there are no changes between source and target relations, return. */ + if (!HasSchemaChanged(sourceRelationDesc, targetRelationDesc)) + { + return; + } + + /* Check the ReorderBufferChange's action type and handle them accordingly.*/ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + /* For insert action, only new tuple should always be translated*/ + HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple); + HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + change->data.tp.newtuple->tuple = *targetRelationNewTuple; + break; + } + + /* + * For update changes both old and new tuples need to be translated for target relation + * if the REPLICA IDENTITY is set to FULL. Otherwise, only the new tuple needs to be + * translated for target relation. + */ + case REORDER_BUFFER_CHANGE_UPDATE: + { + /* For update action, new tuple should always be translated*/ + /* Get the new tuple from the ReorderBufferChange, and translate it to target relation. */ + HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple); + HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + change->data.tp.newtuple->tuple = *targetRelationNewTuple; + + /* + * Format oldtuple according to the target relation. If the column values of replica + * identiy change, then the old tuple is non-null and needs to be formatted according + * to the target relation schema. 
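+ * Otherwise oldtuple is NULL and no old tuple translation is needed.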
+ */ + if (change->data.tp.oldtuple != NULL) + { + HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple); + HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple->tuple = *targetRelationOldTuple; + } + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + /* For delete action, only old tuple should be translated*/ + HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple); + HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple->tuple = *targetRelationOldTuple; + break; + } + + default: + { + /* Do nothing for other action types. */ + break; + } + } +} diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index f51b62535..c0deadd1e 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -55,6 +55,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/shared_library_init.h" #include "distributed/shard_utils.h" #include "distributed/worker_protocol.h" @@ -406,7 +407,10 @@ UndistributeTable(TableConversionParameters *params) params->shardCountIsNull = true; TableConversionState *con = CreateTableConversion(params); - return ConvertTable(con); + SetupReplicationOriginLocalSession(); + TableConversionReturn *conv = ConvertTable(con); + ResetReplicationOriginLocalSession(); + return conv; } diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index e38395296..4bb867c6c 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -2406,12 +2406,12 @@ CopyLocalDataIntoShards(Oid distributedRelationId) EState *estate = CreateExecutorState(); ExprContext *econtext = GetPerTupleExprContext(estate); econtext->ecxt_scantuple = slot; - + const bool nonPublishableData = false; DestReceiver *copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, columnNameList, partitionColumnIndex, - estate, NULL); + estate, NULL, nonPublishableData); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index fbfce7119..5cf01baf4 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -36,6 +36,7 @@ #include "distributed/local_multi_copy.h" #include "distributed/shard_utils.h" #include "distributed/version_compat.h" +#include "distributed/replication_origin_session_utils.h" /* managed via GUC, default is 512 kB */ int LocalCopyFlushThresholdByte = 512 * 1024; @@ -46,7 +47,7 @@ static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDes static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary); static bool ShouldSendCopyNow(StringInfo buffer); static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, - CopyStmt *copyStatement, bool isEndOfCopy); + CopyStmt *copyStatement, bool isEndOfCopy, bool 
isPublishable); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); @@ -94,7 +95,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in bool isEndOfCopy = false; DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId, - copyDest->copyStatement, isEndOfCopy); + copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable); resetStringInfo(localCopyOutState->fe_msgbuf); } } @@ -133,7 +134,7 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, } bool isEndOfCopy = true; DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId, - copyDest->copyStatement, isEndOfCopy); + copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable); } @@ -197,7 +198,7 @@ ShouldSendCopyNow(StringInfo buffer) */ static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, - bool isEndOfCopy) + bool isEndOfCopy, bool isPublishable) { /* * Set the buffer as a global variable to allow ReadFromLocalBufferCallback @@ -205,6 +206,10 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat * ReadFromLocalBufferCallback. */ LocalCopyBuffer = buffer; + if (!isPublishable) + { + SetupReplicationOriginLocalSession(); + } Oid shardOid = GetTableLocalShardOid(relationId, shardId); Relation shard = table_open(shardOid, RowExclusiveLock); @@ -219,6 +224,10 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat EndCopyFrom(cstate); table_close(shard, NoLock); + if (!isPublishable) + { + ResetReplicationOriginLocalSession(); + } free_parsestate(pState); } diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 1203aeff4..6e3d19b68 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -85,6 +85,7 @@ #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/resource_lock.h" #include "distributed/shard_pruning.h" #include "distributed/shared_connection_stats.h" @@ -270,7 +271,8 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool *found, bool shouldUseLocalCopy, CopyOutState - copyOutState, bool isColocatedIntermediateResult); + copyOutState, bool isColocatedIntermediateResult, + bool isPublishable); static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, bool colocatedIntermediateResult); @@ -285,7 +287,8 @@ static void InitializeCopyShardState(CopyShardState *shardState, uint64 shardId, bool canUseLocalCopy, CopyOutState copyOutState, - bool colocatedIntermediateResult); + bool colocatedIntermediateResult, bool + isPublishable); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -492,9 +495,11 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletion *completionTag) ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); /* set up the destination for the COPY */ + const bool publishableData = true; CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, - executorState, NULL); + 
executorState, NULL, + publishableData); /* if the user specified an explicit append-to_shard option, write to it */ uint64 appendShardId = ProcessAppendToShardOption(tableId, copyStatement); @@ -1934,7 +1939,7 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, EState *executorState, - char *intermediateResultIdPrefix) + char *intermediateResultIdPrefix, bool isPublishable) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( sizeof(CitusCopyDestReceiver)); @@ -1953,6 +1958,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu copyDest->executorState = executorState; copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix; copyDest->memoryContext = CurrentMemoryContext; + copyDest->isPublishable = isPublishable; return copyDest; } @@ -2318,7 +2324,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest &cachedShardStateFound, copyDest->shouldUseLocalCopy, copyDest->copyOutState, - isColocatedIntermediateResult); + isColocatedIntermediateResult, + copyDest->isPublishable); + if (!cachedShardStateFound) { firstTupleInShard = true; @@ -2751,6 +2759,11 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState, if (activePlacementState != NULL) { EndPlacementStateCopyCommand(activePlacementState, copyOutState); + if (!copyDest->isPublishable) + { + ResetReplicationOriginRemoteSession( + activePlacementState->connectionState->connection); + } } dlist_foreach(iter, &connectionState->bufferedPlacementList) @@ -2764,6 +2777,10 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState, SendCopyDataToPlacement(placementState->data, shardId, connectionState->connection); EndPlacementStateCopyCommand(placementState, copyOutState); + if (!copyDest->isPublishable) + { + ResetReplicationOriginRemoteSession(connectionState->connection); + } } } @@ -3436,7 +3453,7 @@ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool *found, bool shouldUseLocalCopy, CopyOutState copyOutState, - bool isColocatedIntermediateResult) + bool isColocatedIntermediateResult, bool isPublishable) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); @@ -3444,7 +3461,8 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, { InitializeCopyShardState(shardState, connectionStateHash, shardId, shouldUseLocalCopy, - copyOutState, isColocatedIntermediateResult); + copyOutState, isColocatedIntermediateResult, + isPublishable); } return shardState; @@ -3461,7 +3479,8 @@ InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool shouldUseLocalCopy, CopyOutState copyOutState, - bool colocatedIntermediateResult) + bool colocatedIntermediateResult, + bool isPublishable) { ListCell *placementCell = NULL; int failedPlacementCount = 0; @@ -3532,6 +3551,11 @@ InitializeCopyShardState(CopyShardState *shardState, RemoteTransactionBeginIfNecessary(connection); } + if (!isPublishable) + { + SetupReplicationOriginRemoteSession(connection); + } + CopyPlacementState *placementState = palloc0(sizeof(CopyPlacementState)); placementState->shardState = shardState; placementState->data = makeStringInfo(); diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 8ab35ca42..12a5e7b3f 
100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -1484,6 +1484,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) * - Current cached connections is already at MaxCachedConnectionsPerWorker * - Connection is forced to close at the end of transaction * - Connection is not in OK state + * - Connection has a replication origin setup * - A transaction is still in progress (usually because we are cancelling a distributed transaction) * - A connection reached its maximum lifetime */ @@ -1503,6 +1504,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection PQstatus(connection->pgConn) != CONNECTION_OK || !RemoteTransactionIdle(connection) || connection->requiresReplication || + connection->isReplicationOriginSessionSetup || (MaxCachedConnectionLifetime >= 0 && MillisecondsToTimeout(connection->connectionEstablishmentStart, MaxCachedConnectionLifetime) <= 0); diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 906d78e42..1dfd51781 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -573,6 +573,47 @@ SendRemoteCommand(MultiConnection *connection, const char *command) } +/* + * ExecuteRemoteCommandAndCheckResult executes the given command in the remote node and + * checks if the result is equal to the expected result. If the result is equal to the + * expected result, the function returns true, otherwise it returns false. + */ +bool +ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command, + char *expected) +{ + if (!SendRemoteCommand(connection, command)) + { + /* if we cannot connect, we warn and report false */ + ReportConnectionError(connection, WARNING); + return false; + } + bool raiseInterrupts = true; + PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts); + + /* if remote node throws an error, we also throw an error */ + if (!IsResponseOK(queryResult)) + { + ReportResultError(connection, queryResult, ERROR); + } + + StringInfo queryResultString = makeStringInfo(); + + /* Evaluate the queryResult and store it into the queryResultString */ + bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString); + bool result = false; + if (success && strcmp(queryResultString->data, expected) == 0) + { + result = true; + } + + PQclear(queryResult); + ForgetResults(connection); + + return result; +} + + /* * ReadFirstColumnAsText reads the first column of result tuples from the given * PGresult struct and returns them in a StringInfo list. 
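ExecuteRemoteCommandAndCheckResult, declared above, is used later in this change (in IsRemoteReplicationOriginSessionSetup) to probe whether replication origin tracking is already active on a worker connection. A minimal usage sketch under that assumption follows; the connection is assumed to be an already-established MultiConnection, and the helper name is illustrative rather than part of this change:

#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"

/* illustrative only: probe a boolean-returning UDF on a remote connection */
static bool
RemoteReplicationOriginTrackingActive(MultiConnection *connection)
{
	/* the UDF returns a single boolean; its text form is compared against "t" */
	return ExecuteRemoteCommandAndCheckResult(
		connection,
		"SELECT pg_catalog.citus_internal_is_replication_origin_tracking_active()",
		"t");
}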
diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 9549846d5..a69ae0f22 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -409,11 +409,13 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, columnNameList); /* set up a DestReceiver that copies into the intermediate table */ + const bool publishableData = true; CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, - intermediateResultIdPrefix); + intermediateResultIdPrefix, + publishableData); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); @@ -443,10 +445,12 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, columnNameList); /* set up a DestReceiver that copies into the distributed table */ + const bool publishableData = true; CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, - executorState, NULL); + executorState, NULL, + publishableData); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index e9c2af512..00a5413c9 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -24,6 +24,7 @@ #include "distributed/relation_utils.h" #include "distributed/version_compat.h" #include "distributed/local_executor.h" +#include "distributed/replication_origin_session_utils.h" /* * LocalCopyBuffer is used in copy callback to return the copied rows. 
@@ -80,6 +81,7 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest); + static bool CanUseLocalCopy(uint32_t destinationNodeId) { @@ -103,6 +105,12 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); + + RemoteTransactionBeginIfNecessary(copyDest->connection); + + SetupReplicationOriginRemoteSession(copyDest->connection); + + StringInfo copyStatement = ConstructShardCopyStatement( copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary, @@ -185,6 +193,8 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) CopyOutState copyOutState = copyDest->copyOutState; if (copyDest->useLocalCopy) { + /* Setup replication origin session for local copy*/ + WriteLocalTuple(slot, copyDest); if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte) { @@ -260,6 +270,11 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); copyDest->copyOutState = copyOutState; + if (copyDest->useLocalCopy) + { + /* Setup replication origin session for local copy*/ + SetupReplicationOriginLocalSession(); + } } @@ -318,6 +333,9 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) PQclear(result); ForgetResults(copyDest->connection); + + ResetReplicationOriginRemoteSession(copyDest->connection); + CloseConnection(copyDest->connection); } } @@ -330,6 +348,10 @@ static void ShardCopyDestReceiverDestroy(DestReceiver *dest) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; + if (copyDest->useLocalCopy) + { + ResetReplicationOriginLocalSession(); + } if (copyDest->copyOutState) { diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 51a56b36e..7693f216e 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -8,21 +8,27 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "distributed/cdc_decoder.h" #include "distributed/shardinterval_utils.h" #include "distributed/shardsplit_shared_memory.h" +#include "distributed/worker_shard_visibility.h" +#include "distributed/worker_protocol.h" #include "distributed/listutils.h" +#include "distributed/metadata/distobject.h" #include "replication/logical.h" #include "utils/typcache.h" - +#include "utils/lsyscache.h" +#include "catalog/pg_namespace.h" extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); -static LogicalDecodeChangeCB pgoutputChangeCB; +static LogicalDecodeChangeCB ouputPluginChangeCB; static HTAB *SourceToDestinationShardMap = NULL; /* Plugin callback */ -static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change); +static void shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); /* Helper methods */ static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation, @@ -38,6 +44,22 @@ static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple, TupleDesc sourceTupleDesc, TupleDesc targetTupleDesc); +inline static bool IsShardSplitSlot(char *replicationSlotName); + + +#define CITUS_SHARD_SLOT_PREFIX 
"citus_shard_" +#define CITUS_SHARD_SLOT_PREFIX_SIZE (sizeof(CITUS_SHARD_SLOT_PREFIX) - 1) + +/* build time macro for base decoder plugin name for CDC and Shard Split. */ +#ifndef CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME +#define CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME "pgoutput" +#endif + +/* build time macro for base decoder plugin's initialization function name for CDC and Shard Split. */ +#ifndef CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME +#define CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME "_PG_output_plugin_init" +#endif + /* * Postgres uses 'pgoutput' as default plugin for logical replication. * We want to reuse Postgres pgoutput's functionality as much as possible. @@ -47,9 +69,10 @@ void _PG_output_plugin_init(OutputPluginCallbacks *cb) { LogicalOutputPluginInit plugin_init = - (LogicalOutputPluginInit) (void *) load_external_function("pgoutput", - "_PG_output_plugin_init", - false, NULL); + (LogicalOutputPluginInit) (void *) + load_external_function(CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME, + CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME, + false, NULL); if (plugin_init == NULL) { @@ -60,25 +83,61 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) plugin_init(cb); /* actual pgoutput callback will be called with the appropriate destination shard */ - pgoutputChangeCB = cb->change_cb; - cb->change_cb = split_change_cb; + ouputPluginChangeCB = cb->change_cb; + cb->change_cb = shard_split_and_cdc_change_cb; + InitCDCDecoder(cb, ouputPluginChangeCB); } /* - * split_change function emits the incoming tuple change + * Check if the replication slot is for Shard split by checking for prefix. + */ +inline static +bool +IsShardSplitSlot(char *replicationSlotName) +{ + return strncmp(replicationSlotName, CITUS_SHARD_SLOT_PREFIX, + CITUS_SHARD_SLOT_PREFIX_SIZE) == 0; +} + + +/* + * shard_split_and_cdc_change_cb function emits the incoming tuple change * to the appropriate destination shard. */ static void -split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change) +shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) { + /* + * If Citus has not been loaded yet, pass the changes + * through to the undrelying decoder plugin. + */ + if (!CitusHasBeenLoaded()) + { + ouputPluginChangeCB(ctx, txn, relation, change); + return; + } + + /* check if the relation is publishable.*/ if (!is_publishable_relation(relation)) { return; } char *replicationSlotName = ctx->slot->data.name.data; + if (replicationSlotName == NULL) + { + elog(ERROR, "Replication slot name is NULL!"); + return; + } + + /* check for the internal shard split names, if not, assume the slot is for CDC. */ + if (!IsShardSplitSlot(replicationSlotName)) + { + PublishDistributedTableChanges(ctx, txn, relation, change); + return; + } /* * Initialize SourceToDestinationShardMap if not already initialized. 
@@ -198,7 +257,7 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } } - pgoutputChangeCB(ctx, txn, targetRelation, change); + ouputPluginChangeCB(ctx, txn, targetRelation, change); RelationClose(targetRelation); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 9bed016a5..bd9ca679b 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -74,6 +74,7 @@ #include "distributed/recursive_planning.h" #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/run_from_same_connection.h" #include "distributed/shard_cleaner.h" #include "distributed/shard_transfer.h" @@ -1132,6 +1133,16 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_change_data_capture", + gettext_noop("Enables using replication origin tracking for change data capture"), + NULL, + &EnableChangeDataCapture, + false, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_cluster_clock", gettext_noop("When users explicitly call UDF citus_get_transaction_clock() " @@ -2426,7 +2437,6 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); - /* warn about config items in the citus namespace that are not registered above */ EmitWarningsOnPlaceholders("citus"); diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 981c5f375..148678da7 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -1,4 +1,4 @@ -- citus--11.2-1--11.3-1 - +#include "udfs/repl_origin_helper/11.3-1.sql" -- bump version to 11.3-1 diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index 7d71235d7..02a92f967 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -1,2 +1,4 @@ -- citus--11.3-1--11.2-1 --- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now +DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking(); +DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking(); +DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active(); diff --git a/src/backend/distributed/sql/udfs/repl_origin_helper/11.3-1.sql b/src/backend/distributed/sql/udfs/repl_origin_helper/11.3-1.sql new file mode 100644 index 000000000..5fe5a3bb9 --- /dev/null +++ b/src/backend/distributed/sql/udfs/repl_origin_helper/11.3-1.sql @@ -0,0 +1,20 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking() + IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$; +COMMENT ON FUNCTION 
pg_catalog.citus_internal_stop_replication_origin_tracking() + IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active() +RETURNS boolean +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active() + IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC'; diff --git a/src/backend/distributed/sql/udfs/repl_origin_helper/latest.sql b/src/backend/distributed/sql/udfs/repl_origin_helper/latest.sql new file mode 100644 index 000000000..5fe5a3bb9 --- /dev/null +++ b/src/backend/distributed/sql/udfs/repl_origin_helper/latest.sql @@ -0,0 +1,20 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking() + IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking() +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking() + IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active() +RETURNS boolean +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active() + IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC'; diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 0f4c3c80a..5add48009 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -34,6 +34,7 @@ #include "distributed/multi_logical_replication.h" #include "distributed/multi_explain.h" #include "distributed/repartition_join_execution.h" +#include "distributed/replication_origin_session_utils.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" @@ -391,6 +392,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) ResetGlobalVariables(); ResetRelationAccessHash(); + /* Reset any local replication origin session since transaction has been aborted.*/ + ResetReplicationOriginLocalSession(); + /* empty the CitusXactCallbackContext to ensure we're not leaking memory */ MemoryContextReset(CitusXactCallbackContext); @@ -715,6 +719,8 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, SetCreateCitusTransactionLevel(0); } + /* Reset any local replication origin session since subtransaction has been aborted.*/ + ResetReplicationOriginLocalSession(); MemoryContextSwitchTo(previousContext); break; diff --git 
a/src/backend/distributed/utils/replication_origin_session_utils.c b/src/backend/distributed/utils/replication_origin_session_utils.c new file mode 100644 index 000000000..dbd244271 --- /dev/null +++ b/src/backend/distributed/utils/replication_origin_session_utils.c @@ -0,0 +1,239 @@ +/*------------------------------------------------------------------------- + * + * replication_origin_session_utils.c + * Functions for managing replication origin session. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "distributed/replication_origin_session_utils.h" +#include "distributed/remote_commands.h" +#include "distributed/metadata_cache.h" +#include "utils/builtins.h" +#include "miscadmin.h" + +static bool IsRemoteReplicationOriginSessionSetup(MultiConnection *connection); + +static void SetupMemoryContextResetReplicationOriginHandler(void); + +static void SetupReplicationOriginSessionHelper(bool isContexResetSetupNeeded); + +static inline bool IsLocalReplicationOriginSessionActive(void); + +PG_FUNCTION_INFO_V1(citus_internal_start_replication_origin_tracking); +PG_FUNCTION_INFO_V1(citus_internal_stop_replication_origin_tracking); +PG_FUNCTION_INFO_V1(citus_internal_is_replication_origin_tracking_active); + +/* + * This variable is used to remember the replication origin id of the current session + * before resetting it to DoNotReplicateId in SetupReplicationOriginLocalSession. + */ +static RepOriginId OriginalOriginId = InvalidRepOriginId; + +/* + * Setting that controls whether replication origin tracking is enabled + */ +bool EnableChangeDataCapture = false; + + +/* citus_internal_start_replication_origin_tracking starts a new replication origin session + * in the local node. This function is used to avoid publishing the WAL records to the + * replication slot by setting replication origin to DoNotReplicateId in WAL records. + * It remembers the previous replication origin for the current session which will be + * used to reset the replication origin to the previous value when the session ends. + */ +Datum +citus_internal_start_replication_origin_tracking(PG_FUNCTION_ARGS) +{ + if (!EnableChangeDataCapture) + { + PG_RETURN_VOID(); + } + SetupReplicationOriginSessionHelper(false); + PG_RETURN_VOID(); +} + + +/* citus_internal_stop_replication_origin_tracking ends the current replication origin session + * in the local node. This function is used to reset the replication origin to the + * earlier value of replication origin. + */ +Datum +citus_internal_stop_replication_origin_tracking(PG_FUNCTION_ARGS) +{ + ResetReplicationOriginLocalSession(); + PG_RETURN_VOID(); +} + + +/* citus_internal_is_replication_origin_tracking_active checks if the current replication origin + * session is active in the local node. + */ +Datum +citus_internal_is_replication_origin_tracking_active(PG_FUNCTION_ARGS) +{ + bool result = IsLocalReplicationOriginSessionActive(); + PG_RETURN_BOOL(result); +} + + +/* IsLocalReplicationOriginSessionActive checks if the current replication origin + * session is active in the local node. + */ +inline bool +IsLocalReplicationOriginSessionActive(void) +{ + return (replorigin_session_origin == DoNotReplicateId); +} + + +/* + * SetupMemoryContextResetReplicationOriginHandler registers a callback function + * that resets the replication origin session in case of any error for the current + * memory context. 
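+ * The callback runs when that memory context is reset or deleted, which is how
+ * the replication origin gets restored during error cleanup.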
+ */ +static void +SetupMemoryContextResetReplicationOriginHandler() +{ + MemoryContextCallback *replicationOriginResetCallback = palloc0( + sizeof(MemoryContextCallback)); + replicationOriginResetCallback->func = + ResetReplicationOriginLocalSessionCallbackHandler; + replicationOriginResetCallback->arg = NULL; + MemoryContextRegisterResetCallback(CurrentMemoryContext, + replicationOriginResetCallback); +} + + +/* + * SetupReplicationOriginSessionHelper sets up a new replication origin session in a + * local session. It takes an argument isContexResetSetupNeeded to decide whether + * to register a callback function that resets the replication origin session in case + * of any error for the current memory context. + */ +static void +SetupReplicationOriginSessionHelper(bool isContexResetSetupNeeded) +{ + if (!EnableChangeDataCapture) + { + return; + } + OriginalOriginId = replorigin_session_origin; + replorigin_session_origin = DoNotReplicateId; + if (isContexResetSetupNeeded) + { + SetupMemoryContextResetReplicationOriginHandler(); + } +} + + +/* + * SetupReplicationOriginLocalSession sets up a new replication origin session in a + * local session. + */ +void +SetupReplicationOriginLocalSession() +{ + SetupReplicationOriginSessionHelper(true); +} + + +/* + * ResetReplicationOriginLocalSession resets the replication origin session in a + * local node. + */ +void +ResetReplicationOriginLocalSession(void) +{ + if (replorigin_session_origin != DoNotReplicateId) + { + return; + } + + replorigin_session_origin = OriginalOriginId; +} + + +/* + * ResetReplicationOriginLocalSessionCallbackHandler is a callback function that + * resets the replication origin session in a local node. This is used to register + * with MemoryContextRegisterResetCallback to reset the replication origin session + * in case of any error for the given memory context. + */ +void +ResetReplicationOriginLocalSessionCallbackHandler(void *arg) +{ + ResetReplicationOriginLocalSession(); +} + + +/* + * SetupReplicationOriginRemoteSession sets up a new replication origin session in a + * remote session. The identifier is used to create a unique replication origin name + * for the session in the remote node. + */ +void +SetupReplicationOriginRemoteSession(MultiConnection *connection) +{ + if (!EnableChangeDataCapture) + { + return; + } + if (connection != NULL && !IsRemoteReplicationOriginSessionSetup(connection)) + { + StringInfo replicationOriginSessionSetupQuery = makeStringInfo(); + appendStringInfo(replicationOriginSessionSetupQuery, + "select pg_catalog.citus_internal_start_replication_origin_tracking();"); + ExecuteCriticalRemoteCommand(connection, + replicationOriginSessionSetupQuery->data); + connection->isReplicationOriginSessionSetup = true; + } +} + + +/* + * ResetReplicationOriginRemoteSession resets the replication origin session in a + * remote node. 
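+ * It executes citus_internal_stop_replication_origin_tracking() on the given
+ * connection and clears the connection's isReplicationOriginSessionSetup flag.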
+ */ +void +ResetReplicationOriginRemoteSession(MultiConnection *connection) +{ + if (connection != NULL && connection->isReplicationOriginSessionSetup) + { + StringInfo replicationOriginSessionResetQuery = makeStringInfo(); + appendStringInfo(replicationOriginSessionResetQuery, + "select pg_catalog.citus_internal_stop_replication_origin_tracking();"); + ExecuteCriticalRemoteCommand(connection, + replicationOriginSessionResetQuery->data); + connection->isReplicationOriginSessionSetup = false; + } +} + + +/* + * IsRemoteReplicationOriginSessionSetup checks if the replication origin is setup + * already in the remote session by calliing the UDF + * citus_internal_is_replication_origin_tracking_active(). This is also remembered + * in the connection object to avoid calling the UDF again next time. + */ +static bool +IsRemoteReplicationOriginSessionSetup(MultiConnection *connection) +{ + if (connection->isReplicationOriginSessionSetup) + { + return true; + } + + StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo(); + appendStringInfo(isReplicationOriginSessionSetupQuery, + "SELECT pg_catalog.citus_internal_is_replication_origin_tracking_active()"); + bool result = + ExecuteRemoteCommandAndCheckResult(connection, + isReplicationOriginSessionSetupQuery->data, + "t"); + + connection->isReplicationOriginSessionSetup = result; + return result; +} diff --git a/src/include/distributed/cdc_decoder.h b/src/include/distributed/cdc_decoder.h new file mode 100644 index 000000000..391a3dece --- /dev/null +++ b/src/include/distributed/cdc_decoder.h @@ -0,0 +1,27 @@ +/*------------------------------------------------------------------------- + * + * cdc_decoder..h + * Utility functions and declerations for cdc decoder. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_CDC_DECODER_H +#define CITUS_CDC_DECODER_H + +#include "postgres.h" +#include "fmgr.h" +#include "replication/logical.h" + + +void PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); + +void InitCDCDecoder(OutputPluginCallbacks *cb, LogicalDecodeChangeCB changeCB); + +/* used in the replication_origin_filter_cb function. */ +#define InvalidRepOriginId 0 + +#endif /* CITUS_CDC_DECODER_H */ diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index 13d589a3a..689725e70 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -133,6 +133,12 @@ typedef struct CitusCopyDestReceiver /* if true, should copy to local placements in the current session */ bool shouldUseLocalCopy; + /* + * if true, the data from this dest receiver should be published for CDC clients. + * This is set tot false for internal transfers like shard split/move/rebalance etc. + */ + bool isPublishable; + /* * Copy into colocated intermediate result. 
When this is set, the * COPY assumes there are hypothetical colocated shards to the @@ -161,7 +167,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, List *columnNameList, int partitionColumnIndex, EState *executorState, - char *intermediateResultPrefix); + char *intermediateResultPrefix, + bool isPublishable); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 4ffb83a86..cfd6de499 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -173,6 +173,9 @@ typedef struct MultiConnection /* is the connection currently in use, and shouldn't be used by anything else */ bool claimedExclusively; + /* is the replication origin session has already been setup for this connection. */ + bool isReplicationOriginSessionSetup; + /* * Should be used to access/modify metadata. See REQUIRE_METADATA_CONNECTION for * the details. diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index f903ebe66..71cb9dad2 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -48,6 +48,8 @@ extern void ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command); extern void ExecuteRemoteCommandInConnectionList(List *nodeConnectionList, const char *command); +extern bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, + char *command, char *expected); extern int ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command, PGresult **result); diff --git a/src/include/distributed/replication_origin_session_utils.h b/src/include/distributed/replication_origin_session_utils.h new file mode 100644 index 000000000..e90bd8ab8 --- /dev/null +++ b/src/include/distributed/replication_origin_session_utils.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * + * replication_origin_utils.h + * Utilities related to replication origin. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef REPLICATION_ORIGIN_SESSION_UTILS_H +#define REPLICATION_ORIGIN_SESSION_UTILS_H + +#include "postgres.h" +#include "replication/origin.h" +#include "distributed/connection_management.h" + +extern void InitializeReplicationOriginSessionUtils(void); + +extern void SetupReplicationOriginRemoteSession(MultiConnection *connection); +extern void ResetReplicationOriginRemoteSession(MultiConnection *connection); + +extern void SetupReplicationOriginLocalSession(void); +extern void ResetReplicationOriginLocalSession(void); +extern void ResetReplicationOriginLocalSessionCallbackHandler(void *arg); + + +extern bool EnableChangeDataCapture; + + +#endif /* REPLICATION_ORIGIN_SESSION_UTILS_H */ diff --git a/src/test/cdc/Makefile b/src/test/cdc/Makefile new file mode 100644 index 000000000..d67fe5499 --- /dev/null +++ b/src/test/cdc/Makefile @@ -0,0 +1,45 @@ +#------------------------------------------------------------------------- +# +# Makefile for src/test/cdc +# +# Test that CDC publication works correctly. 
+# +#------------------------------------------------------------------------- + +subdir = src/test/cdc +top_builddir = ../../.. +include $(top_builddir)/Makefile.global + +pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null) +pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/') +pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/') +export pg_major_version + +test_path = t/*.pl + + +# copied from pgxs/Makefile.global to use postgres' abs build dir for pg_regress +ifeq ($(enable_tap_tests),yes) + +define citus_prove_installcheck +rm -rf '$(CURDIR)'/tmp_check +$(MKDIR_P) '$(CURDIR)'/tmp_check +cd $(srcdir) && \ +TESTDIR='$(CURDIR)' \ +PATH="$(bindir):$$PATH" \ +PGPORT='6$(DEF_PGPORT)' \ +top_builddir='$(CURDIR)/$(top_builddir)' \ +PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \ +TEMP_CONFIG='$(CURDIR)'/postgresql.conf \ +$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path)) +endef + +else +citus_prove_installcheck = @echo "TAP tests not enabled when postgres was compiled" +endif + +installcheck: + $(citus_prove_installcheck) + +clean distclean maintainer-clean: + rm -rf tmp_check diff --git a/src/test/cdc/postgresql.conf b/src/test/cdc/postgresql.conf new file mode 100644 index 000000000..d2d6d3efe --- /dev/null +++ b/src/test/cdc/postgresql.conf @@ -0,0 +1,2 @@ +shared_preload_libraries=citus +shared_preload_libraries='citus' diff --git a/src/test/cdc/t/001_cdc_create_distributed_table_test.pl b/src/test/cdc/t/001_cdc_create_distributed_table_test.pl new file mode 100644 index 000000000..7a562da94 --- /dev/null +++ b/src/test/cdc/t/001_cdc_create_distributed_table_test.pl @@ -0,0 +1,98 @@ +# Basic CDC test for create_distributed_table +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +# Create the sensors table and ndexes. +my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +create_cdc_replication_slots_for_workers(\@workers); + +# Distribut the sensors table to worker nodes. 
+$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); + +create_cdc_publication_for_workers(\@workers,'sensors'); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(0,10)i;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC basic test - distributed table insert data'); + + +# Update some data in the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," +UPDATE sensors + SET + eventdatetime=NOW(), + measure_data = jsonb_set(measure_data, '{val}', measureid::text::jsonb , TRUE), + measure_status = CASE + WHEN measureid % 2 = 0 + THEN 'y' + ELSE 'n' + END, + measure_comment= 'Comment:' || measureid::text;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC basic test - distributed table update data'); + +# Delete some data from the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," +DELETE FROM sensors + WHERE (measureid % 2) = 0;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC basic test - distributed table delete data'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/002_cdc_create_distributed_table_concurrently.pl b/src/test/cdc/t/002_cdc_create_distributed_table_concurrently.pl new file mode 100644 index 000000000..b8906421b --- /dev/null +++ b/src/test/cdc/t/002_cdc_create_distributed_table_concurrently.pl @@ -0,0 +1,100 @@ +# CDC test for create_distributed_table_concurrently +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +# Creeate the sensors table and ndexes. 
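Each DML step in the test above repeats the same wait, compare, assert triplet. A hypothetical helper (not part of this patch) could fold that into one call shared by the tests:

# Hypothetical convenience wrapper (not part of this patch): wait for the CDC
# client to catch up with the whole cluster, then assert both copies match.
sub expect_tables_to_match {
    my ($coordinator, $workersref, $cdc_client, $stmt, $test_name) = @_;
    wait_for_cdc_client_to_catch_up_with_citus_cluster($coordinator, $workersref);
    my $ok = compare_tables_in_different_nodes($coordinator, $cdc_client, 'postgres', $stmt);
    is($ok, 1, $test_name);
}

# e.g. expect_tables_to_match($node_coordinator, \@workers, $node_cdc_client,
#          $select_stmt, 'CDC basic test - distributed table insert data');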
+my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +create_cdc_replication_slots_for_workers(\@workers); + +# Distribut the sensors table to worker nodes. +$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');"); + +create_cdc_publication_for_workers(\@workers,'sensors'); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(0,10)i;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC test - create_distributed_table_concurrently insert data'); + + +# Update some data in the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," +UPDATE sensors + SET + eventdatetime=NOW(), + measure_data = jsonb_set(measure_data, '{val}', measureid::text::jsonb , TRUE), + measure_status = CASE + WHEN measureid % 2 = 0 + THEN 'y' + ELSE 'n' + END, + measure_comment= 'Comment:' || measureid::text;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC test - create_distributed_table_concurrently update data'); + +# Delete some data from the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," +DELETE FROM sensors + WHERE (measureid % 2) = 0;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. 
+$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC test - create_distributed_table_concurrently delete data'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/003_cdc_parallel_insert.pl b/src/test/cdc/t/003_cdc_parallel_insert.pl new file mode 100644 index 000000000..ecc53ffac --- /dev/null +++ b/src/test/cdc/t/003_cdc_parallel_insert.pl @@ -0,0 +1,83 @@ +# CDC test for inserts during create distributed table concurrently +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + +use threads; + + +# Initialize co-ordinator node +our $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $add_local_meta_data_stmt = qq(SELECT citus_add_local_table_to_metadata('sensors');); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639, ""); + +# Creeate the sensors table and ndexes. +my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_coordinator->safe_psql('postgres',$add_local_meta_data_stmt); +$node_cdc_client->safe_psql('postgres',$initial_schema); + + +create_cdc_publication_and_replication_slots_for_citus_cluster($node_coordinator,\@workers,'sensors'); +connect_cdc_client_to_citus_cluster_publications($node_coordinator,\@workers,$node_cdc_client); + +#insert data into the sensors table in the coordinator node before distributing the table. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors +SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' +FROM generate_series(0,10)i;"); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers); + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - insert data'); + +sub create_distributed_table_thread() { + $node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');"); +} + +sub insert_data_into_distributed_table_thread() { + # Insert some data to the sensors table in the coordinator node. + $node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(-10,-1)i;"); +} + +# Create the distributed table concurrently in a separate thread. 
+my $thr_create = threads->create(\&create_distributed_table_thread); +my $thr_insert = threads->create(\&insert_data_into_distributed_table_thread); +$thr_create->join(); +$thr_insert->join(); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - insert data'); + + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/004_cdc_move_shard.pl b/src/test/cdc/t/004_cdc_move_shard.pl new file mode 100644 index 000000000..73045f4c3 --- /dev/null +++ b/src/test/cdc/t/004_cdc_move_shard.pl @@ -0,0 +1,94 @@ +# Schema change CDC test for Citus +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +print("coordinator port: " . $node_coordinator->port() . "\n"); +print("worker0 port:" . $workers[0]->port() . "\n"); +print("worker1 port:" . $workers[1]->port() . "\n"); +print("cdc_client port:" .$node_cdc_client->port() . "\n"); + +# Creeate the sensors table and ndexes. +my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +create_cdc_replication_slots_for_workers(\@workers); + +#insert data into the sensors table in the coordinator node before distributing the table. 
+$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors +SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' +FROM generate_series(0,100)i;"); + +$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');"); + +create_cdc_publication_for_workers(\@workers,'sensors'); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - schema change before move'); + + + +my $shard_to_move = $node_coordinator->safe_psql('postgres', + "SELECT shardid FROM citus_shards ORDER BY shardid LIMIT 1;"); +my $host1 = $node_coordinator->safe_psql('postgres', + "SELECT nodename FROM citus_shards ORDER BY shardid LIMIT 1;"); +my $port1 = $node_coordinator->safe_psql('postgres', + "SELECT nodeport FROM citus_shards ORDER BY shardid LIMIT 1;"); + +my $shard_last = $node_coordinator->safe_psql('postgres', + "SELECT shardid FROM citus_shards ORDER BY shardid DESC LIMIT 1;"); +my $host2 = $node_coordinator->safe_psql('postgres', + "SELECT nodename FROM citus_shards ORDER BY shardid DESC LIMIT 1;"); +my $port2 = $node_coordinator->safe_psql('postgres', + "SELECT nodeport FROM citus_shards ORDER BY shardid DESC LIMIT 1;"); + +my $move_params = "select citus_move_shard_placement($shard_to_move,'$host1',$port1,'$host2',$port2,'force_logical');"; +print("move_params: $move_params\n"); +$node_coordinator->safe_psql('postgres',$move_params); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers); + + +#wait_for_cdc_client_to_catch_up_with_workers(\@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - schema change and move shard'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/005_cdc_reference_table_test.pl b/src/test/cdc/t/005_cdc_reference_table_test.pl new file mode 100644 index 000000000..5858b9ecd --- /dev/null +++ b/src/test/cdc/t/005_cdc_reference_table_test.pl @@ -0,0 +1,52 @@ +# Basic CDC test for create_distributed_table +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; +my $ref_select_stmt = qq(SELECT * FROM reference_table ORDER BY measureid;); + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +$node_coordinator->safe_psql('postgres',"CREATE TABLE reference_table(measureid integer PRIMARY KEY);"); +$node_cdc_client->safe_psql('postgres',"CREATE TABLE reference_table(measureid integer PRIMARY KEY);"); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'reference_table'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +# Create the reference table in the coordinator and cdc client nodes. 
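Before the reference table is created below, a note on the shard-move test above: it resolves the source and target placements with one citus_shards query per column. A hypothetical helper (not part of this patch) could fetch a whole placement in a single round trip, relying on the '|' column separator of safe_psql's unaligned output:

# Hypothetical helper (not part of this patch): return (shardid, nodename,
# nodeport) for the first or last shard listed in citus_shards.
sub get_shard_placement {
    my ($node, $order) = @_;    # $order is 'ASC' or 'DESC'
    my $row = $node->safe_psql('postgres',
        "SELECT shardid, nodename, nodeport FROM citus_shards ORDER BY shardid $order LIMIT 1;");
    return split /\|/, $row;    # unaligned psql output separates columns with '|'
}

# my ($shard_to_move, $host1, $port1) = get_shard_placement($node_coordinator, 'ASC');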
+$node_coordinator->safe_psql('postgres',"SELECT create_reference_table('reference_table');"); + +create_cdc_replication_slots_for_workers(\@workers); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$ref_select_stmt); +is($result, 1, 'CDC reference taable test 1'); + + +# Insert data to the reference table in the coordinator node. +$node_coordinator->safe_psql('postgres',"INSERT INTO reference_table SELECT i FROM generate_series(0,100)i;"); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$ref_select_stmt); +is($result, 1, 'CDC reference taable test 2'); + + +$node_coordinator->safe_psql('postgres',"INSERT INTO reference_table SELECT i FROM generate_series(101,200)i;"); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$ref_select_stmt); +is($result, 1, 'CDC reference taable test 3'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/006_cdc_schema_change_and_move.pl b/src/test/cdc/t/006_cdc_schema_change_and_move.pl new file mode 100644 index 000000000..9a05d977a --- /dev/null +++ b/src/test/cdc/t/006_cdc_schema_change_and_move.pl @@ -0,0 +1,127 @@ +# Schema change CDC test for Citus +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $select_stmt_after_drop = qq(SELECT measureid, eventdatetime, measure_data, measure_status, measure_comment FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +print("coordinator port: " . $node_coordinator->port() . "\n"); +print("worker0 port:" . $workers[0]->port() . "\n"); +print("worker1 port:" . $workers[1]->port() . "\n"); +print("cdc_client port:" .$node_cdc_client->port() . "\n"); + +# Creeate the sensors table and ndexes. 
+my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +#insert data into the sensors table in the coordinator node before distributing the table. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors +SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' +FROM generate_series(0,100)i;"); + +$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');"); + +#connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +create_cdc_publication_and_slots_for_workers(\@workers,'sensors'); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - schema change before move'); + + +$node_coordinator->safe_psql('postgres',"ALTER TABLE sensors DROP COLUMN meaure_quantity;"); + + +my $shard_to_move = $node_coordinator->safe_psql('postgres', + "SELECT shardid FROM citus_shards ORDER BY shardid LIMIT 1;"); +my $host1 = $node_coordinator->safe_psql('postgres', + "SELECT nodename FROM citus_shards ORDER BY shardid LIMIT 1;"); +my $port1 = $node_coordinator->safe_psql('postgres', + "SELECT nodeport FROM citus_shards ORDER BY shardid LIMIT 1;"); + +my $shard_last = $node_coordinator->safe_psql('postgres', + "SELECT shardid FROM citus_shards ORDER BY shardid DESC LIMIT 1;"); +my $host2 = $node_coordinator->safe_psql('postgres', + "SELECT nodename FROM citus_shards ORDER BY shardid DESC LIMIT 1;"); +my $port2 = $node_coordinator->safe_psql('postgres', + "SELECT nodeport FROM citus_shards ORDER BY shardid DESC LIMIT 1;"); + +my $move_params = "select citus_move_shard_placement($shard_to_move,'$host1',$port1,'$host2',$port2,'force_logical');"; +print("move_params: $move_params\n"); +$node_coordinator->safe_psql('postgres',$move_params); + + + +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 'A', 'I <3 Citus' + FROM generate_series(-10,-1)i;"); + + +$node_cdc_client->safe_psql('postgres',"ALTER TABLE sensors DROP COLUMN meaure_quantity;"); + +wait_for_cdc_client_to_catch_up_with_workers(\@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - schema change and move shard'); + +# 
Update some data in the sensors table to check the schema change handling logic in CDC decoder. +$node_coordinator->safe_psql('postgres'," +UPDATE sensors + SET + measure_status = CASE + WHEN measureid % 2 = 0 + THEN 'y' + ELSE 'n' + END;"); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - update data after schema change'); + +# Update some data in the sensors table to check the schema change handling logic in CDC decoder. +$node_coordinator->safe_psql('postgres'," +DELETE FROM sensors + WHERE + measure_status = 'n';"); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - delete data after schem change'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/007_cdc_undistributed_table_test.pl b/src/test/cdc/t/007_cdc_undistributed_table_test.pl new file mode 100644 index 000000000..c7504ef4d --- /dev/null +++ b/src/test/cdc/t/007_cdc_undistributed_table_test.pl @@ -0,0 +1,111 @@ +# Basic CDC test for create_distributed_table +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +$node_coordinator->safe_psql('postgres',$command); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +# Create the sensors table and ndexes. +my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +create_cdc_replication_slots_for_workers(\@workers); + +# Distribut the sensors table to worker nodes. 
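Before the next test distributes its table below, one detail of the schema-change test just above: logical replication does not replicate DDL, so the ALTER TABLE ... DROP COLUMN has to be executed on the CDC client as well as on the coordinator before the comparison can succeed. A hypothetical helper (not part of this patch) that keeps the two copies in lockstep:

# Hypothetical helper (not part of this patch): apply the same DDL on the
# coordinator and on the CDC client, since logical replication only carries
# row changes, not DDL.
sub run_ddl_on_coordinator_and_cdc_client {
    my ($coordinator, $cdc_client, $ddl) = @_;
    $coordinator->safe_psql('postgres', $ddl);
    $cdc_client->safe_psql('postgres', $ddl);
}

# run_ddl_on_coordinator_and_cdc_client($node_coordinator, $node_cdc_client,
#     "ALTER TABLE sensors DROP COLUMN meaure_quantity;");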
+$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); + +create_cdc_publication_for_workers(\@workers,'sensors'); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(0,10)i;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC basic test - distributed table insert data'); + + +# Update some data in the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," +UPDATE sensors + SET + eventdatetime=NOW(), + measure_data = jsonb_set(measure_data, '{val}', measureid::text::jsonb , TRUE), + measure_status = CASE + WHEN measureid % 2 = 0 + THEN 'y' + ELSE 'n' + END, + measure_comment= 'Comment:' || measureid::text;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC basic test - distributed table update data'); + +# Delete some data from the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," +DELETE FROM sensors + WHERE (measureid % 2) = 0;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC basic test - distributed table delete data'); + +$node_coordinator->safe_psql('postgres'," +SELECT undistribute_table('sensors',cascade_via_foreign_keys=>true);"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. 
+$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC basic test - distributed table delete data'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/008_cdc_shard_split_test.pl b/src/test/cdc/t/008_cdc_shard_split_test.pl new file mode 100644 index 000000000..ba33d6a21 --- /dev/null +++ b/src/test/cdc/t/008_cdc_shard_split_test.pl @@ -0,0 +1,84 @@ +# Basic CDC test for create_distributed_table +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; +my $citus_config = " +citus.shard_count = 2 +citus.shard_replication_factor = 1 +"; +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config); + +my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +$node_coordinator->safe_psql('postgres',$command); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +# Create the sensors table and ndexes. +my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +create_cdc_replication_slots_for_workers(\@workers); + +# Distribut the sensors table to worker nodes. + +$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); + +create_cdc_publication_for_workers(\@workers,'sensors'); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table create data'); + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(-100,100)i;"); + +# Compare the data in the coordinator and cdc client nodes. 
+$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table insert data'); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$node_coordinator->safe_psql('postgres'," + SELECT citus_split_shard_by_split_points(102008,ARRAY['-50'],ARRAY[1,2], 'block_writes');"); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table split data'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl b/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl new file mode 100644 index 000000000..b4f4c9bec --- /dev/null +++ b/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl @@ -0,0 +1,84 @@ +# Basic CDC test for create_distributed_table +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; +my $citus_config = " +citus.shard_count = 2 +citus.shard_replication_factor = 1 +"; +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config); + +my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +$node_coordinator->safe_psql('postgres',$command); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +# Create the sensors table and ndexes. +my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +create_cdc_replication_slots_for_workers(\@workers); + +# Distribut the sensors table to worker nodes. + +$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); + +create_cdc_publication_for_workers(\@workers,'sensors'); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. 
+$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table create data'); + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(-100,100)i;"); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table insert data'); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$node_coordinator->safe_psql('postgres'," + SELECT citus_split_shard_by_split_points(102008,ARRAY['-50'],ARRAY[1,2], 'force_logical');"); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table split data'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl b/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl new file mode 100644 index 000000000..0f3e574bc --- /dev/null +++ b/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl @@ -0,0 +1,107 @@ +# Basic CDC test for create_distributed_table +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + +use threads; + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; +my $citus_config = " +citus.shard_count = 2 +citus.shard_replication_factor = 1 +"; +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config); + +my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +$node_coordinator->safe_psql('postgres',$command); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +# Create the sensors table and ndexes. +my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +create_cdc_replication_slots_for_workers(\@workers); + +# Distribut the sensors table to worker nodes. 
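Before the parallel-insert split test distributes its table below, a note on the two split tests above: they pass a hard-coded shard id (102008) to citus_split_shard_by_split_points, which only stays valid while shard ids are allocated deterministically by the regression run. A sketch (not part of this patch) of resolving the shard id from pg_dist_shard at run time instead, keeping the same split point and target nodes:

# Sketch (not part of this patch): look up the sensors table's lowest shard id
# instead of hard-coding 102008, then split it at -50 as the tests do.
my $shard_to_split = $node_coordinator->safe_psql('postgres',
    "SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'sensors'::regclass ORDER BY shardid LIMIT 1;");
$node_coordinator->safe_psql('postgres',
    "SELECT citus_split_shard_by_split_points($shard_to_split, ARRAY['-50'], ARRAY[1,2], 'block_writes');");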
+$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); + +create_cdc_publication_for_workers(\@workers,'sensors'); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table create data'); + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(-100,100)i;"); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table insert data'); + + +sub insert_data_into_distributed_table_thread() { + # Insert some data to the sensors table in the coordinator node. + $node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(101,200)i;"); +} + +sub split_distributed_table_thread() { + $node_coordinator->safe_psql('postgres'," + SELECT citus_split_shard_by_split_points(102008,ARRAY['-50'],ARRAY[1,2], 'force_logical');"); +} + +# Create the distributed table concurrently in a separate thread. +my $thr_create = threads->create(\&split_distributed_table_thread); + +# Insert some data to the sensors table in the coordinator node while the table is being distributed. +my $thr_insert = threads->create(\&insert_data_into_distributed_table_thread); + +# Wait for the threads to finish. +$thr_create->join(); +$thr_insert->join(); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table split data'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/011_cdc_alter_distributed_table.pl b/src/test/cdc/t/011_cdc_alter_distributed_table.pl new file mode 100644 index 000000000..29fc2d037 --- /dev/null +++ b/src/test/cdc/t/011_cdc_alter_distributed_table.pl @@ -0,0 +1,95 @@ +# Basic CDC test for create_distributed_table +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $add_local_meta_data_stmt = qq(SELECT citus_add_local_table_to_metadata('sensors');); +my $result = 0; +my $citus_config = " +citus.shard_count = 2 +citus.shard_replication_factor = 1 +"; +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config); + +my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +$node_coordinator->safe_psql('postgres',$command); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +# Create the sensors table and ndexes. 
+my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_coordinator->safe_psql('postgres',$add_local_meta_data_stmt); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_replication_slots_for_citus_cluster($node_coordinator,\@workers,'sensors'); +connect_cdc_client_to_citus_cluster_publications($node_coordinator,\@workers,$node_cdc_client); + +# Distribut the sensors table to worker nodes. +$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');"); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - distributed table create data'); + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + SELECT alter_distributed_table('sensors', shard_count:=6, cascade_to_colocated:=true);"); + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC split test - alter distributed table '); + +#$node_cdc_client->safe_psql("postgres","alter subscription cdc_subscription refresh publication;"); +$node_cdc_client->safe_psql("postgres","alter subscription cdc_subscription_1 refresh publication;"); + + +#Drop the CDC client subscription and recreate them , since the +#alter_distributed_table has changed the Oid of the distributed table. +#So the CDC client has to create Oid to table mappings again for +#CDC to work again. +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +create_cdc_publication_and_replication_slots_for_citus_cluster($node_coordinator,\@workers,'sensors'); +connect_cdc_client_to_citus_cluster_publications($node_coordinator,\@workers,$node_cdc_client); + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(0,10)i;"); + +# Wait for the data changes to be replicated to the cdc client node. 
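Before the test waits for the subscriptions to catch up below: because alter_distributed_table above leaves the table with a new Oid, the test tears down and re-creates the publications, replication slots and subscriptions before continuing. A hypothetical wrapper (not part of this patch) for that reset sequence, built from the existing cdctestlib helpers:

# Hypothetical wrapper (not part of this patch): drop and re-create the CDC
# publications, replication slots and subscriptions after an operation (such
# as alter_distributed_table) that gives the distributed table a new Oid.
sub reset_cdc_wiring {
    my ($coordinator, $workersref, $cdc_client, $table_names) = @_;
    drop_cdc_client_subscriptions($cdc_client, $workersref);
    create_cdc_publication_and_replication_slots_for_citus_cluster($coordinator, $workersref, $table_names);
    connect_cdc_client_to_citus_cluster_publications($coordinator, $workersref, $cdc_client);
}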
+wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC basic test - distributed table insert data'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); + +done_testing(); diff --git a/src/test/cdc/t/012_cdc_restart_test.pl b/src/test/cdc/t/012_cdc_restart_test.pl new file mode 100644 index 000000000..610c0b558 --- /dev/null +++ b/src/test/cdc/t/012_cdc_restart_test.pl @@ -0,0 +1,88 @@ +# Basic CDC test for create_distributed_table +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +# Create the sensors table and ndexes. +my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +create_cdc_replication_slots_for_workers(\@workers); + +# Distribut the sensors table to worker nodes. +$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');"); + +create_cdc_publication_for_workers(\@workers,'sensors'); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC restart test - distributed table creation'); + + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(0,10)i;"); + +# Wait for the data changes to be replicated to the cdc client node. 
+wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC restart test - distributed table insert data'); + + +print("stopping worker 0"); +$workers[0]->stop(); +print("starting worker 0 againg.."); +$workers[0]->start(); + + +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Insert some data to the sensors table in the coordinator node. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' + FROM generate_series(11,20)i;"); + + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC restart test - distributed table after restart'); + + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/cdctestlib.pm b/src/test/cdc/t/cdctestlib.pm new file mode 100644 index 000000000..e3c72d7aa --- /dev/null +++ b/src/test/cdc/t/cdctestlib.pm @@ -0,0 +1,355 @@ +use strict; +use warnings; + +my $pg_major_version = int($ENV{'pg_major_version'}); +print("working with PG major version : $pg_major_version\n"); +if ($pg_major_version >= 15) { + eval "use PostgreSQL::Test::Cluster"; + eval "use PostgreSQL::Test::Utils"; +} else { + eval "use PostgresNode"; +} + + +#use PostgresNode; +use DBI; + +our $NODE_TYPE_COORDINATOR = 1; +our $NODE_TYPE_WORKER = 2; +our $NODE_TYPE_CDC_CLIENT = 3; + +sub compare_tables_in_different_nodes +{ + my $result = 1; + my ($node1, $node2, $dbname, $stmt) = @_; + + # Connect to the first database node + my $dbh1 = DBI->connect("dbi:Pg:" . $node1->connstr($dbname)); + + # Connect to the second database node + my $dbh2 = DBI->connect("dbi:Pg:" . 
$node2->connstr($dbname)); + + # Define the SQL query for the first database node + my $sth1 = $dbh1->prepare($stmt); + $sth1->execute(); + + # Define the SQL query for the second database node + my $sth2 = $dbh2->prepare($stmt); + $sth2->execute(); + + # Get the field names for the table + my @field_names = @{$sth2->{NAME}}; + + #$sth1->dump_results(); + #$sth2->dump_results(); + + our @row1, our @row2; + + # Use a cursor to iterate over the first database node's data + while (1) { + + @row1 = $sth1->fetchrow_array(); + @row2 = $sth2->fetchrow_array(); + #print("row1: @row1\n"); + #print("row2: @row2\n"); + + # Use a cursor to iterate over the second database node's data + if (@row1 and @row2) { + #print("row1: @row1\n"); + #print("row2: @row2\n"); + my $field_count_row1 = scalar @row1; + my $field_count_row2 = scalar @row2; + if ($field_count_row1 != $field_count_row2) { + print "Field count mismatch: $field_count_row1 != $field_count_row2 \n"; + print "First row: @row1\n"; + #print "Second row: @row2\n"; + for (my $i = 0; $i < scalar @row2; $i++) { + print("Field $i, field name: $field_names[$i], value: $row2[$i] \n"); + } + $result = 0; + last; + } + # Compare the data in each field in each row of the two nodes + for (my $i = 0; $i < scalar @row1; $i++) { + if ($row1[$i] ne $row2[$i]) { + print "Data mismatch in field '$field_names[$i]'\n"; + print "$row1[$i] != $row2[$i]\n"; + print "First row: @row1\n"; + print "Second row: @row2\n"; + $result = 0; + last; + } + } + } elsif (@row1 and !@row2) { + print "First node has more rows than the second node\n"; + $result = 0; + last; + } elsif (!@row1 and @row2) { + print "Second node has more rows than the first node\n"; + $result = 0; + last; + } else { + last; + } + } + + $sth1->finish(); + $sth2->finish(); + $dbh1->disconnect(); + $dbh2->disconnect(); + return $result; +} + +sub create_node { + my ($name,$node_type,$host, $port, $config) = @_; + if (!defined($config)) { + $config = "" + } + + our $node; + + if ($pg_major_version >= 15) { + $PostgreSQL::Test::Cluster::use_unix_sockets = 0; + $PostgreSQL::Test::Cluster::use_tcp = 1; + $PostgreSQL::Test::Cluster::test_pghost = 'localhost'; + my %params = ( "port" => $port, "host" => "localhost"); + $node = PostgreSQL::Test::Cluster->new($name, %params); + } else { + $PostgresNode::use_tcp = 1; + $PostgresNode::test_pghost = '127.0.0.1'; + my %params = ( "port" => $port, "host" => "localhost"); + $node = get_new_node($name, %params); + } + print("node's port:" . $node->port . "\n"); + + $port += 1; + + my $citus_config_options = " +max_connections = 100 +max_wal_senders = 100 +max_replication_slots = 100 +citus.enable_change_data_capture = on +log_statement = 'all' +citus.override_table_visibility = off"; + + if ($config ne "") { + $citus_config_options = $citus_config_options . 
$config + } + + my $client_config_options = " +max_connections = 100 +max_wal_senders = 100 +max_replication_slots = 100 +"; + $node->init(allows_streaming => 'logical'); + if ($node_type == $NODE_TYPE_COORDINATOR || $node_type == $NODE_TYPE_WORKER) { + $node->append_conf("postgresql.conf",$citus_config_options); + } else { + $node->append_conf("postgresql.conf",$citus_config_options); + } + + $node->start(); + + if ($node_type == $NODE_TYPE_COORDINATOR || $node_type == $NODE_TYPE_WORKER) { + $node->safe_psql('postgres', "CREATE EXTENSION citus;"); + my $value = $node->safe_psql('postgres', "SHOW citus.enable_change_data_capture;"); + print("citus.enable_change_data_capture value is $value\n") + } + + return $node; +} + +# Create a Citus cluster with the given number of workers +sub create_citus_cluster { + my ($no_workers,$host,$port,$citus_config) = @_; + my @workers = (); + my $node_coordinator; + print("citus_config :", $citus_config); + if ($citus_config ne "") { + $node_coordinator = create_node('coordinator', $NODE_TYPE_COORDINATOR,$host, $port, $citus_config); + } else { + $node_coordinator = create_node('coordinator', $NODE_TYPE_COORDINATOR,$host, $port); + } + my $coord_host = $node_coordinator->host(); + my $coord_port = $node_coordinator->port(); + $node_coordinator->safe_psql('postgres',"SELECT pg_catalog.citus_set_coordinator_host('$coord_host', $coord_port);"); + for (my $i = 0; $i < $no_workers; $i++) { + $port = $port + 1; + my $node_worker; + if ($citus_config ne "") { + $node_worker = create_node("worker$i", $NODE_TYPE_WORKER,"localhost", $port, $citus_config); + } else { + $node_worker = create_node("worker$i", $NODE_TYPE_WORKER,"localhost", $port); + } + my $node_worker_host = $node_worker->host(); + my $node_worker_port = $node_worker->port(); + $node_coordinator->safe_psql('postgres',"SELECT pg_catalog.citus_add_node('$node_worker_host', $node_worker_port);"); + push @workers, $node_worker; + } + return $node_coordinator, @workers; +} + +sub create_cdc_publication_and_replication_slots_for_citus_cluster { + my $node_coordinator = $_[0]; + my $workersref = $_[1]; + my $table_names = $_[2]; + + create_cdc_publication_and_slots_for_coordinator($node_coordinator, $table_names); + create_cdc_publication_and_slots_for_workers($workersref, $table_names); +} + +sub create_cdc_publication_and_slots_for_coordinator { + my $node_coordinator = $_[0]; + my $table_names = $_[1]; + print("node node_coordinator connstr: \n" . 
$node_coordinator->connstr()); + my $pub = $node_coordinator->safe_psql('postgres',"SELECT * FROM pg_publication WHERE pubname = 'cdc_publication';"); + if ($pub ne "") { + $node_coordinator->safe_psql('postgres',"DROP PUBLICATION IF EXISTS cdc_publication;"); + } + $node_coordinator->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR TABLE $table_names;"); + $node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,false)"); +} + +sub create_cdc_publication_and_slots_for_workers { + my $workersref = $_[0]; + my $table_names = $_[1]; + create_cdc_publication_for_workers($workersref, $table_names); + create_cdc_replication_slots_for_workers($workersref); +} + +sub create_cdc_publication_for_workers { + my $workersref = $_[0]; + my $table_names = $_[1]; + for (@$workersref) { + my $pub = $_->safe_psql('postgres',"SELECT * FROM pg_publication WHERE pubname = 'cdc_publication';"); + if ($pub ne "") { + $_->safe_psql('postgres',"DROP PUBLICATION IF EXISTS cdc_publication;"); + } + if ($table_names eq "all") { + $_->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR ALL TABLES;"); + } else { + $_->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR TABLE $table_names;"); + } + } +} + + +sub create_cdc_replication_slots_for_workers { + my $workersref = $_[0]; + for (@$workersref) { + my $slot = $_->safe_psql('postgres',"select * from pg_replication_slots where slot_name = 'cdc_replication_slot';"); + if ($slot ne "") { + $_->safe_psql('postgres',"SELECT pg_catalog.pg_drop_replication_slot('cdc_replication_slot');"); + } + $_->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,true)"); + } +} + + +sub connect_cdc_client_to_citus_cluster_publications { + my $node_coordinator = $_[0]; + my $workersref = $_[1]; + my $node_cdc_client = $_[2]; + my $num_args = scalar(@_); + + + if ($num_args > 3) { + my $copy_arg = $_[3]; + connect_cdc_client_to_coordinator_publication($node_coordinator,$node_cdc_client, $copy_arg); + } else { + connect_cdc_client_to_coordinator_publication($node_coordinator,$node_cdc_client); + } + connect_cdc_client_to_workers_publication($workersref, $node_cdc_client); +} + +sub connect_cdc_client_to_coordinator_publication { + my $node_coordinator = $_[0]; + my $node_cdc_client = $_[1]; + my $num_args = scalar(@_); + my $copy_data = ""; + if ($num_args > 2) { + my $copy_arg = $_[2]; + $copy_data = 'copy_data='. $copy_arg; + } else { + $copy_data = 'copy_data=false'; + } + + my $conn_str = $node_coordinator->connstr() . " dbname=postgres"; + my $subscription = 'cdc_subscription'; + print "creating subscription $subscription for coordinator: $conn_str\n"; + $node_cdc_client->safe_psql('postgres'," + CREATE SUBSCRIPTION $subscription + CONNECTION '$conn_str' + PUBLICATION cdc_publication + WITH ( + create_slot=false, + enabled=true, + slot_name=cdc_replication_slot," + . $copy_data. ");" + ); +} + +sub connect_cdc_client_to_workers_publication { + my $workersref = $_[0]; + my $node_cdc_client = $_[1]; + my $i = 1; + for (@$workersref) { + my $conn_str = $_->connstr() . " dbname=postgres"; + my $subscription = 'cdc_subscription_' . 
$i; + print "creating subscription $subscription for node$i: $conn_str\n"; + my $subscription_stmt = "CREATE SUBSCRIPTION $subscription + CONNECTION '$conn_str' + PUBLICATION cdc_publication + WITH ( + create_slot=false, + enabled=true, + slot_name=cdc_replication_slot, + copy_data=false); + "; + + $node_cdc_client->safe_psql('postgres',$subscription_stmt); + $i++; + } +} + +sub wait_for_cdc_client_to_catch_up_with_citus_cluster { + my $node_coordinator = $_[0]; + my ($workersref) = $_[1]; + + my $subscription = 'cdc_subscription'; + print "coordinator: waiting for cdc client subscription $subscription to catch up\n"; + $node_coordinator->wait_for_catchup($subscription); + wait_for_cdc_client_to_catch_up_with_workers($workersref); +} + +sub wait_for_cdc_client_to_catch_up_with_coordinator { + my $node_coordinator = $_[0]; + my $subscription = 'cdc_subscription'; + print "coordinator: waiting for cdc client subscription $subscription to catch up\n"; + $node_coordinator->wait_for_catchup($subscription); +} + +sub wait_for_cdc_client_to_catch_up_with_workers { + my ($workersref) = $_[0]; + my $i = 1; + for (@$workersref) { + my $subscription = 'cdc_subscription_' . $i; + print "node$i: waiting for cdc client subscription $subscription to catch up\n"; + $_->wait_for_catchup($subscription); + $i++; + } +} + +sub drop_cdc_client_subscriptions { + my $node = $_[0]; + my ($workersref) = $_[1]; + + $node->safe_psql('postgres',"drop subscription cdc_subscription"); + my $i = 1; + for (@$workersref) { + my $subscription = 'cdc_subscription_' . $i; + $node->safe_psql('postgres',"drop subscription " . $subscription); + $i++; + } +} + diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 5964267ec..25c9d4ecd 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +from __future__ import annotations import argparse import os diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index ead2a5b85..044a5f909 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1360,9 +1360,12 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 11.3-1 ALTER EXTENSION citus UPDATE TO '11.3-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- -(0 rows) + | function citus_internal_is_replication_origin_tracking_active() boolean + | function citus_internal_start_replication_origin_tracking() void + | function citus_internal_stop_replication_origin_tracking() void +(3 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 7cd2f63c8..d05d5a67f 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -74,7 +74,10 @@ ORDER BY 1; function citus_internal_delete_partition_metadata(regclass) function citus_internal_delete_shard_metadata(bigint) function citus_internal_global_blocked_processes() + function citus_internal_is_replication_origin_tracking_active() function citus_internal_local_blocked_processes() + function 
citus_internal_start_replication_origin_tracking()
+ function citus_internal_stop_replication_origin_tracking()
  function citus_internal_update_placement_metadata(bigint,integer,integer)
  function citus_internal_update_relation_colocation(oid,integer)
  function citus_is_clock_after(cluster_clock,cluster_clock)
@@ -318,5 +321,5 @@ ORDER BY 1;
  view citus_stat_statements
  view pg_dist_shard_placement
  view time_partitions
-(310 rows)
+(313 rows)

diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl
index af594c1d4..544cd6ba1 100755
--- a/src/test/regress/pg_regress_multi.pl
+++ b/src/test/regress/pg_regress_multi.pl
@@ -487,6 +487,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
 push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
 push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
 push(@pgOptions, "citus.stat_statements_track = 'all'");
+push(@pgOptions, "citus.enable_change_data_capture=on");

 # Some tests look at shards in pg_class, make sure we can usually see them:
 push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");
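
For orientation, a minimal TAP test built on the helper library above could look roughly like the sketch below. The file name, ports, table name, the require path, and the $NODE_TYPE_CDC_CLIENT constant are illustrative assumptions, not part of this patch.

# Hypothetical t/0xx_cdc_example.pl; assumes the helper library above is
# loadable as ./t/cdctestlib.pl and that it defines a $NODE_TYPE_CDC_CLIENT
# constant for plain (non-Citus) nodes alongside the coordinator/worker ones.
use strict;
use warnings;
use Test::More;

require "./t/cdctestlib.pl";

# One coordinator, one worker, plus a plain PostgreSQL node as the CDC client.
my ($coordinator, @workers) = create_citus_cluster(1, 'localhost', 57636);
my $cdc_client = create_node('cdc_client', $NODE_TYPE_CDC_CLIENT, 'localhost', 57639);

$coordinator->safe_psql('postgres', "CREATE TABLE sensors(id int PRIMARY KEY, data jsonb);");
$coordinator->safe_psql('postgres', "SELECT create_distributed_table('sensors', 'id');");

# The subscriber needs a local table of the same shape to apply changes into.
$cdc_client->safe_psql('postgres', "CREATE TABLE sensors(id int PRIMARY KEY, data jsonb);");

create_cdc_publication_and_replication_slots_for_citus_cluster($coordinator, \@workers, 'sensors');
connect_cdc_client_to_citus_cluster_publications($coordinator, \@workers, $cdc_client);

$coordinator->safe_psql('postgres',
	"INSERT INTO sensors SELECT i, '{}' FROM generate_series(1, 100) i;");
wait_for_cdc_client_to_catch_up_with_citus_cluster($coordinator, \@workers);

my $row_count = $cdc_client->safe_psql('postgres', "SELECT count(*) FROM sensors;");
is($row_count, '100', 'CDC client received the changes for the distributed table');

drop_cdc_client_subscriptions($cdc_client, \@workers);
done_testing();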