mirror of https://github.com/citusdata/citus.git
CDC implementation for Citus using Logical Replication (#6623)
Description: Implements CDC for Citus using logical replication while avoiding re-publishing events multiple times. Internal data movements set up a replication origin session, which tags every WAL entry they produce with "DoNotReplicateId". This covers:
- shard splits
- shard moves
- create distributed table
- undistribute table
- alter distributed table (for some cases)
- reference table operations
The Citus decoder, which decodes WAL events for CDC clients, ignores any WAL entry whose replication origin is not zero. It also maps shard names to distributed table names.

pull/6775/head
parent 616b5018a0
commit 85b8a2c7a1
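As a minimal sketch of the mechanism described above (illustrative only; the real implementations appear further down in this diff as SetupReplicationOriginLocalSession() and replication_origin_filter_cb(), and the function names below are hypothetical): an internal data movement marks its session's WAL with DoNotReplicateId, and the decoder's origin filter then drops every change whose origin id is non-zero, so only client-originated changes reach CDC subscribers.

/* Sketch only: producer and consumer halves of the origin-based filtering. */
#include "postgres.h"
#include "replication/origin.h"   /* replorigin_session_origin, DoNotReplicateId */
#include "replication/logical.h"  /* LogicalDecodingContext, RepOriginId */

static RepOriginId savedOriginId = InvalidRepOriginId;

/* Producer side: an internal shard move/split tags its WAL with DoNotReplicateId. */
void
BeginInternalDataMovement(void)
{
    savedOriginId = replorigin_session_origin;
    replorigin_session_origin = DoNotReplicateId;
}

void
EndInternalDataMovement(void)
{
    replorigin_session_origin = savedOriginId;
}

/* Consumer side: the CDC decoder filters out any change with a non-zero origin. */
bool
cdc_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
    /* returning true tells logical decoding to skip this change */
    return origin_id != InvalidRepOriginId;
}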
@@ -455,6 +455,10 @@ jobs:
          pg_major: << parameters.pg_major >>
      - configure
      - enable_core
      - run:
          name: 'Install DBI.pm'
          command: |
            apt-get update && apt-get install libdbi-perl && apt-get install libdbd-pg-perl
      - run:
          name: 'Run Test'
          command: |

@@ -889,6 +893,10 @@ workflows:
          <<: *tap-test-citus-15
          name: 'test-15_tap-columnar-freezing'
          suite: columnar_freezing
      - tap-test-citus:
          <<: *tap-test-citus-15
          name: 'test-15_tap-cdc'
          suite: cdc

      - test-arbitrary-configs:
          name: 'test-13_check-arbitrary-configs'
@@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$(
DATA_built = $(generated_sql_files)

# directories with source files
SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
SUBDIRS = . commands cdc connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
# enterprise modules
SUBDIRS += replication
@@ -0,0 +1,388 @@
/*-------------------------------------------------------------------------
 *
 * cdc_decoder.c
 *     CDC Decoder plugin for Citus
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "common/hashfn.h"
#include "utils/typcache.h"
#include "utils/lsyscache.h"
#include "catalog/pg_namespace.h"
#include "distributed/cdc_decoder.h"
#include "distributed/relay_utility.h"
#include "distributed/worker_protocol.h"
#include "distributed/metadata_cache.h"

static LogicalDecodeChangeCB ouputPluginChangeCB;


static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId
                                         origin_id);

static void TranslateChangesIfSchemaChanged(Relation relation, Relation targetRelation,
                                            ReorderBufferChange *change);

static void TranslateAndPublishRelationForCDC(LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              Relation relation,
                                              ReorderBufferChange *change, Oid shardId,
                                              Oid targetRelationid);

typedef struct
{
    uint64 shardId;
    Oid distributedTableId;
    bool isReferenceTable;
    bool isNull;
} ShardIdHashEntry;

static HTAB *shardToDistributedTableMap = NULL;


/*
 * InitShardToDistributedTableMap initializes the hash table that is used to
 * translate the changes in the shard table to the changes in the distributed table.
 */
static void
InitShardToDistributedTableMap()
{
    HASHCTL info;
    memset(&info, 0, sizeof(info));
    info.keysize = sizeof(uint64);
    info.entrysize = sizeof(ShardIdHashEntry);
    info.hash = tag_hash;
    info.hcxt = CurrentMemoryContext;

    int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION);
    shardToDistributedTableMap = hash_create("CDC Decoder translation hash table", 1024,
                                             &info, hashFlags);
}


/*
 * AddShardIdToHashTable adds the shardId to the hash table.
 */
static Oid
AddShardIdToHashTable(Oid shardId, ShardIdHashEntry *entry)
{
    entry->shardId = shardId;
    entry->distributedTableId = LookupShardRelationFromCatalog(shardId, true);
    entry->isReferenceTable = PartitionMethodViaCatalog(entry->distributedTableId) == 'n';
    return entry->distributedTableId;
}


static Oid
LookupDistributedTableIdForShardId(Oid shardId, bool *isReferenceTable)
{
    bool found;
    Oid distributedTableId = InvalidOid;
    ShardIdHashEntry *entry = (ShardIdHashEntry *) hash_search(shardToDistributedTableMap,
                                                               &shardId,
                                                               HASH_FIND | HASH_ENTER,
                                                               &found);
    if (found)
    {
        distributedTableId = entry->distributedTableId;
    }
    else
    {
        distributedTableId = AddShardIdToHashTable(shardId, entry);
    }
    *isReferenceTable = entry->isReferenceTable;
    return distributedTableId;
}


/*
 * InitCDCDecoder is called from the shard split decoder plugin's init function.
 * It sets the callback function for filtering out changes originating from other nodes.
 * It also sets the callback function for processing the changes in ouputPluginChangeCB.
 * This function is common for both CDC and shard split decoder plugins.
 */
void
InitCDCDecoder(OutputPluginCallbacks *cb, LogicalDecodeChangeCB changeCB)
{
    elog(LOG, "Initializing CDC decoder");
    cb->filter_by_origin_cb = replication_origin_filter_cb;
    ouputPluginChangeCB = changeCB;

    /* Initialize the hash table used for mapping shard to shell tables. */
    InitShardToDistributedTableMap();
}


/*
 * replication_origin_filter_cb is a callback function that filters out publication of
 * changes originating from any node other than the current node. This is
 * identified by the "origin_id" of the changes. The origin_id is set to
 * a non-zero value in the origin node as part of WAL replication for internal
 * operations like shard split/moves/create_distributed_table etc.
 */
static bool
replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
    return (origin_id != InvalidRepOriginId);
}


/*
 * TranslateAndPublishRelationForCDC is responsible for translating the changes in the
 * shard table to the changes in the shell table and publishing the changes as a change
 * to the distributed table, so that CDC clients are not aware of the shard tables. It
 * also handles schema changes to the distributed table.
 */
static void
TranslateAndPublishRelationForCDC(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                  Relation relation, ReorderBufferChange *change, Oid
                                  shardId, Oid targetRelationid)
{
    /* Get the distributed table's relation for this shard. */
    Relation targetRelation = RelationIdGetRelation(targetRelationid);

    /*
     * Check if there has been a schema change (such as a dropped column), by comparing
     * the number of attributes in the shard table and the shell table.
     */
    TranslateChangesIfSchemaChanged(relation, targetRelation, change);

    /*
     * Publish the change to the shard table as the change in the distributed table,
     * so that the CDC client can see the change in the distributed table,
     * instead of the shard table, by calling the pgoutput's callback function.
     */
    ouputPluginChangeCB(ctx, txn, targetRelation, change);
    RelationClose(targetRelation);
}


/*
 * PublishDistributedTableChanges publishes the changes to a shard as changes to the
 * distributed table that owns the shard, so that CDC clients see the distributed
 * table instead of the shard. It skips the Citus metadata and catalog tables.
 */
void
PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                               Relation relation, ReorderBufferChange *change)
{
    char *shardRelationName = RelationGetRelationName(relation);

    /* Skip publishing CDC changes for any system relations in pg_catalog. */
    if (relation->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
    {
        return;
    }

    /* Check if the relation is a distributed table by checking for shard name. */
    uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true);

    /* If this relation is not distributed, call the pgoutput's callback and return. */
    if (shardId == INVALID_SHARD_ID)
    {
        ouputPluginChangeCB(ctx, txn, relation, change);
        return;
    }

    bool isReferenceTable = false;
    Oid distRelationId = LookupDistributedTableIdForShardId(shardId, &isReferenceTable);
    if (distRelationId == InvalidOid)
    {
        ouputPluginChangeCB(ctx, txn, relation, change);
        return;
    }

    /* Publish changes for reference table only from the coordinator node. */
    if (isReferenceTable && !IsCoordinator())
    {
        return;
    }

    /* Translate and publish from shard relation to distributed table relation for CDC. */
    TranslateAndPublishRelationForCDC(ctx, txn, relation, change, shardId,
                                      distRelationId);
}


/*
 * GetTupleForTargetSchemaForCdc returns a heap tuple with the data from sourceRelationTuple
 * to match the schema in targetRelDesc. Either or both source and target relations may have
 * dropped columns. This function handles it by adding NULL values for dropped columns in
 * target relation and skipping dropped columns in source relation. It returns a heap tuple
 * adjusted to the current schema of the target relation.
 */
static HeapTuple
GetTupleForTargetSchemaForCdc(HeapTuple sourceRelationTuple,
                              TupleDesc sourceRelDesc,
                              TupleDesc targetRelDesc)
{
    /* Allocate memory for sourceValues and sourceNulls arrays. */
    Datum *sourceValues = (Datum *) palloc0(sourceRelDesc->natts * sizeof(Datum));
    bool *sourceNulls = (bool *) palloc0(sourceRelDesc->natts * sizeof(bool));

    /* Deform the source tuple to sourceValues and sourceNulls arrays. */
    heap_deform_tuple(sourceRelationTuple, sourceRelDesc, sourceValues,
                      sourceNulls);

    /* This is the next field to read in the source relation. */
    uint32 sourceIndex = 0;
    uint32 targetIndex = 0;

    /* Allocate memory for targetValues and targetNulls arrays. */
    Datum *targetValues = (Datum *) palloc0(targetRelDesc->natts * sizeof(Datum));
    bool *targetNulls = (bool *) palloc0(targetRelDesc->natts * sizeof(bool));

    /* Loop through all source and target attributes one by one and handle any dropped attributes. */
    while (targetIndex < targetRelDesc->natts)
    {
        /* If this target attribute has been dropped, add a NULL attribute in targetValues and continue. */
        if (TupleDescAttr(targetRelDesc, targetIndex)->attisdropped)
        {
            Datum nullDatum = (Datum) 0;
            targetValues[targetIndex] = nullDatum;
            targetNulls[targetIndex] = true;
            targetIndex++;
        }

        /* If this source attribute has been dropped, just skip this source attribute. */
        else if (TupleDescAttr(sourceRelDesc, sourceIndex)->attisdropped)
        {
            sourceIndex++;
            continue;
        }

        /* If both source and target attributes are not dropped, add the attribute field to targetValues. */
        else if (sourceIndex < sourceRelDesc->natts)
        {
            targetValues[targetIndex] = sourceValues[sourceIndex];
            targetNulls[targetIndex] = sourceNulls[sourceIndex];
            sourceIndex++;
            targetIndex++;
        }
        else
        {
            /* If there are no more source fields, add a NULL field in targetValues. */
            Datum nullDatum = (Datum) 0;
            targetValues[targetIndex] = nullDatum;
            targetNulls[targetIndex] = true;
            targetIndex++;
        }
    }

    /* Form a new tuple from the target values created by the above loop. */
    HeapTuple targetRelationTuple = heap_form_tuple(targetRelDesc, targetValues,
                                                    targetNulls);
    return targetRelationTuple;
}


/* HasSchemaChanged returns whether there are any schema changes between source and target relations. */
static bool
HasSchemaChanged(TupleDesc sourceRelationDesc, TupleDesc targetRelationDesc)
{
    bool hasSchemaChanged = (sourceRelationDesc->natts != targetRelationDesc->natts);
    if (hasSchemaChanged)
    {
        return true;
    }

    for (uint32 i = 0; i < sourceRelationDesc->natts; i++)
    {
        if (TupleDescAttr(sourceRelationDesc, i)->attisdropped ||
            TupleDescAttr(targetRelationDesc, i)->attisdropped)
        {
            hasSchemaChanged = true;
            break;
        }
    }

    return hasSchemaChanged;
}


/*
 * TranslateChangesIfSchemaChanged translates the tuples in the ReorderBufferChange
 * if there is a schema change between source and target relations.
 */
static void
TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation,
                                ReorderBufferChange *change)
{
    TupleDesc sourceRelationDesc = RelationGetDescr(sourceRelation);
    TupleDesc targetRelationDesc = RelationGetDescr(targetRelation);

    /* If there are no changes between source and target relations, return. */
    if (!HasSchemaChanged(sourceRelationDesc, targetRelationDesc))
    {
        return;
    }

    /* Check the ReorderBufferChange's action type and handle them accordingly. */
    switch (change->action)
    {
        case REORDER_BUFFER_CHANGE_INSERT:
        {
            /* For insert action, only the new tuple needs to be translated. */
            HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
            HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc(
                sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
            change->data.tp.newtuple->tuple = *targetRelationNewTuple;
            break;
        }

        /*
         * For update changes both old and new tuples need to be translated for target relation
         * if the REPLICA IDENTITY is set to FULL. Otherwise, only the new tuple needs to be
         * translated for target relation.
         */
        case REORDER_BUFFER_CHANGE_UPDATE:
        {
            /* For update action, the new tuple should always be translated. */
            /* Get the new tuple from the ReorderBufferChange, and translate it to target relation. */
            HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
            HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc(
                sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
            change->data.tp.newtuple->tuple = *targetRelationNewTuple;

            /*
             * Format oldtuple according to the target relation. If the column values of replica
             * identity change, then the old tuple is non-null and needs to be formatted according
             * to the target relation schema.
             */
            if (change->data.tp.oldtuple != NULL)
            {
                HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
                HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc(
                    sourceRelationOldTuple,
                    sourceRelationDesc,
                    targetRelationDesc);

                change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
            }
            break;
        }

        case REORDER_BUFFER_CHANGE_DELETE:
        {
            /* For delete action, only the old tuple should be translated. */
            HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
            HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc(
                sourceRelationOldTuple,
                sourceRelationDesc,
                targetRelationDesc);

            change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
            break;
        }

        default:
        {
            /* Do nothing for other action types. */
            break;
        }
    }
}
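To make the dropped-column handling in GetTupleForTargetSchemaForCdc above concrete, here is a small, self-contained toy model of its merge loop using plain int arrays instead of Datum arrays and TupleDescs. The column layouts are made-up examples, not taken from the commit.

#include <stdbool.h>
#include <stdio.h>

int
main(void)
{
    /* source shard: (a, <dropped>, c) -- a dropped column still occupies an attribute slot */
    int sourceValues[] = { 1, 0, 3 };
    bool sourceDropped[] = { false, true, false };
    int sourceNatts = 3;

    /* target shell table: (a, c, <dropped>) */
    bool targetDropped[] = { false, false, true };
    int targetNatts = 3;

    int targetValues[3] = { 0 };
    bool targetNulls[3] = { false };

    int sourceIndex = 0;
    int targetIndex = 0;
    while (targetIndex < targetNatts)
    {
        if (targetDropped[targetIndex])
        {
            /* dropped target column becomes NULL */
            targetNulls[targetIndex] = true;
            targetIndex++;
        }
        else if (sourceIndex < sourceNatts && sourceDropped[sourceIndex])
        {
            /* skip dropped source column */
            sourceIndex++;
        }
        else if (sourceIndex < sourceNatts)
        {
            /* copy the value across */
            targetValues[targetIndex] = sourceValues[sourceIndex];
            sourceIndex++;
            targetIndex++;
        }
        else
        {
            /* ran out of source columns: fill with NULL */
            targetNulls[targetIndex] = true;
            targetIndex++;
        }
    }

    for (targetIndex = 0; targetIndex < targetNatts; targetIndex++)
    {
        if (targetNulls[targetIndex])
        {
            printf("target attribute %d: NULL\n", targetIndex + 1);
        }
        else
        {
            printf("target attribute %d: %d\n", targetIndex + 1, targetValues[targetIndex]);
        }
    }

    /* prints 1, 3, NULL -- matching the target's live and dropped column layout */
    return 0;
}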
@@ -55,6 +55,7 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/shared_library_init.h"
#include "distributed/shard_utils.h"
#include "distributed/worker_protocol.h"

@@ -406,7 +407,10 @@ UndistributeTable(TableConversionParameters *params)
    params->shardCountIsNull = true;
    TableConversionState *con = CreateTableConversion(params);

    return ConvertTable(con);
    SetupReplicationOriginLocalSession();
    TableConversionReturn *conv = ConvertTable(con);
    ResetReplicationOriginLocalSession();
    return conv;
}
@@ -2406,12 +2406,12 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
    EState *estate = CreateExecutorState();
    ExprContext *econtext = GetPerTupleExprContext(estate);
    econtext->ecxt_scantuple = slot;

    const bool nonPublishableData = false;
    DestReceiver *copyDest =
        (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
                                                     columnNameList,
                                                     partitionColumnIndex,
                                                     estate, NULL);
                                                     estate, NULL, nonPublishableData);

    /* initialise state for writing to shards, we'll open connections on demand */
    copyDest->rStartup(copyDest, 0, tupleDescriptor);
@@ -36,6 +36,7 @@
#include "distributed/local_multi_copy.h"
#include "distributed/shard_utils.h"
#include "distributed/version_compat.h"
#include "distributed/replication_origin_session_utils.h"

/* managed via GUC, default is 512 kB */
int LocalCopyFlushThresholdByte = 512 * 1024;

@@ -46,7 +47,7 @@ static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDes
static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
static bool ShouldSendCopyNow(StringInfo buffer);
static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
                        CopyStmt *copyStatement, bool isEndOfCopy);
                        CopyStmt *copyStatement, bool isEndOfCopy, bool isPublishable);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);

@@ -94,7 +95,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in
        bool isEndOfCopy = false;
        DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId,
                    shardId,
                    copyDest->copyStatement, isEndOfCopy);
                    copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable);
        resetStringInfo(localCopyOutState->fe_msgbuf);
    }
}

@@ -133,7 +134,7 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
    }
    bool isEndOfCopy = true;
    DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId,
                copyDest->copyStatement, isEndOfCopy);
                copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable);
}

@@ -197,7 +198,7 @@ ShouldSendCopyNow(StringInfo buffer)
 */
static void
DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement,
            bool isEndOfCopy)
            bool isEndOfCopy, bool isPublishable)
{
    /*
     * Set the buffer as a global variable to allow ReadFromLocalBufferCallback

@@ -205,6 +206,10 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat
     * ReadFromLocalBufferCallback.
     */
    LocalCopyBuffer = buffer;
    if (!isPublishable)
    {
        SetupReplicationOriginLocalSession();
    }

    Oid shardOid = GetTableLocalShardOid(relationId, shardId);
    Relation shard = table_open(shardOid, RowExclusiveLock);

@@ -219,6 +224,10 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat
    EndCopyFrom(cstate);

    table_close(shard, NoLock);
    if (!isPublishable)
    {
        ResetReplicationOriginLocalSession();
    }
    free_parsestate(pState);
}
@@ -85,6 +85,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_pruning.h"
#include "distributed/shared_connection_stats.h"

@@ -270,7 +271,8 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
                                      HTAB *connectionStateHash,
                                      bool *found, bool shouldUseLocalCopy, CopyOutState
                                      copyOutState, bool isColocatedIntermediateResult);
                                      copyOutState, bool isColocatedIntermediateResult,
                                      bool isPublishable);
static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
                                                    ShardPlacement *placement,
                                                    bool colocatedIntermediateResult);

@@ -285,7 +287,8 @@ static void InitializeCopyShardState(CopyShardState *shardState,
                                     uint64 shardId,
                                     bool canUseLocalCopy,
                                     CopyOutState copyOutState,
                                     bool colocatedIntermediateResult);
                                     bool colocatedIntermediateResult, bool
                                     isPublishable);
static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
                                           CopyStmt *copyStatement,
                                           CopyOutState copyOutState);

@@ -492,9 +495,11 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletion *completionTag)
    ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);

    /* set up the destination for the COPY */
    const bool publishableData = true;
    CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList,
                                                                  partitionColumnIndex,
                                                                  executorState, NULL);
                                                                  executorState, NULL,
                                                                  publishableData);

    /* if the user specified an explicit append-to_shard option, write to it */
    uint64 appendShardId = ProcessAppendToShardOption(tableId, copyStatement);

@@ -1934,7 +1939,7 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
CitusCopyDestReceiver *
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
                            EState *executorState,
                            char *intermediateResultIdPrefix)
                            char *intermediateResultIdPrefix, bool isPublishable)
{
    CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0(
        sizeof(CitusCopyDestReceiver));

@@ -1953,6 +1958,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
    copyDest->executorState = executorState;
    copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
    copyDest->memoryContext = CurrentMemoryContext;
    copyDest->isPublishable = isPublishable;

    return copyDest;
}

@@ -2318,7 +2324,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
                                                  &cachedShardStateFound,
                                                  copyDest->shouldUseLocalCopy,
                                                  copyDest->copyOutState,
                                                  isColocatedIntermediateResult);
                                                  isColocatedIntermediateResult,
                                                  copyDest->isPublishable);

    if (!cachedShardStateFound)
    {
        firstTupleInShard = true;

@@ -2751,6 +2759,11 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState,
    if (activePlacementState != NULL)
    {
        EndPlacementStateCopyCommand(activePlacementState, copyOutState);
        if (!copyDest->isPublishable)
        {
            ResetReplicationOriginRemoteSession(
                activePlacementState->connectionState->connection);
        }
    }

    dlist_foreach(iter, &connectionState->bufferedPlacementList)

@@ -2764,6 +2777,10 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState,
        SendCopyDataToPlacement(placementState->data, shardId,
                                connectionState->connection);
        EndPlacementStateCopyCommand(placementState, copyOutState);
        if (!copyDest->isPublishable)
        {
            ResetReplicationOriginRemoteSession(connectionState->connection);
        }
    }
}

@@ -3436,7 +3453,7 @@ static CopyShardState *
GetShardState(uint64 shardId, HTAB *shardStateHash,
              HTAB *connectionStateHash, bool *found, bool
              shouldUseLocalCopy, CopyOutState copyOutState,
              bool isColocatedIntermediateResult)
              bool isColocatedIntermediateResult, bool isPublishable)
{
    CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
                                                                HASH_ENTER, found);

@@ -3444,7 +3461,8 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
    {
        InitializeCopyShardState(shardState, connectionStateHash,
                                 shardId, shouldUseLocalCopy,
                                 copyOutState, isColocatedIntermediateResult);
                                 copyOutState, isColocatedIntermediateResult,
                                 isPublishable);
    }

    return shardState;

@@ -3461,7 +3479,8 @@ InitializeCopyShardState(CopyShardState *shardState,
                         HTAB *connectionStateHash, uint64 shardId,
                         bool shouldUseLocalCopy,
                         CopyOutState copyOutState,
                         bool colocatedIntermediateResult)
                         bool colocatedIntermediateResult,
                         bool isPublishable)
{
    ListCell *placementCell = NULL;
    int failedPlacementCount = 0;

@@ -3532,6 +3551,11 @@ InitializeCopyShardState(CopyShardState *shardState,
            RemoteTransactionBeginIfNecessary(connection);
        }

        if (!isPublishable)
        {
            SetupReplicationOriginRemoteSession(connection);
        }

        CopyPlacementState *placementState = palloc0(sizeof(CopyPlacementState));
        placementState->shardState = shardState;
        placementState->data = makeStringInfo();
@@ -1484,6 +1484,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
 * - Current cached connections is already at MaxCachedConnectionsPerWorker
 * - Connection is forced to close at the end of transaction
 * - Connection is not in OK state
 * - Connection has a replication origin setup
 * - A transaction is still in progress (usually because we are cancelling a distributed transaction)
 * - A connection reached its maximum lifetime
 */

@@ -1503,6 +1504,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
        PQstatus(connection->pgConn) != CONNECTION_OK ||
        !RemoteTransactionIdle(connection) ||
        connection->requiresReplication ||
        connection->isReplicationOriginSessionSetup ||
        (MaxCachedConnectionLifetime >= 0 &&
         MillisecondsToTimeout(connection->connectionEstablishmentStart,
                               MaxCachedConnectionLifetime) <= 0);
@@ -573,6 +573,47 @@ SendRemoteCommand(MultiConnection *connection, const char *command)
}


/*
 * ExecuteRemoteCommandAndCheckResult executes the given command in the remote node and
 * checks if the result is equal to the expected result. If the result is equal to the
 * expected result, the function returns true, otherwise it returns false.
 */
bool
ExecuteRemoteCommandAndCheckResult(MultiConnection *connection, char *command,
                                   char *expected)
{
    if (!SendRemoteCommand(connection, command))
    {
        /* if we cannot connect, we warn and report false */
        ReportConnectionError(connection, WARNING);
        return false;
    }
    bool raiseInterrupts = true;
    PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts);

    /* if remote node throws an error, we also throw an error */
    if (!IsResponseOK(queryResult))
    {
        ReportResultError(connection, queryResult, ERROR);
    }

    StringInfo queryResultString = makeStringInfo();

    /* Evaluate the queryResult and store it into the queryResultString */
    bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString);
    bool result = false;
    if (success && strcmp(queryResultString->data, expected) == 0)
    {
        result = true;
    }

    PQclear(queryResult);
    ForgetResults(connection);

    return result;
}


/*
 * ReadFirstColumnAsText reads the first column of result tuples from the given
 * PGresult struct and returns them in a StringInfo list.
@@ -409,11 +409,13 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
                                            columnNameList);

    /* set up a DestReceiver that copies into the intermediate table */
    const bool publishableData = true;
    CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
                                                                  columnNameList,
                                                                  partitionColumnIndex,
                                                                  executorState,
                                                                  intermediateResultIdPrefix);
                                                                  intermediateResultIdPrefix,
                                                                  publishableData);

    ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

@@ -443,10 +445,12 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
                                            columnNameList);

    /* set up a DestReceiver that copies into the distributed table */
    const bool publishableData = true;
    CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
                                                                  columnNameList,
                                                                  partitionColumnIndex,
                                                                  executorState, NULL);
                                                                  executorState, NULL,
                                                                  publishableData);

    ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);
@@ -24,6 +24,7 @@
#include "distributed/relation_utils.h"
#include "distributed/version_compat.h"
#include "distributed/local_executor.h"
#include "distributed/replication_origin_session_utils.h"

/*
 * LocalCopyBuffer is used in copy callback to return the copied rows.

@@ -80,6 +81,7 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
                             localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);


static bool
CanUseLocalCopy(uint32_t destinationNodeId)
{

@@ -103,6 +105,12 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
                                                 NULL /* database (current) */);
    ClaimConnectionExclusively(copyDest->connection);


    RemoteTransactionBeginIfNecessary(copyDest->connection);

    SetupReplicationOriginRemoteSession(copyDest->connection);


    StringInfo copyStatement = ConstructShardCopyStatement(
        copyDest->destinationShardFullyQualifiedName,
        copyDest->copyOutState->binary,

@@ -185,6 +193,8 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
    CopyOutState copyOutState = copyDest->copyOutState;
    if (copyDest->useLocalCopy)
    {
        /* Setup replication origin session for local copy. */

        WriteLocalTuple(slot, copyDest);
        if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte)
        {

@@ -260,6 +270,11 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
    copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
                                                            copyOutState->binary);
    copyDest->copyOutState = copyOutState;
    if (copyDest->useLocalCopy)
    {
        /* Setup replication origin session for local copy. */
        SetupReplicationOriginLocalSession();
    }
}


@@ -318,6 +333,9 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)

        PQclear(result);
        ForgetResults(copyDest->connection);

        ResetReplicationOriginRemoteSession(copyDest->connection);

        CloseConnection(copyDest->connection);
    }
}

@@ -330,6 +348,10 @@ static void
ShardCopyDestReceiverDestroy(DestReceiver *dest)
{
    ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest;
    if (copyDest->useLocalCopy)
    {
        ResetReplicationOriginLocalSession();
    }

    if (copyDest->copyOutState)
    {
@@ -8,21 +8,27 @@
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "distributed/cdc_decoder.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_protocol.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "replication/logical.h"
#include "utils/typcache.h"

#include "utils/lsyscache.h"
#include "catalog/pg_namespace.h"

extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB pgoutputChangeCB;
static LogicalDecodeChangeCB ouputPluginChangeCB;

static HTAB *SourceToDestinationShardMap = NULL;

/* Plugin callback */
static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                            Relation relation, ReorderBufferChange *change);
static void shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx,
                                          ReorderBufferTXN *txn,
                                          Relation relation, ReorderBufferChange *change);

/* Helper methods */
static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,

@@ -38,6 +44,22 @@ static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
                                         TupleDesc sourceTupleDesc,
                                         TupleDesc targetTupleDesc);

inline static bool IsShardSplitSlot(char *replicationSlotName);


#define CITUS_SHARD_SLOT_PREFIX "citus_shard_"
#define CITUS_SHARD_SLOT_PREFIX_SIZE (sizeof(CITUS_SHARD_SLOT_PREFIX) - 1)

/* build time macro for base decoder plugin name for CDC and Shard Split. */
#ifndef CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME
#define CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME "pgoutput"
#endif

/* build time macro for base decoder plugin's initialization function name for CDC and Shard Split. */
#ifndef CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME
#define CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME "_PG_output_plugin_init"
#endif

/*
 * Postgres uses 'pgoutput' as default plugin for logical replication.
 * We want to reuse Postgres pgoutput's functionality as much as possible.

@@ -47,9 +69,10 @@ void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    LogicalOutputPluginInit plugin_init =
        (LogicalOutputPluginInit) (void *) load_external_function("pgoutput",
                                                                  "_PG_output_plugin_init",
                                                                  false, NULL);
        (LogicalOutputPluginInit) (void *)
        load_external_function(CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME,
                               CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME,
                               false, NULL);

    if (plugin_init == NULL)
    {

@@ -60,25 +83,61 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    plugin_init(cb);

    /* actual pgoutput callback will be called with the appropriate destination shard */
    pgoutputChangeCB = cb->change_cb;
    cb->change_cb = split_change_cb;
    ouputPluginChangeCB = cb->change_cb;
    cb->change_cb = shard_split_and_cdc_change_cb;
    InitCDCDecoder(cb, ouputPluginChangeCB);
}


/*
 * split_change function emits the incoming tuple change
 * Check if the replication slot is for Shard split by checking for prefix.
 */
inline static
bool
IsShardSplitSlot(char *replicationSlotName)
{
    return strncmp(replicationSlotName, CITUS_SHARD_SLOT_PREFIX,
                   CITUS_SHARD_SLOT_PREFIX_SIZE) == 0;
}


/*
 * shard_split_and_cdc_change_cb function emits the incoming tuple change
 * to the appropriate destination shard.
 */
static void
split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                Relation relation, ReorderBufferChange *change)
shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                              Relation relation, ReorderBufferChange *change)
{
    /*
     * If Citus has not been loaded yet, pass the changes
     * through to the underlying decoder plugin.
     */
    if (!CitusHasBeenLoaded())
    {
        ouputPluginChangeCB(ctx, txn, relation, change);
        return;
    }

    /* check if the relation is publishable. */
    if (!is_publishable_relation(relation))
    {
        return;
    }

    char *replicationSlotName = ctx->slot->data.name.data;
    if (replicationSlotName == NULL)
    {
        elog(ERROR, "Replication slot name is NULL!");
        return;
    }

    /* check for the internal shard split names, if not, assume the slot is for CDC. */
    if (!IsShardSplitSlot(replicationSlotName))
    {
        PublishDistributedTableChanges(ctx, txn, relation, change);
        return;
    }

    /*
     * Initialize SourceToDestinationShardMap if not already initialized.

@@ -198,7 +257,7 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        }
    }

    pgoutputChangeCB(ctx, txn, targetRelation, change);
    ouputPluginChangeCB(ctx, txn, targetRelation, change);
    RelationClose(targetRelation);
}
@@ -74,6 +74,7 @@
#include "distributed/recursive_planning.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/run_from_same_connection.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_transfer.h"

@@ -1132,6 +1133,16 @@ RegisterCitusConfigVariables(void)
        GUC_STANDARD,
        NULL, NULL, NULL);

    DefineCustomBoolVariable(
        "citus.enable_change_data_capture",
        gettext_noop("Enables using replication origin tracking for change data capture"),
        NULL,
        &EnableChangeDataCapture,
        false,
        PGC_USERSET,
        GUC_STANDARD,
        NULL, NULL, NULL);

    DefineCustomBoolVariable(
        "citus.enable_cluster_clock",
        gettext_noop("When users explicitly call UDF citus_get_transaction_clock() "

@@ -2426,7 +2437,6 @@ RegisterCitusConfigVariables(void)
        GUC_STANDARD,
        NULL, NULL, NULL);


    /* warn about config items in the citus namespace that are not registered above */
    EmitWarningsOnPlaceholders("citus");
@@ -1,4 +1,4 @@
-- citus--11.2-1--11.3-1

#include "udfs/repl_origin_helper/11.3-1.sql"
-- bump version to 11.3-1
@@ -1,2 +1,4 @@
-- citus--11.3-1--11.2-1
-- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now
DROP FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking();
DROP FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active();
@@ -0,0 +1,20 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
    IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking()
    IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active()
RETURNS boolean
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active()
    IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC';
@@ -0,0 +1,20 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
    IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking()
    IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active()
RETURNS boolean
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active()
    IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC';
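The three UDFs above form the remote half of the protocol: the coordinator asks each worker to start origin tracking before an internal data movement and to stop it afterwards. Below is a hypothetical, minimal libpq-driven walkthrough of that call sequence; Citus itself does this through SetupReplicationOriginRemoteSession()/ResetReplicationOriginRemoteSession() with ExecuteCriticalRemoteCommand() (shown in replication_origin_session_utils.c further down), the connection string here is a placeholder, and citus.enable_change_data_capture must be enabled on the worker for the start call to take effect.

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    /* placeholder connection string for a worker node */
    PGconn *conn = PQconnectdb("host=worker-1 port=5432 dbname=postgres");
    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    /* tag everything that follows with DoNotReplicateId on the worker */
    PGresult *res = PQexec(conn,
        "SELECT pg_catalog.citus_internal_start_replication_origin_tracking();");
    PQclear(res);

    /* ... run the internal COPY / data movement here; CDC will not re-publish it ... */

    /* restore the previous replication origin for the session */
    res = PQexec(conn,
        "SELECT pg_catalog.citus_internal_stop_replication_origin_tracking();");
    PQclear(res);

    PQfinish(conn);
    return 0;
}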
@@ -34,6 +34,7 @@
#include "distributed/multi_logical_replication.h"
#include "distributed/multi_explain.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"

@@ -391,6 +392,9 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
            ResetGlobalVariables();
            ResetRelationAccessHash();

            /* Reset any local replication origin session since transaction has been aborted. */
            ResetReplicationOriginLocalSession();

            /* empty the CitusXactCallbackContext to ensure we're not leaking memory */
            MemoryContextReset(CitusXactCallbackContext);

@@ -715,6 +719,8 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
                SetCreateCitusTransactionLevel(0);
            }

            /* Reset any local replication origin session since subtransaction has been aborted. */
            ResetReplicationOriginLocalSession();
            MemoryContextSwitchTo(previousContext);

            break;
@@ -0,0 +1,239 @@
/*-------------------------------------------------------------------------
 *
 * replication_origin_session_utils.c
 *     Functions for managing replication origin session.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "distributed/replication_origin_session_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/metadata_cache.h"
#include "utils/builtins.h"
#include "miscadmin.h"

static bool IsRemoteReplicationOriginSessionSetup(MultiConnection *connection);

static void SetupMemoryContextResetReplicationOriginHandler(void);

static void SetupReplicationOriginSessionHelper(bool isContexResetSetupNeeded);

static inline bool IsLocalReplicationOriginSessionActive(void);

PG_FUNCTION_INFO_V1(citus_internal_start_replication_origin_tracking);
PG_FUNCTION_INFO_V1(citus_internal_stop_replication_origin_tracking);
PG_FUNCTION_INFO_V1(citus_internal_is_replication_origin_tracking_active);

/*
 * This variable is used to remember the replication origin id of the current session
 * before resetting it to DoNotReplicateId in SetupReplicationOriginLocalSession.
 */
static RepOriginId OriginalOriginId = InvalidRepOriginId;

/*
 * Setting that controls whether replication origin tracking is enabled
 */
bool EnableChangeDataCapture = false;


/* citus_internal_start_replication_origin_tracking starts a new replication origin session
 * in the local node. This function is used to avoid publishing the WAL records to the
 * replication slot by setting replication origin to DoNotReplicateId in WAL records.
 * It remembers the previous replication origin for the current session which will be
 * used to reset the replication origin to the previous value when the session ends.
 */
Datum
citus_internal_start_replication_origin_tracking(PG_FUNCTION_ARGS)
{
    if (!EnableChangeDataCapture)
    {
        PG_RETURN_VOID();
    }
    SetupReplicationOriginSessionHelper(false);
    PG_RETURN_VOID();
}


/* citus_internal_stop_replication_origin_tracking ends the current replication origin session
 * in the local node. This function is used to reset the replication origin to the
 * earlier value of replication origin.
 */
Datum
citus_internal_stop_replication_origin_tracking(PG_FUNCTION_ARGS)
{
    ResetReplicationOriginLocalSession();
    PG_RETURN_VOID();
}


/* citus_internal_is_replication_origin_tracking_active checks if the current replication origin
 * session is active in the local node.
 */
Datum
citus_internal_is_replication_origin_tracking_active(PG_FUNCTION_ARGS)
{
    bool result = IsLocalReplicationOriginSessionActive();
    PG_RETURN_BOOL(result);
}


/* IsLocalReplicationOriginSessionActive checks if the current replication origin
 * session is active in the local node.
 */
inline bool
IsLocalReplicationOriginSessionActive(void)
{
    return (replorigin_session_origin == DoNotReplicateId);
}


/*
 * SetupMemoryContextResetReplicationOriginHandler registers a callback function
 * that resets the replication origin session in case of any error for the current
 * memory context.
 */
static void
SetupMemoryContextResetReplicationOriginHandler()
{
    MemoryContextCallback *replicationOriginResetCallback = palloc0(
        sizeof(MemoryContextCallback));
    replicationOriginResetCallback->func =
        ResetReplicationOriginLocalSessionCallbackHandler;
    replicationOriginResetCallback->arg = NULL;
    MemoryContextRegisterResetCallback(CurrentMemoryContext,
                                       replicationOriginResetCallback);
}


/*
 * SetupReplicationOriginSessionHelper sets up a new replication origin session in a
 * local session. It takes an argument isContexResetSetupNeeded to decide whether
 * to register a callback function that resets the replication origin session in case
 * of any error for the current memory context.
 */
static void
SetupReplicationOriginSessionHelper(bool isContexResetSetupNeeded)
{
    if (!EnableChangeDataCapture)
    {
        return;
    }
    OriginalOriginId = replorigin_session_origin;
    replorigin_session_origin = DoNotReplicateId;
    if (isContexResetSetupNeeded)
    {
        SetupMemoryContextResetReplicationOriginHandler();
    }
}


/*
 * SetupReplicationOriginLocalSession sets up a new replication origin session in a
 * local session.
 */
void
SetupReplicationOriginLocalSession()
{
    SetupReplicationOriginSessionHelper(true);
}


/*
 * ResetReplicationOriginLocalSession resets the replication origin session in a
 * local node.
 */
void
ResetReplicationOriginLocalSession(void)
{
    if (replorigin_session_origin != DoNotReplicateId)
    {
        return;
    }

    replorigin_session_origin = OriginalOriginId;
}


/*
 * ResetReplicationOriginLocalSessionCallbackHandler is a callback function that
 * resets the replication origin session in a local node. This is used to register
 * with MemoryContextRegisterResetCallback to reset the replication origin session
 * in case of any error for the given memory context.
 */
void
ResetReplicationOriginLocalSessionCallbackHandler(void *arg)
{
    ResetReplicationOriginLocalSession();
}


/*
 * SetupReplicationOriginRemoteSession sets up a new replication origin session in a
 * remote session. The identifier is used to create a unique replication origin name
 * for the session in the remote node.
 */
void
SetupReplicationOriginRemoteSession(MultiConnection *connection)
{
    if (!EnableChangeDataCapture)
    {
        return;
    }
    if (connection != NULL && !IsRemoteReplicationOriginSessionSetup(connection))
    {
        StringInfo replicationOriginSessionSetupQuery = makeStringInfo();
        appendStringInfo(replicationOriginSessionSetupQuery,
                         "select pg_catalog.citus_internal_start_replication_origin_tracking();");
        ExecuteCriticalRemoteCommand(connection,
                                     replicationOriginSessionSetupQuery->data);
        connection->isReplicationOriginSessionSetup = true;
    }
}


/*
 * ResetReplicationOriginRemoteSession resets the replication origin session in a
 * remote node.
 */
void
ResetReplicationOriginRemoteSession(MultiConnection *connection)
{
    if (connection != NULL && connection->isReplicationOriginSessionSetup)
    {
        StringInfo replicationOriginSessionResetQuery = makeStringInfo();
        appendStringInfo(replicationOriginSessionResetQuery,
                         "select pg_catalog.citus_internal_stop_replication_origin_tracking();");
        ExecuteCriticalRemoteCommand(connection,
                                     replicationOriginSessionResetQuery->data);
        connection->isReplicationOriginSessionSetup = false;
    }
}


/*
 * IsRemoteReplicationOriginSessionSetup checks if the replication origin is set up
 * already in the remote session by calling the UDF
 * citus_internal_is_replication_origin_tracking_active(). This is also remembered
 * in the connection object to avoid calling the UDF again next time.
 */
static bool
IsRemoteReplicationOriginSessionSetup(MultiConnection *connection)
{
    if (connection->isReplicationOriginSessionSetup)
    {
        return true;
    }

    StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo();
    appendStringInfo(isReplicationOriginSessionSetupQuery,
                     "SELECT pg_catalog.citus_internal_is_replication_origin_tracking_active()");
    bool result =
        ExecuteRemoteCommandAndCheckResult(connection,
                                           isReplicationOriginSessionSetupQuery->data,
                                           "t");

    connection->isReplicationOriginSessionSetup = result;
    return result;
}
@@ -0,0 +1,27 @@
/*-------------------------------------------------------------------------
 *
 * cdc_decoder.h
 *     Utility functions and declarations for the CDC decoder.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#ifndef CITUS_CDC_DECODER_H
#define CITUS_CDC_DECODER_H

#include "postgres.h"
#include "fmgr.h"
#include "replication/logical.h"


void PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                    Relation relation, ReorderBufferChange *change);

void InitCDCDecoder(OutputPluginCallbacks *cb, LogicalDecodeChangeCB changeCB);

/* used in the replication_origin_filter_cb function. */
#define InvalidRepOriginId 0

#endif   /* CITUS_CDC_DECODER_H */
@@ -133,6 +133,12 @@ typedef struct CitusCopyDestReceiver
    /* if true, should copy to local placements in the current session */
    bool shouldUseLocalCopy;

    /*
     * if true, the data from this dest receiver should be published for CDC clients.
     * This is set to false for internal transfers like shard split/move/rebalance etc.
     */
    bool isPublishable;

    /*
     * Copy into colocated intermediate result. When this is set, the
     * COPY assumes there are hypothetical colocated shards to the

@@ -161,7 +167,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
                                                           List *columnNameList,
                                                           int partitionColumnIndex,
                                                           EState *executorState,
                                                           char *intermediateResultPrefix);
                                                           char *intermediateResultPrefix,
                                                           bool isPublishable);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList);
@@ -173,6 +173,9 @@ typedef struct MultiConnection
    /* is the connection currently in use, and shouldn't be used by anything else */
    bool claimedExclusively;

    /* has the replication origin session already been set up for this connection */
    bool isReplicationOriginSessionSetup;

    /*
     * Should be used to access/modify metadata. See REQUIRE_METADATA_CONNECTION for
     * the details.
@@ -48,6 +48,8 @@ extern void ExecuteCriticalRemoteCommand(MultiConnection *connection,
                                         const char *command);
extern void ExecuteRemoteCommandInConnectionList(List *nodeConnectionList,
                                                 const char *command);
extern bool ExecuteRemoteCommandAndCheckResult(MultiConnection *connection,
                                               char *command, char *expected);
extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,
                                        const char *command,
                                        PGresult **result);
@ -0,0 +1,31 @@
/*-------------------------------------------------------------------------
 *
 * replication_origin_utils.h
 *	  Utilities related to replication origin.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#ifndef REPLICATION_ORIGIN_SESSION_UTILS_H
#define REPLICATION_ORIGIN_SESSION_UTILS_H

#include "postgres.h"
#include "replication/origin.h"
#include "distributed/connection_management.h"

extern void InitializeReplicationOriginSessionUtils(void);

extern void SetupReplicationOriginRemoteSession(MultiConnection *connection);
extern void ResetReplicationOriginRemoteSession(MultiConnection *connection);

extern void SetupReplicationOriginLocalSession(void);
extern void ResetReplicationOriginLocalSession(void);
extern void ResetReplicationOriginLocalSessionCallbackHandler(void *arg);


extern bool EnableChangeDataCapture;


#endif /* REPLICATION_ORIGIN_SESSION_UTILS_H */
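A hedged usage sketch of the session helpers declared above; the wrapper name
CopyShardPlacementData and the command argument are hypothetical, only the Setup/Reset and
ExecuteCriticalRemoteCommand calls come from this change. The point is that WAL generated while the
remote session has a replication origin configured is later ignored by the CDC decoder's origin
filter.

static void
CopyShardPlacementData(MultiConnection *connection, const char *copyCommand)
{
	/* mark the remote session so its WAL carries a non-zero replication origin */
	SetupReplicationOriginRemoteSession(connection);

	/* rows written by this command are filtered out by the CDC origin filter */
	ExecuteCriticalRemoteCommand(connection, copyCommand);

	/* restore the default origin for subsequent, publishable work */
	ResetReplicationOriginRemoteSession(connection);
}
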
@ -0,0 +1,45 @@
#-------------------------------------------------------------------------
#
# Makefile for src/test/cdc
#
# Test that CDC publication works correctly.
#
#-------------------------------------------------------------------------

subdir = src/test/cdc
top_builddir = ../../..
include $(top_builddir)/Makefile.global

pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null)
pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/')
pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/')
export pg_major_version

test_path = t/*.pl


# copied from pgxs/Makefile.global to use postgres' abs build dir for pg_regress
ifeq ($(enable_tap_tests),yes)

define citus_prove_installcheck
rm -rf '$(CURDIR)'/tmp_check
$(MKDIR_P) '$(CURDIR)'/tmp_check
cd $(srcdir) && \
	TESTDIR='$(CURDIR)' \
	PATH="$(bindir):$$PATH" \
	PGPORT='6$(DEF_PGPORT)' \
	top_builddir='$(CURDIR)/$(top_builddir)' \
	PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \
	TEMP_CONFIG='$(CURDIR)'/postgresql.conf \
	$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path))
endef

else
citus_prove_installcheck = @echo "TAP tests not enabled when postgres was compiled"
endif

installcheck:
	$(citus_prove_installcheck)

clean distclean maintainer-clean:
	rm -rf tmp_check
@ -0,0 +1,2 @@
shared_preload_libraries=citus
shared_preload_libraries='citus'
@ -0,0 +1,98 @@
|
|||
# Basic CDC test for create_distributed_table
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
create_cdc_replication_slots_for_workers(\@workers);
|
||||
|
||||
# Distribute the sensors table to worker nodes.
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
|
||||
|
||||
create_cdc_publication_for_workers(\@workers,'sensors');
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(0,10)i;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC basic test - distributed table insert data');
|
||||
|
||||
|
||||
# Update some data in the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
UPDATE sensors
|
||||
SET
|
||||
eventdatetime=NOW(),
|
||||
measure_data = jsonb_set(measure_data, '{val}', measureid::text::jsonb , TRUE),
|
||||
measure_status = CASE
|
||||
WHEN measureid % 2 = 0
|
||||
THEN 'y'
|
||||
ELSE 'n'
|
||||
END,
|
||||
measure_comment= 'Comment:' || measureid::text;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC basic test - distributed table update data');
|
||||
|
||||
# Delete some data from the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
DELETE FROM sensors
|
||||
WHERE (measureid % 2) = 0;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC basic test - distributed table delete data');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,100 @@
|
|||
# CDC test for create_distributed_table_concurrently
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
create_cdc_replication_slots_for_workers(\@workers);
|
||||
|
||||
# Distribute the sensors table to worker nodes.
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');");
|
||||
|
||||
create_cdc_publication_for_workers(\@workers,'sensors');
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(0,10)i;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC test - create_distributed_table_concurrently insert data');
|
||||
|
||||
|
||||
# Update some data in the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
UPDATE sensors
|
||||
SET
|
||||
eventdatetime=NOW(),
|
||||
measure_data = jsonb_set(measure_data, '{val}', measureid::text::jsonb , TRUE),
|
||||
measure_status = CASE
|
||||
WHEN measureid % 2 = 0
|
||||
THEN 'y'
|
||||
ELSE 'n'
|
||||
END,
|
||||
measure_comment= 'Comment:' || measureid::text;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC test - create_distributed_table_concurrently update data');
|
||||
|
||||
# Delete some data from the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
DELETE FROM sensors
|
||||
WHERE (measureid % 2) = 0;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC test - create_distributed_table_concurrently delete data');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,83 @@
|
|||
# CDC test for inserts during create distributed table concurrently
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
use threads;
|
||||
|
||||
|
||||
# Initialize co-ordinator node
|
||||
our $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $add_local_meta_data_stmt = qq(SELECT citus_add_local_table_to_metadata('sensors'););
|
||||
my $result = 0;
|
||||
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639, "");
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_coordinator->safe_psql('postgres',$add_local_meta_data_stmt);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
|
||||
create_cdc_publication_and_replication_slots_for_citus_cluster($node_coordinator,\@workers,'sensors');
|
||||
connect_cdc_client_to_citus_cluster_publications($node_coordinator,\@workers,$node_cdc_client);
|
||||
|
||||
#insert data into the sensors table in the coordinator node before distributing the table.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(0,10)i;");
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers);
|
||||
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC create_distributed_table - insert data');
|
||||
|
||||
sub create_distributed_table_thread() {
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');");
|
||||
}
|
||||
|
||||
sub insert_data_into_distributed_table_thread() {
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(-10,-1)i;");
|
||||
}
|
||||
|
||||
# Create the distributed table concurrently in a separate thread.
|
||||
my $thr_create = threads->create(\&create_distributed_table_thread);
|
||||
my $thr_insert = threads->create(\&insert_data_into_distributed_table_thread);
|
||||
$thr_create->join();
|
||||
$thr_insert->join();
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers);
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC create_distributed_table_concurrently - concurrent insert data');
|
||||
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,94 @@
|
|||
# Schema change CDC test for Citus
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
print("coordinator port: " . $node_coordinator->port() . "\n");
|
||||
print("worker0 port:" . $workers[0]->port() . "\n");
|
||||
print("worker1 port:" . $workers[1]->port() . "\n");
|
||||
print("cdc_client port:" .$node_cdc_client->port() . "\n");
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
create_cdc_replication_slots_for_workers(\@workers);
|
||||
|
||||
#insert data into the sensors table in the coordinator node before distributing the table.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(0,100)i;");
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
|
||||
|
||||
create_cdc_publication_for_workers(\@workers,'sensors');
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC create_distributed_table - schema change before move');
|
||||
|
||||
|
||||
|
||||
my $shard_to_move = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT shardid FROM citus_shards ORDER BY shardid LIMIT 1;");
|
||||
my $host1 = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT nodename FROM citus_shards ORDER BY shardid LIMIT 1;");
|
||||
my $port1 = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT nodeport FROM citus_shards ORDER BY shardid LIMIT 1;");
|
||||
|
||||
my $shard_last = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT shardid FROM citus_shards ORDER BY shardid DESC LIMIT 1;");
|
||||
my $host2 = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT nodename FROM citus_shards ORDER BY shardid DESC LIMIT 1;");
|
||||
my $port2 = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT nodeport FROM citus_shards ORDER BY shardid DESC LIMIT 1;");
|
||||
|
||||
my $move_params = "select citus_move_shard_placement($shard_to_move,'$host1',$port1,'$host2',$port2,'force_logical');";
|
||||
print("move_params: $move_params\n");
|
||||
$node_coordinator->safe_psql('postgres',$move_params);
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers);
|
||||
|
||||
|
||||
#wait_for_cdc_client_to_catch_up_with_workers(\@workers);
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC create_distributed_table - schema change and move shard');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,52 @@
|
|||
# Basic CDC test for create_distributed_table
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
my $ref_select_stmt = qq(SELECT * FROM reference_table ORDER BY measureid;);
|
||||
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"CREATE TABLE reference_table(measureid integer PRIMARY KEY);");
|
||||
$node_cdc_client->safe_psql('postgres',"CREATE TABLE reference_table(measureid integer PRIMARY KEY);");
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'reference_table');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
# Create the reference table in the coordinator and cdc client nodes.
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_reference_table('reference_table');");
|
||||
|
||||
create_cdc_replication_slots_for_workers(\@workers);
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$ref_select_stmt);
|
||||
is($result, 1, 'CDC reference table test 1');
|
||||
|
||||
|
||||
# Insert data to the reference table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"INSERT INTO reference_table SELECT i FROM generate_series(0,100)i;");
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$ref_select_stmt);
|
||||
is($result, 1, 'CDC reference table test 2');
|
||||
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"INSERT INTO reference_table SELECT i FROM generate_series(101,200)i;");
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$ref_select_stmt);
|
||||
is($result, 1, 'CDC reference table test 3');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,127 @@
|
|||
# Schema change CDC test for Citus
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $select_stmt_after_drop = qq(SELECT measureid, eventdatetime, measure_data, measure_status, measure_comment FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
print("coordinator port: " . $node_coordinator->port() . "\n");
|
||||
print("worker0 port:" . $workers[0]->port() . "\n");
|
||||
print("worker1 port:" . $workers[1]->port() . "\n");
|
||||
print("cdc_client port:" .$node_cdc_client->port() . "\n");
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
#insert data into the sensors table in the coordinator node before distributing the table.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(0,100)i;");
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
|
||||
|
||||
#connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
create_cdc_publication_and_slots_for_workers(\@workers,'sensors');
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC create_distributed_table - schema change before move');
|
||||
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"ALTER TABLE sensors DROP COLUMN meaure_quantity;");
|
||||
|
||||
|
||||
my $shard_to_move = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT shardid FROM citus_shards ORDER BY shardid LIMIT 1;");
|
||||
my $host1 = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT nodename FROM citus_shards ORDER BY shardid LIMIT 1;");
|
||||
my $port1 = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT nodeport FROM citus_shards ORDER BY shardid LIMIT 1;");
|
||||
|
||||
my $shard_last = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT shardid FROM citus_shards ORDER BY shardid DESC LIMIT 1;");
|
||||
my $host2 = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT nodename FROM citus_shards ORDER BY shardid DESC LIMIT 1;");
|
||||
my $port2 = $node_coordinator->safe_psql('postgres',
|
||||
"SELECT nodeport FROM citus_shards ORDER BY shardid DESC LIMIT 1;");
|
||||
|
||||
my $move_params = "select citus_move_shard_placement($shard_to_move,'$host1',$port1,'$host2',$port2,'force_logical');";
|
||||
print("move_params: $move_params\n");
|
||||
$node_coordinator->safe_psql('postgres',$move_params);
|
||||
|
||||
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 'A', 'I <3 Citus'
|
||||
FROM generate_series(-10,-1)i;");
|
||||
|
||||
|
||||
$node_cdc_client->safe_psql('postgres',"ALTER TABLE sensors DROP COLUMN meaure_quantity;");
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_workers(\@workers);
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC create_distributed_table - schema change and move shard');
|
||||
|
||||
# Update some data in the sensors table to check the schema change handling logic in CDC decoder.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
UPDATE sensors
|
||||
SET
|
||||
measure_status = CASE
|
||||
WHEN measureid % 2 = 0
|
||||
THEN 'y'
|
||||
ELSE 'n'
|
||||
END;");
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers);
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC create_distributed_table - update data after schema change');
|
||||
|
||||
# Delete some data from the sensors table to check the schema change handling logic in the CDC decoder.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
DELETE FROM sensors
|
||||
WHERE
|
||||
measure_status = 'n';");
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers);
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC create_distributed_table - delete data after schema change');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,111 @@
|
|||
# Basic CDC test for create_distributed_table
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
|
||||
|
||||
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
|
||||
$node_coordinator->safe_psql('postgres',$command);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
create_cdc_replication_slots_for_workers(\@workers);
|
||||
|
||||
# Distribute the sensors table to worker nodes.
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
|
||||
|
||||
create_cdc_publication_for_workers(\@workers,'sensors');
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(0,10)i;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC basic test - distributed table insert data');
|
||||
|
||||
|
||||
# Update some data in the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
UPDATE sensors
|
||||
SET
|
||||
eventdatetime=NOW(),
|
||||
measure_data = jsonb_set(measure_data, '{val}', measureid::text::jsonb , TRUE),
|
||||
measure_status = CASE
|
||||
WHEN measureid % 2 = 0
|
||||
THEN 'y'
|
||||
ELSE 'n'
|
||||
END,
|
||||
measure_comment= 'Comment:' || measureid::text;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC basic test - distributed table update data');
|
||||
|
||||
# Delete some data from the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
DELETE FROM sensors
|
||||
WHERE (measureid % 2) = 0;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC basic test - distributed table delete data');
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
SELECT undistribute_table('sensors',cascade_via_foreign_keys=>true);");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC basic test - undistribute table data');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,84 @@
|
|||
# Basic CDC test for create_distributed_table
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
my $citus_config = "
|
||||
citus.shard_count = 2
|
||||
citus.shard_replication_factor = 1
|
||||
";
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config);
|
||||
|
||||
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
|
||||
$node_coordinator->safe_psql('postgres',$command);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
create_cdc_replication_slots_for_workers(\@workers);
|
||||
|
||||
# Distribute the sensors table to worker nodes.
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
|
||||
|
||||
create_cdc_publication_for_workers(\@workers,'sensors');
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table create data');
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(-100,100)i;");
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table insert data');
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
SELECT citus_split_shard_by_split_points(102008,ARRAY['-50'],ARRAY[1,2], 'block_writes');");
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table split data');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,84 @@
|
|||
# Basic CDC test for create_distributed_table
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
my $citus_config = "
|
||||
citus.shard_count = 2
|
||||
citus.shard_replication_factor = 1
|
||||
";
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config);
|
||||
|
||||
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
|
||||
$node_coordinator->safe_psql('postgres',$command);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
create_cdc_replication_slots_for_workers(\@workers);
|
||||
|
||||
# Distribute the sensors table to worker nodes.
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
|
||||
|
||||
create_cdc_publication_for_workers(\@workers,'sensors');
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table create data');
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(-100,100)i;");
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table insert data');
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
SELECT citus_split_shard_by_split_points(102008,ARRAY['-50'],ARRAY[1,2], 'force_logical');");
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table split data');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,107 @@
|
|||
# Basic CDC test for create_distributed_table
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
use threads;
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
my $citus_config = "
|
||||
citus.shard_count = 2
|
||||
citus.shard_replication_factor = 1
|
||||
";
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config);
|
||||
|
||||
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
|
||||
$node_coordinator->safe_psql('postgres',$command);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
create_cdc_replication_slots_for_workers(\@workers);
|
||||
|
||||
# Distribute the sensors table to worker nodes.
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
|
||||
|
||||
create_cdc_publication_for_workers(\@workers,'sensors');
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table create data');
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(-100,100)i;");
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table insert data');
|
||||
|
||||
|
||||
sub insert_data_into_distributed_table_thread() {
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(101,200)i;");
|
||||
}
|
||||
|
||||
sub split_distributed_table_thread() {
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
SELECT citus_split_shard_by_split_points(102008,ARRAY['-50'],ARRAY[1,2], 'force_logical');");
|
||||
}
|
||||
|
||||
# Split the distributed table's shard in a separate thread.
|
||||
my $thr_create = threads->create(\&split_distributed_table_thread);
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node while the table is being distributed.
|
||||
my $thr_insert = threads->create(\&insert_data_into_distributed_table_thread);
|
||||
|
||||
# Wait for the threads to finish.
|
||||
$thr_create->join();
|
||||
$thr_insert->join();
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table split data');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,95 @@
|
|||
# Basic CDC test for create_distributed_table
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $add_local_meta_data_stmt = qq(SELECT citus_add_local_table_to_metadata('sensors'););
|
||||
my $result = 0;
|
||||
my $citus_config = "
|
||||
citus.shard_count = 2
|
||||
citus.shard_replication_factor = 1
|
||||
";
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config);
|
||||
|
||||
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
|
||||
$node_coordinator->safe_psql('postgres',$command);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_coordinator->safe_psql('postgres',$add_local_meta_data_stmt);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_replication_slots_for_citus_cluster($node_coordinator,\@workers,'sensors');
|
||||
connect_cdc_client_to_citus_cluster_publications($node_coordinator,\@workers,$node_cdc_client);
|
||||
|
||||
# Distribute the sensors table to worker nodes.
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table_concurrently('sensors', 'measureid');");
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator,\@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC split test - distributed table create data');
|
||||
|
||||
# Alter the distributed table to change its shard count.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
SELECT alter_distributed_table('sensors', shard_count:=6, cascade_to_colocated:=true);");
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Compare the data in the coordinator and cdc client nodes.
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC test - alter distributed table');
|
||||
|
||||
#$node_cdc_client->safe_psql("postgres","alter subscription cdc_subscription refresh publication;");
|
||||
$node_cdc_client->safe_psql("postgres","alter subscription cdc_subscription_1 refresh publication;");
|
||||
|
||||
|
||||
# Drop the CDC client subscriptions and recreate them, since the
|
||||
#alter_distributed_table has changed the Oid of the distributed table.
|
||||
#So the CDC client has to create Oid to table mappings again for
|
||||
#CDC to work again.
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
create_cdc_publication_and_replication_slots_for_citus_cluster($node_coordinator,\@workers,'sensors');
|
||||
connect_cdc_client_to_citus_cluster_publications($node_coordinator,\@workers,$node_cdc_client);
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(0,10)i;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC basic test - distributed table insert data');
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
|
||||
done_testing();
|
|
@ -0,0 +1,88 @@
|
|||
# Basic CDC test for create_distributed_table
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use Test::More;
|
||||
|
||||
use lib './t';
|
||||
use cdctestlib;
|
||||
|
||||
# Initialize co-ordinator node
|
||||
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
|
||||
my $result = 0;
|
||||
|
||||
### Create the citus cluster with coordinator and two worker nodes
|
||||
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
|
||||
|
||||
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
|
||||
|
||||
# Create the sensors table and indexes.
|
||||
my $initial_schema = "
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime timestamptz,
|
||||
measure_data jsonb,
|
||||
meaure_quantity decimal(15, 2),
|
||||
measure_status char(1),
|
||||
measure_comment varchar(44),
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data));
|
||||
|
||||
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
|
||||
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
|
||||
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
|
||||
|
||||
$node_coordinator->safe_psql('postgres',$initial_schema);
|
||||
$node_cdc_client->safe_psql('postgres',$initial_schema);
|
||||
|
||||
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
|
||||
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
|
||||
|
||||
create_cdc_replication_slots_for_workers(\@workers);
|
||||
|
||||
# Distribute the sensors table to worker nodes.
|
||||
$node_coordinator->safe_psql('postgres',"SELECT create_distributed_table('sensors', 'measureid');");
|
||||
|
||||
create_cdc_publication_for_workers(\@workers,'sensors');
|
||||
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC restart test - distributed table creation');
|
||||
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(0,10)i;");
|
||||
|
||||
# Wait for the data changes to be replicated to the cdc client node.
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC restart test - distributed table insert data');
|
||||
|
||||
|
||||
print("stopping worker 0");
|
||||
$workers[0]->stop();
|
||||
print("starting worker 0 againg..");
|
||||
$workers[0]->start();
|
||||
|
||||
|
||||
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
|
||||
|
||||
# Insert some data to the sensors table in the coordinator node.
|
||||
$node_coordinator->safe_psql('postgres',"
|
||||
INSERT INTO sensors
|
||||
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
|
||||
FROM generate_series(11,20)i;");
|
||||
|
||||
|
||||
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
|
||||
is($result, 1, 'CDC restart test - distributed table after restart');
|
||||
|
||||
|
||||
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
|
||||
done_testing();
|
|
@ -0,0 +1,355 @@
use strict;
use warnings;

my $pg_major_version = int($ENV{'pg_major_version'});
print("working with PG major version : $pg_major_version\n");
if ($pg_major_version >= 15) {
	eval "use PostgreSQL::Test::Cluster";
	eval "use PostgreSQL::Test::Utils";
} else {
	eval "use PostgresNode";
}

#use PostgresNode;
use DBI;

our $NODE_TYPE_COORDINATOR = 1;
our $NODE_TYPE_WORKER = 2;
our $NODE_TYPE_CDC_CLIENT = 3;

# Compare the output of the same statement on two nodes, row by row and
# field by field. Rows are fetched positionally, so $stmt should include
# an ORDER BY clause to make the comparison deterministic.
sub compare_tables_in_different_nodes
{
	my $result = 1;
	my ($node1, $node2, $dbname, $stmt) = @_;

	# Connect to the first database node
	my $dbh1 = DBI->connect("dbi:Pg:" . $node1->connstr($dbname));

	# Connect to the second database node
	my $dbh2 = DBI->connect("dbi:Pg:" . $node2->connstr($dbname));

	# Prepare and run the SQL query on the first database node
	my $sth1 = $dbh1->prepare($stmt);
	$sth1->execute();

	# Prepare and run the SQL query on the second database node
	my $sth2 = $dbh2->prepare($stmt);
	$sth2->execute();

	# Get the field names for the table
	my @field_names = @{$sth2->{NAME}};

	#$sth1->dump_results();
	#$sth2->dump_results();

	our (@row1, @row2);

	# Iterate over both result sets in lockstep
	while (1) {

		@row1 = $sth1->fetchrow_array();
		@row2 = $sth2->fetchrow_array();
		#print("row1: @row1\n");
		#print("row2: @row2\n");

		# Compare the current row of the two nodes
		if (@row1 and @row2) {
			#print("row1: @row1\n");
			#print("row2: @row2\n");
			my $field_count_row1 = scalar @row1;
			my $field_count_row2 = scalar @row2;
			if ($field_count_row1 != $field_count_row2) {
				print "Field count mismatch: $field_count_row1 != $field_count_row2 \n";
				print "First row: @row1\n";
				#print "Second row: @row2\n";
				for (my $i = 0; $i < scalar @row2; $i++) {
					print("Field $i, field name: $field_names[$i], value: $row2[$i] \n");
				}
				$result = 0;
				last;
			}
			# Compare the data in each field of the current row of the two nodes
			for (my $i = 0; $i < scalar @row1; $i++) {
				if ($row1[$i] ne $row2[$i]) {
					print "Data mismatch in field '$field_names[$i]'\n";
					print "$row1[$i] != $row2[$i]\n";
					print "First row: @row1\n";
					print "Second row: @row2\n";
					$result = 0;
					last;
				}
			}
		} elsif (@row1 and !@row2) {
			print "First node has more rows than the second node\n";
			$result = 0;
			last;
		} elsif (!@row1 and @row2) {
			print "Second node has more rows than the first node\n";
			$result = 0;
			last;
		} else {
			last;
		}
	}

	$sth1->finish();
	$sth2->finish();
	$dbh1->disconnect();
	$dbh2->disconnect();
	return $result;
}

sub create_node {
	my ($name, $node_type, $host, $port, $config) = @_;
	if (!defined($config)) {
		$config = "";
	}

	our $node;

	if ($pg_major_version >= 15) {
		$PostgreSQL::Test::Cluster::use_unix_sockets = 0;
		$PostgreSQL::Test::Cluster::use_tcp = 1;
		$PostgreSQL::Test::Cluster::test_pghost = 'localhost';
		my %params = ( "port" => $port, "host" => "localhost");
		$node = PostgreSQL::Test::Cluster->new($name, %params);
	} else {
		$PostgresNode::use_tcp = 1;
		$PostgresNode::test_pghost = '127.0.0.1';
		my %params = ( "port" => $port, "host" => "localhost");
		$node = get_new_node($name, %params);
	}
	print("node's port: " . $node->port . "\n");

	$port += 1;

	my $citus_config_options = "
	max_connections = 100
	max_wal_senders = 100
	max_replication_slots = 100
	citus.enable_change_data_capture = on
	log_statement = 'all'
	citus.override_table_visibility = off";

	if ($config ne "") {
		$citus_config_options = $citus_config_options . $config;
	}

	my $client_config_options = "
	max_connections = 100
	max_wal_senders = 100
	max_replication_slots = 100
	";
	$node->init(allows_streaming => 'logical');
	if ($node_type == $NODE_TYPE_COORDINATOR || $node_type == $NODE_TYPE_WORKER) {
		$node->append_conf("postgresql.conf", $citus_config_options);
	} else {
		# The CDC client node is a plain PostgreSQL instance, so it only
		# gets the generic logical replication settings.
		$node->append_conf("postgresql.conf", $client_config_options);
	}

	$node->start();

	if ($node_type == $NODE_TYPE_COORDINATOR || $node_type == $NODE_TYPE_WORKER) {
		$node->safe_psql('postgres', "CREATE EXTENSION citus;");
		my $value = $node->safe_psql('postgres', "SHOW citus.enable_change_data_capture;");
		print("citus.enable_change_data_capture value is $value\n");
	}

	return $node;
}

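# Usage sketch (illustrative only, not called by the tests): create a plain
# PostgreSQL node that acts as the CDC client. The sub name, node name
# 'cdc_client', and the port argument are arbitrary examples; any free
# TCP port works.
sub example_create_cdc_client_node
{
	my ($port) = @_;
	my $node_cdc_client = create_node('cdc_client', $NODE_TYPE_CDC_CLIENT, 'localhost', $port);
	return $node_cdc_client;
}
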
# Create a Citus cluster with the given number of workers
sub create_citus_cluster {
	my ($no_workers, $host, $port, $citus_config) = @_;
	my @workers = ();
	my $node_coordinator;
	print("citus_config: ", $citus_config, "\n");
	if ($citus_config ne "") {
		$node_coordinator = create_node('coordinator', $NODE_TYPE_COORDINATOR, $host, $port, $citus_config);
	} else {
		$node_coordinator = create_node('coordinator', $NODE_TYPE_COORDINATOR, $host, $port);
	}
	my $coord_host = $node_coordinator->host();
	my $coord_port = $node_coordinator->port();
	$node_coordinator->safe_psql('postgres', "SELECT pg_catalog.citus_set_coordinator_host('$coord_host', $coord_port);");
	for (my $i = 0; $i < $no_workers; $i++) {
		$port = $port + 1;
		my $node_worker;
		if ($citus_config ne "") {
			$node_worker = create_node("worker$i", $NODE_TYPE_WORKER, "localhost", $port, $citus_config);
		} else {
			$node_worker = create_node("worker$i", $NODE_TYPE_WORKER, "localhost", $port);
		}
		my $node_worker_host = $node_worker->host();
		my $node_worker_port = $node_worker->port();
		$node_coordinator->safe_psql('postgres', "SELECT pg_catalog.citus_add_node('$node_worker_host', $node_worker_port);");
		push @workers, $node_worker;
	}
	return ($node_coordinator, @workers);
}

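# Usage sketch (illustrative only): bring up a coordinator plus two workers
# and capture the node handles. The base port 57636 is an arbitrary example;
# passing "" as the config argument keeps the default per-node settings from
# create_node.
sub example_create_two_worker_cluster
{
	my ($node_coordinator, @workers) = create_citus_cluster(2, "localhost", 57636, "");
	return ($node_coordinator, \@workers);
}
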
sub create_cdc_publication_and_replication_slots_for_citus_cluster {
	my $node_coordinator = $_[0];
	my $workersref = $_[1];
	my $table_names = $_[2];

	create_cdc_publication_and_slots_for_coordinator($node_coordinator, $table_names);
	create_cdc_publication_and_slots_for_workers($workersref, $table_names);
}

sub create_cdc_publication_and_slots_for_coordinator {
	my $node_coordinator = $_[0];
	my $table_names = $_[1];
	print("coordinator connstr: " . $node_coordinator->connstr() . "\n");
	my $pub = $node_coordinator->safe_psql('postgres', "SELECT * FROM pg_publication WHERE pubname = 'cdc_publication';");
	if ($pub ne "") {
		$node_coordinator->safe_psql('postgres', "DROP PUBLICATION IF EXISTS cdc_publication;");
	}
	$node_coordinator->safe_psql('postgres', "CREATE PUBLICATION cdc_publication FOR TABLE $table_names;");
	$node_coordinator->safe_psql('postgres', "SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,false)");
}

sub create_cdc_publication_and_slots_for_workers {
	my $workersref = $_[0];
	my $table_names = $_[1];
	create_cdc_publication_for_workers($workersref, $table_names);
	create_cdc_replication_slots_for_workers($workersref);
}

sub create_cdc_publication_for_workers {
	my $workersref = $_[0];
	my $table_names = $_[1];
	for (@$workersref) {
		my $pub = $_->safe_psql('postgres', "SELECT * FROM pg_publication WHERE pubname = 'cdc_publication';");
		if ($pub ne "") {
			$_->safe_psql('postgres', "DROP PUBLICATION IF EXISTS cdc_publication;");
		}
		if ($table_names eq "all") {
			$_->safe_psql('postgres', "CREATE PUBLICATION cdc_publication FOR ALL TABLES;");
		} else {
			$_->safe_psql('postgres', "CREATE PUBLICATION cdc_publication FOR TABLE $table_names;");
		}
	}
}


sub create_cdc_replication_slots_for_workers {
	my $workersref = $_[0];
	for (@$workersref) {
		my $slot = $_->safe_psql('postgres', "select * from pg_replication_slots where slot_name = 'cdc_replication_slot';");
		if ($slot ne "") {
			$_->safe_psql('postgres', "SELECT pg_catalog.pg_drop_replication_slot('cdc_replication_slot');");
		}
		$_->safe_psql('postgres', "SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,true)");
	}
}

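# Usage sketch (illustrative only): create the 'cdc_publication' publication
# and the 'cdc_replication_slot' slot (which uses the 'citus' output plugin)
# on the coordinator and on every worker in one call. The table name
# 'sensors' is just an example; the list is passed through to
# CREATE PUBLICATION ... FOR TABLE on each node.
sub example_setup_cluster_publications_and_slots
{
	my ($node_coordinator, $workersref) = @_;
	create_cdc_publication_and_replication_slots_for_citus_cluster($node_coordinator, $workersref, 'sensors');
}
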
sub connect_cdc_client_to_citus_cluster_publications {
	my $node_coordinator = $_[0];
	my $workersref = $_[1];
	my $node_cdc_client = $_[2];
	my $num_args = scalar(@_);

	if ($num_args > 3) {
		my $copy_arg = $_[3];
		connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client, $copy_arg);
	} else {
		connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
	}
	connect_cdc_client_to_workers_publication($workersref, $node_cdc_client);
}

sub connect_cdc_client_to_coordinator_publication {
	my $node_coordinator = $_[0];
	my $node_cdc_client = $_[1];
	my $num_args = scalar(@_);
	my $copy_data = "";
	if ($num_args > 2) {
		my $copy_arg = $_[2];
		$copy_data = 'copy_data=' . $copy_arg;
	} else {
		$copy_data = 'copy_data=false';
	}

	my $conn_str = $node_coordinator->connstr() . " dbname=postgres";
	my $subscription = 'cdc_subscription';
	print "creating subscription $subscription for coordinator: $conn_str\n";
	$node_cdc_client->safe_psql('postgres', "
		CREATE SUBSCRIPTION $subscription
		CONNECTION '$conn_str'
		PUBLICATION cdc_publication
		WITH (
			create_slot=false,
			enabled=true,
			slot_name=cdc_replication_slot,"
		. $copy_data . ");"
	);
}

sub connect_cdc_client_to_workers_publication {
	my $workersref = $_[0];
	my $node_cdc_client = $_[1];
	my $i = 1;
	for (@$workersref) {
		my $conn_str = $_->connstr() . " dbname=postgres";
		my $subscription = 'cdc_subscription_' . $i;
		print "creating subscription $subscription for node$i: $conn_str\n";
		my $subscription_stmt = "CREATE SUBSCRIPTION $subscription
			CONNECTION '$conn_str'
			PUBLICATION cdc_publication
			WITH (
				create_slot=false,
				enabled=true,
				slot_name=cdc_replication_slot,
				copy_data=false);
		";

		$node_cdc_client->safe_psql('postgres', $subscription_stmt);
		$i++;
	}
}

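# Usage sketch (illustrative only): point the CDC client at every publication
# in the cluster. The optional fourth argument is forwarded as the coordinator
# subscription's copy_data setting (it defaults to false when omitted); worker
# subscriptions always use copy_data=false.
sub example_connect_cdc_client
{
	my ($node_coordinator, $workersref, $node_cdc_client) = @_;
	connect_cdc_client_to_citus_cluster_publications($node_coordinator, $workersref, $node_cdc_client, 'true');
}
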
sub wait_for_cdc_client_to_catch_up_with_citus_cluster {
	my $node_coordinator = $_[0];
	my ($workersref) = $_[1];

	my $subscription = 'cdc_subscription';
	print "coordinator: waiting for cdc client subscription $subscription to catch up\n";
	$node_coordinator->wait_for_catchup($subscription);
	wait_for_cdc_client_to_catch_up_with_workers($workersref);
}

sub wait_for_cdc_client_to_catch_up_with_coordinator {
	my $node_coordinator = $_[0];
	my $subscription = 'cdc_subscription';
	print "coordinator: waiting for cdc client subscription $subscription to catch up\n";
	$node_coordinator->wait_for_catchup($subscription);
}

sub wait_for_cdc_client_to_catch_up_with_workers {
	my ($workersref) = $_[0];
	my $i = 1;
	for (@$workersref) {
		my $subscription = 'cdc_subscription_' . $i;
		print "node$i: waiting for cdc client subscription $subscription to catch up\n";
		$_->wait_for_catchup($subscription);
		$i++;
	}
}

sub drop_cdc_client_subscriptions {
	my $node = $_[0];
	my ($workersref) = $_[1];

	$node->safe_psql('postgres', "drop subscription cdc_subscription");
	my $i = 1;
	for (@$workersref) {
		my $subscription = 'cdc_subscription_' . $i;
		$node->safe_psql('postgres', "drop subscription " . $subscription);
		$i++;
	}
}

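# Usage sketch (illustrative only): the typical tail of a CDC test built from
# the helpers in this file - wait for the client to catch up, compare the
# distributed table against the client's copy, then drop the subscriptions.
# $select_stmt is assumed to select from the test table with an ORDER BY.
sub example_verify_and_teardown
{
	my ($node_coordinator, $workersref, $node_cdc_client, $select_stmt) = @_;
	wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, $workersref);
	my $result = compare_tables_in_different_nodes($node_coordinator, $node_cdc_client, 'postgres', $select_stmt);
	drop_cdc_client_subscriptions($node_cdc_client, $workersref);
	return $result;
}
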
@ -1,4 +1,5 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import os

@ -1360,9 +1360,12 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.3-1
ALTER EXTENSION citus UPDATE TO '11.3-1';
SELECT * FROM multi_extension.print_extension_changes();
 previous_object | current_object
 previous_object | current_object
---------------------------------------------------------------------
(0 rows)
 | function citus_internal_is_replication_origin_tracking_active() boolean
 | function citus_internal_start_replication_origin_tracking() void
 | function citus_internal_stop_replication_origin_tracking() void
(3 rows)

DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

@ -74,7 +74,10 @@ ORDER BY 1;
 function citus_internal_delete_partition_metadata(regclass)
 function citus_internal_delete_shard_metadata(bigint)
 function citus_internal_global_blocked_processes()
 function citus_internal_is_replication_origin_tracking_active()
 function citus_internal_local_blocked_processes()
 function citus_internal_start_replication_origin_tracking()
 function citus_internal_stop_replication_origin_tracking()
 function citus_internal_update_placement_metadata(bigint,integer,integer)
 function citus_internal_update_relation_colocation(oid,integer)
 function citus_is_clock_after(cluster_clock,cluster_clock)

@ -318,5 +321,5 @@ ORDER BY 1;
 view citus_stat_statements
 view pg_dist_shard_placement
 view time_partitions
(310 rows)
(313 rows)

@ -90,6 +90,7 @@ my $workerCount = 2;
my $serversAreShutdown = "TRUE";
my $usingWindows = 0;
my $mitmPid = 0;
my $workerCount = 2;

if ($Config{osname} eq "MSWin32")
{

@ -487,6 +488,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.enable_change_data_capture=on");

# Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");