From d5df89239454d352c99f970f5027b0fdf337b4e1 Mon Sep 17 00:00:00 2001 From: rajeshkt78 <109729326+rajeshkt78@users.noreply.github.com> Date: Mon, 3 Apr 2023 21:32:15 +0530 Subject: [PATCH] Make CDC decoder an independent extension (#6810) DESCRIPTION: - The CDC decoder is refactored into a separate extension that can be loaded dynamically without having to reload citus. - CDC decoder code can be compiled using DECODER flag to work with different decoders like pgoutput and wal2json. By default the base decoder is "pgoutput". - The dynamic_library_path config is adjusted dynamically to prefer the decoders in cdc_decoders directory in citus init so that the users can use the replication subscription commands without having to make any config changes. --- src/backend/distributed/Makefile | 21 +- src/backend/distributed/cdc/Makefile | 26 ++ src/backend/distributed/cdc/cdc_decoder.c | 184 ++++++-- .../distributed/cdc/cdc_decoder_utils.c | 432 ++++++++++++++++++ .../distributed/cdc/cdc_decoder_utils.h} | 25 +- .../shardsplit/shardsplit_decoder.c | 70 ++- src/backend/distributed/shared_library_init.c | 31 +- src/test/cdc/postgresql.conf | 1 - .../001_cdc_create_distributed_table_test.pl | 12 + .../cdc/t/006_cdc_schema_change_and_move.pl | 1 + .../cdc/t/007_cdc_undistributed_table_test.pl | 2 +- src/test/cdc/t/008_cdc_shard_split_test.pl | 2 +- .../009_cdc_shard_split_test_non_blocking.pl | 2 +- .../t/010_cdc_shard_split_parallel_insert.pl | 2 +- .../cdc/t/011_cdc_alter_distributed_table.pl | 2 +- .../013_cdc_drop_last_column_for_one_shard.pl | 89 ++++ .../t/014_cdc_with_table_like_shard_name.pl | 90 ++++ src/test/cdc/t/015_cdc_without_citus.pl | 53 +++ src/test/cdc/t/cdctestlib.pm | 4 +- 19 files changed, 948 insertions(+), 101 deletions(-) create mode 100644 src/backend/distributed/cdc/Makefile create mode 100644 src/backend/distributed/cdc/cdc_decoder_utils.c rename src/{include/distributed/cdc_decoder.h => backend/distributed/cdc/cdc_decoder_utils.h} (50%) create 
mode 100644 src/test/cdc/t/013_cdc_drop_last_column_for_one_shard.pl create mode 100644 src/test/cdc/t/014_cdc_with_table_like_shard_name.pl create mode 100644 src/test/cdc/t/015_cdc_without_citus.pl diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 9c5218b35..1cefb5769 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$( DATA_built = $(generated_sql_files) # directories with source files -SUBDIRS = . commands cdc connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock +SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock # enterprise modules SUBDIRS += replication @@ -32,7 +32,13 @@ OBJS += \ $(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c)))) # be explicit about the default target -all: +.PHONY: cdc + +all: cdc + +cdc: + echo "running cdc make" + $(MAKE) DECODER=pgoutput -C cdc all NO_PGXS = 1 @@ -81,11 +87,19 @@ endif .PHONY: clean-full install install-downgrades install-all +clean: clean-cdc + +clean-cdc: + $(MAKE) DECODER=pgoutput -C cdc clean + cleanup-before-install: rm -f $(DESTDIR)$(datadir)/$(datamoduledir)/citus.control rm -f $(DESTDIR)$(datadir)/$(datamoduledir)/citus--* -install: cleanup-before-install +install: cleanup-before-install install-cdc + +install-cdc: + $(MAKE) DECODER=pgoutput -C cdc install # install and install-downgrades should be run sequentially install-all: install @@ -96,4 +110,5 @@ install-downgrades: $(generated_downgrade_sql_files) clean-full: $(MAKE) clean + $(MAKE) -C cdc clean-full rm -rf $(safestringlib_builddir) diff --git a/src/backend/distributed/cdc/Makefile b/src/backend/distributed/cdc/Makefile new file mode 100644 
index 000000000..76aa28726 --- /dev/null +++ b/src/backend/distributed/cdc/Makefile @@ -0,0 +1,26 @@ +ifndef DECODER + DECODER = pgoutput +endif + +MODULE_big = citus_$(DECODER) +citus_subdir = src/backend/distributed/cdc +citus_top_builddir = ../../../.. +citus_decoders_dir = $(DESTDIR)$(pkglibdir)/citus_decoders + +OBJS += cdc_decoder.o cdc_decoder_utils.o + +include $(citus_top_builddir)/Makefile.global + +override CFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include +override CPPFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include + +install: install-cdc + +clean: clean-cdc + +install-cdc: + mkdir -p '$(citus_decoders_dir)' + $(INSTALL_SHLIB) citus_$(DECODER).so '$(citus_decoders_dir)/$(DECODER).so' + +clean-cdc: + rm -f '$(DESTDIR)$(datadir)/$(datamoduledir)/citus_decoders/$(DECODER).so' diff --git a/src/backend/distributed/cdc/cdc_decoder.c b/src/backend/distributed/cdc/cdc_decoder.c index 5df4271e6..9dfb8bc12 100644 --- a/src/backend/distributed/cdc/cdc_decoder.c +++ b/src/backend/distributed/cdc/cdc_decoder.c @@ -8,18 +8,31 @@ *------------------------------------------------------------------------- */ +#include "cdc_decoder_utils.h" #include "postgres.h" -#include "common/hashfn.h" -#include "utils/typcache.h" -#include "utils/lsyscache.h" -#include "catalog/pg_namespace.h" -#include "distributed/cdc_decoder.h" -#include "distributed/relay_utility.h" -#include "distributed/worker_protocol.h" -#include "distributed/metadata_cache.h" +#include "fmgr.h" +#include "access/genam.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_publication.h" +#include "commands/extension.h" +#include "common/hashfn.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/typcache.h" + +PG_MODULE_MAGIC; + +extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); static LogicalDecodeChangeCB ouputPluginChangeCB; +static void InitShardToDistributedTableMap(void); + +static void 
PublishDistributedTableChanges(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id); @@ -43,6 +56,124 @@ typedef struct static HTAB *shardToDistributedTableMap = NULL; +static void cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); + + +/* build time macro for base decoder plugin name for CDC and Shard Split. */ +#ifndef DECODER +#define DECODER "pgoutput" +#endif + +#define DECODER_INIT_FUNCTION_NAME "_PG_output_plugin_init" + +#define CITUS_SHARD_TRANSFER_SLOT_PREFIX "citus_shard_" +#define CITUS_SHARD_TRANSFER_SLOT_PREFIX_SIZE (sizeof(CITUS_SHARD_TRANSFER_SLOT_PREFIX) - \ + 1) + +/* + * Postgres uses 'pgoutput' as default plugin for logical replication. + * We want to reuse Postgres pgoutput's functionality as much as possible. + * Hence we load all the functions of this plugin and override as required. + */ +void +_PG_output_plugin_init(OutputPluginCallbacks *cb) +{ + elog(LOG, "Initializing CDC decoder"); + + /* + * We build custom .so files whose name matches common decoders (pgoutput, wal2json) + * and place them in $libdir/citus_decoders/ such that administrators can configure + * dynamic_library_path to include this directory, and users can then use the + * regular decoder names when creating replications slots. + * + * To load the original decoder, we need to remove citus_decoders/ from the + * dynamic_library_path. 
+ */ + char *originalDLP = Dynamic_library_path; + Dynamic_library_path = RemoveCitusDecodersFromPaths(Dynamic_library_path); + + LogicalOutputPluginInit plugin_init = + (LogicalOutputPluginInit) (void *) + load_external_function(DECODER, + DECODER_INIT_FUNCTION_NAME, + false, NULL); + + if (plugin_init == NULL) + { + elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol"); + } + + /* in case this session is used for different replication slots */ + Dynamic_library_path = originalDLP; + + /* ask the output plugin to fill the callback struct */ + plugin_init(cb); + + /* Initialize the Shard Id to Distributed Table id mapping hash table.*/ + InitShardToDistributedTableMap(); + + /* actual pgoutput callback function will be called */ + ouputPluginChangeCB = cb->change_cb; + cb->change_cb = cdc_change_cb; + cb->filter_by_origin_cb = replication_origin_filter_cb; +} + + +/* + * Check if the replication slot is for Shard transfer by checking for prefix. + */ +inline static +bool +IsShardTransferSlot(char *replicationSlotName) +{ + return strncmp(replicationSlotName, CITUS_SHARD_TRANSFER_SLOT_PREFIX, + CITUS_SHARD_TRANSFER_SLOT_PREFIX_SIZE) == 0; +} + + +/* + * cdc_change_cb function translates the incoming tuple change on a shard + * and publishes it for the corresponding distributed table. + */ +static void +cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + /* + * If Citus has not been loaded yet, pass the changes + * through to the underlying decoder plugin. 
+ */ + if (!CdcCitusHasBeenLoaded()) + { + ouputPluginChangeCB(ctx, txn, relation, change); + return; + } + + /* check if the relation is publishable.*/ + if (!is_publishable_relation(relation)) + { + return; + } + + char *replicationSlotName = ctx->slot->data.name.data; + if (replicationSlotName == NULL) + { + elog(ERROR, "Replication slot name is NULL!"); + return; + } + + /* If the slot is for internal shard operations, call the base plugin's call back. */ + if (IsShardTransferSlot(replicationSlotName)) + { + ouputPluginChangeCB(ctx, txn, relation, change); + return; + } + + /* Translate the changes from shard to distributed table and publish. */ + PublishDistributedTableChanges(ctx, txn, relation, change); +} + /* * InitShardToDistributedTableMap initializes the hash table that is used to @@ -68,23 +199,24 @@ InitShardToDistributedTableMap() * AddShardIdToHashTable adds the shardId to the hash table. */ static Oid -AddShardIdToHashTable(Oid shardId, ShardIdHashEntry *entry) +AddShardIdToHashTable(uint64 shardId, ShardIdHashEntry *entry) { entry->shardId = shardId; - entry->distributedTableId = LookupShardRelationFromCatalog(shardId, true); - entry->isReferenceTable = PartitionMethodViaCatalog(entry->distributedTableId) == 'n'; + entry->distributedTableId = CdcLookupShardRelationFromCatalog(shardId, true); + entry->isReferenceTable = CdcPartitionMethodViaCatalog(entry->distributedTableId) == + 'n'; return entry->distributedTableId; } static Oid -LookupDistributedTableIdForShardId(Oid shardId, bool *isReferenceTable) +LookupDistributedTableIdForShardId(uint64 shardId, bool *isReferenceTable) { bool found; Oid distributedTableId = InvalidOid; ShardIdHashEntry *entry = (ShardIdHashEntry *) hash_search(shardToDistributedTableMap, &shardId, - HASH_FIND | HASH_ENTER, + HASH_ENTER, &found); if (found) { @@ -99,24 +231,6 @@ LookupDistributedTableIdForShardId(Oid shardId, bool *isReferenceTable) } -
init function. - * It sets the call back function for filtering out changes originated from other nodes. - * It also sets the call back function for processing the changes in ouputPluginChangeCB. - * This function is common for both CDC and shard split decoder plugins. - */ -void -InitCDCDecoder(OutputPluginCallbacks *cb, LogicalDecodeChangeCB changeCB) -{ - elog(LOG, "Initializing CDC decoder"); - cb->filter_by_origin_cb = replication_origin_filter_cb; - ouputPluginChangeCB = changeCB; - - /* Initialize the hash table used for mapping shard to shell tables. */ - InitShardToDistributedTableMap(); -} - - /* * replication_origin_filter_cb call back function filters out publication of changes * originated from any other node other than the current node. This is @@ -166,7 +280,7 @@ TranslateAndPublishRelationForCDC(LogicalDecodingContext *ctx, ReorderBufferTXN * the changes as the change for the distributed table instead of shard. * If not, it returns false. It also skips the Citus metadata tables. */ -void +static void PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { @@ -179,7 +293,7 @@ PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *tx } /* Check if the relation is a distributed table by checking for shard name. */ - uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true); + uint64 shardId = CdcExtractShardIdFromTableName(shardRelationName, true); /* If this relation is not distributed, call the pgoutput's callback and return. */ if (shardId == INVALID_SHARD_ID) @@ -197,7 +311,7 @@ PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *tx } /* Publish changes for reference table only from the coordinator node. 
*/ - if (isReferenceTable && !IsCoordinator()) + if (isReferenceTable && !CdcIsCoordinator()) { return; } @@ -247,14 +361,12 @@ GetTupleForTargetSchemaForCdc(HeapTuple sourceRelationTuple, targetNulls[targetIndex] = true; targetIndex++; } - /* If this source attribute has been dropped, just skip this source attribute.*/ else if (TupleDescAttr(sourceRelDesc, sourceIndex)->attisdropped) { sourceIndex++; continue; } - /* If both source and target attributes are not dropped, add the attribute field to targetValues. */ else if (sourceIndex < sourceRelDesc->natts) { diff --git a/src/backend/distributed/cdc/cdc_decoder_utils.c b/src/backend/distributed/cdc/cdc_decoder_utils.c new file mode 100644 index 000000000..272221a5f --- /dev/null +++ b/src/backend/distributed/cdc/cdc_decoder_utils.c @@ -0,0 +1,432 @@ +/*------------------------------------------------------------------------- + * + * cdc_decoder_utils.c + * CDC Decoder plugin utility functions for Citus + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "commands/extension.h" +#include "fmgr.h" +#include "miscadmin.h" +#include "access/genam.h" +#include "access/heapam.h" +#include "common/hashfn.h" +#include "common/string.h" +#include "utils/fmgroids.h" +#include "utils/typcache.h" +#include "utils/lsyscache.h" +#include "catalog/pg_namespace.h" +#include "cdc_decoder_utils.h" +#include "distributed/pg_dist_partition.h" +#include "distributed/pg_dist_shard.h" +#include "distributed/relay_utility.h" + +static int32 LocalGroupId = -1; +static Oid PgDistLocalGroupRelationId = InvalidOid; +static Oid PgDistShardRelationId = InvalidOid; +static Oid PgDistShardShardidIndexId = InvalidOid; +static Oid PgDistPartitionRelationId = InvalidOid; +static Oid PgDistPartitionLogicalrelidIndexId = InvalidOid; +static bool IsCitusExtensionLoaded = false; + +#define COORDINATOR_GROUP_ID 0 +#define InvalidRepOriginId 0 +#define Anum_pg_dist_local_groupid 1 +#define GROUP_ID_UPGRADING -2 + + +static Oid DistLocalGroupIdRelationId(void); +static int32 CdcGetLocalGroupId(void); +static HeapTuple CdcPgDistPartitionTupleViaCatalog(Oid relationId); + +/* + * DistLocalGroupIdRelationId returns the relation id of the pg_dist_local_group + */ +static Oid +DistLocalGroupIdRelationId(void) +{ + if (PgDistLocalGroupRelationId == InvalidOid) + { + PgDistLocalGroupRelationId = get_relname_relid("pg_dist_local_group", + PG_CATALOG_NAMESPACE); + } + return PgDistLocalGroupRelationId; +} + + +/* + * DistShardRelationId returns the relation id of the pg_dist_shard + */ +static Oid +DistShardRelationId(void) +{ + if (PgDistShardRelationId == InvalidOid) + { + PgDistShardRelationId = get_relname_relid("pg_dist_shard", PG_CATALOG_NAMESPACE); + } + return PgDistShardRelationId; +} + + +/* + * DistShardRelationId returns the relation id of the pg_dist_shard + */ +static Oid +DistShardShardidIndexId(void) +{ + if 
(PgDistShardShardidIndexId == InvalidOid) + { + PgDistShardShardidIndexId = get_relname_relid("pg_dist_shard_shardid_index", + PG_CATALOG_NAMESPACE); + } + return PgDistShardShardidIndexId; +} + + +/* + * DistShardRelationId returns the relation id of the pg_dist_shard + */ +static Oid +DistPartitionRelationId(void) +{ + if (PgDistPartitionRelationId == InvalidOid) + { + PgDistPartitionRelationId = get_relname_relid("pg_dist_partition", + PG_CATALOG_NAMESPACE); + } + return PgDistPartitionRelationId; +} + + +static Oid +DistPartitionLogicalRelidIndexId(void) +{ + if (PgDistPartitionLogicalrelidIndexId == InvalidOid) + { + PgDistPartitionLogicalrelidIndexId = get_relname_relid( + "pg_dist_partition_logicalrelid_index", PG_CATALOG_NAMESPACE); + } + return PgDistPartitionLogicalrelidIndexId; +} + + +/* + * CdcIsCoordinator function returns true if this node is identified as the + * schema/coordinator/master node of the cluster. + */ +bool +CdcIsCoordinator(void) +{ + return (CdcGetLocalGroupId() == COORDINATOR_GROUP_ID); +} + + +/* + * CdcCitusHasBeenLoaded function returns true if the citus extension has been loaded. + */ +bool +CdcCitusHasBeenLoaded() +{ + if (!IsCitusExtensionLoaded) + { + IsCitusExtensionLoaded = (get_extension_oid("citus", true) != InvalidOid); + } + + return IsCitusExtensionLoaded; +} + + +/* + * ExtractShardIdFromTableName tries to extract shard id from the given table name, + * and returns the shard id if table name is formatted as shard name. + * Else, the function returns INVALID_SHARD_ID. 
+ */ +uint64 +CdcExtractShardIdFromTableName(const char *tableName, bool missingOk) +{ + char *shardIdStringEnd = NULL; + + /* find the last underscore and increment for shardId string */ + char *shardIdString = strrchr(tableName, SHARD_NAME_SEPARATOR); + if (shardIdString == NULL && !missingOk) + { + ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"", + tableName))); + } + else if (shardIdString == NULL && missingOk) + { + return INVALID_SHARD_ID; + } + + shardIdString++; + + errno = 0; + uint64 shardId = strtoull(shardIdString, &shardIdStringEnd, 0); + + if (errno != 0 || (*shardIdStringEnd != '\0')) + { + if (!missingOk) + { + ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"", + tableName))); + } + else + { + return INVALID_SHARD_ID; + } + } + + return shardId; +} + + +/* + * CdcGetLocalGroupId returns the group identifier of the local node. The function assumes + * that pg_dist_local_node_group has exactly one row and has at least one column. + * Otherwise, the function errors out. + */ +static int32 +CdcGetLocalGroupId(void) +{ + ScanKeyData scanKey[1]; + int scanKeyCount = 0; + int32 groupId = 0; + + /* + * Already set the group id, no need to read the heap again. 
+ */ + if (LocalGroupId != -1) + { + return LocalGroupId; + } + + Oid localGroupTableOid = DistLocalGroupIdRelationId(); + if (localGroupTableOid == InvalidOid) + { + return 0; + } + + Relation pgDistLocalGroupId = table_open(localGroupTableOid, AccessShareLock); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistLocalGroupId, + InvalidOid, false, + NULL, scanKeyCount, scanKey); + + TupleDesc tupleDescriptor = RelationGetDescr(pgDistLocalGroupId); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + + if (HeapTupleIsValid(heapTuple)) + { + bool isNull = false; + Datum groupIdDatum = heap_getattr(heapTuple, + Anum_pg_dist_local_groupid, + tupleDescriptor, &isNull); + + groupId = DatumGetInt32(groupIdDatum); + + /* set the local cache variable */ + LocalGroupId = groupId; + } + else + { + /* + * Upgrade is happening. When upgrading postgres, pg_dist_local_group is + * temporarily empty before citus_finish_pg_upgrade() finishes execution. + */ + groupId = GROUP_ID_UPGRADING; + } + + systable_endscan(scanDescriptor); + table_close(pgDistLocalGroupId, AccessShareLock); + + return groupId; +} + + +/* + * CdcLookupShardRelationFromCatalog returns the logical relation oid a shard belongs to. + * + * Errors out if the shardId does not exist and missingOk is false. + * Returns InvalidOid if the shardId does not exist and missingOk is true. 
+ */ +Oid +CdcLookupShardRelationFromCatalog(int64 shardId, bool missingOk) +{ + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + Form_pg_dist_shard shardForm = NULL; + Relation pgDistShard = table_open(DistShardRelationId(), AccessShareLock); + Oid relationId = InvalidOid; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistShard, + DistShardShardidIndexId(), true, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple) && !missingOk) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, shardId))); + } + + if (!HeapTupleIsValid(heapTuple)) + { + relationId = InvalidOid; + } + else + { + shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); + relationId = shardForm->logicalrelid; + } + + systable_endscan(scanDescriptor); + table_close(pgDistShard, NoLock); + + return relationId; +} + + +/* + * CdcPgDistPartitionTupleViaCatalog is a helper function that searches + * pg_dist_partition for the given relationId. The caller is responsible + * for ensuring that the returned heap tuple is valid before accessing + * its fields. 
+ */ +static HeapTuple +CdcPgDistPartitionTupleViaCatalog(Oid relationId) +{ + const int scanKeyCount = 1; + ScanKeyData scanKey[1]; + bool indexOK = true; + + Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition, + DistPartitionLogicalRelidIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + HeapTuple partitionTuple = systable_getnext(scanDescriptor); + + if (HeapTupleIsValid(partitionTuple)) + { + /* callers should have the tuple in their memory contexts */ + partitionTuple = heap_copytuple(partitionTuple); + } + + systable_endscan(scanDescriptor); + table_close(pgDistPartition, AccessShareLock); + + return partitionTuple; +} + + +/* + * CdcPartitionMethodViaCatalog gets a relationId and returns the partition + * method column from pg_dist_partition via reading from catalog. 
+ */ +char +CdcPartitionMethodViaCatalog(Oid relationId) +{ + HeapTuple partitionTuple = CdcPgDistPartitionTupleViaCatalog(relationId); + if (!HeapTupleIsValid(partitionTuple)) + { + return DISTRIBUTE_BY_INVALID; + } + + Datum datumArray[Natts_pg_dist_partition]; + bool isNullArray[Natts_pg_dist_partition]; + + Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); + + TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); + + if (isNullArray[Anum_pg_dist_partition_partmethod - 1]) + { + /* partition method cannot be NULL, still let's make sure */ + heap_freetuple(partitionTuple); + table_close(pgDistPartition, NoLock); + return DISTRIBUTE_BY_INVALID; + } + + Datum partitionMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1]; + char partitionMethodChar = DatumGetChar(partitionMethodDatum); + + heap_freetuple(partitionTuple); + table_close(pgDistPartition, NoLock); + + return partitionMethodChar; +} + + +/* + * RemoveCitusDecodersFromPaths removes a path ending in citus_decoders + * from the given input paths. + */ +char * +RemoveCitusDecodersFromPaths(char *paths) +{ + if (strlen(paths) == 0) + { + /* dynamic_library_path is empty */ + return paths; + } + + StringInfo newPaths = makeStringInfo(); + + char *remainingPaths = paths; + + for (;;) + { + int pathLength = 0; + + char *pathStart = first_path_var_separator(remainingPaths); + if (pathStart == remainingPaths) + { + /* + * This will error out in find_in_dynamic_libpath, return + * original value here. 
+ */ + return paths; + } + else if (pathStart == NULL) + { + /* final path */ + pathLength = strlen(remainingPaths); + } + else + { + /* more paths remaining */ + pathLength = pathStart - remainingPaths; + } + + char *currentPath = palloc(pathLength + 1); + strlcpy(currentPath, remainingPaths, pathLength + 1); + canonicalize_path(currentPath); + + if (!pg_str_endswith(currentPath, "/citus_decoders")) + { + appendStringInfo(newPaths, "%s%s", newPaths->len > 0 ? ":" : "", currentPath); + } + + if (remainingPaths[pathLength] == '\0') + { + /* end of string */ + break; + } + + remainingPaths += pathLength + 1; + } + + return newPaths->data; +} diff --git a/src/include/distributed/cdc_decoder.h b/src/backend/distributed/cdc/cdc_decoder_utils.h similarity index 50% rename from src/include/distributed/cdc_decoder.h rename to src/backend/distributed/cdc/cdc_decoder_utils.h index 391a3dece..d30500de4 100644 --- a/src/include/distributed/cdc_decoder.h +++ b/src/backend/distributed/cdc/cdc_decoder_utils.h @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * cdc_decoder..h + * cdc_decoder_utils.h * Utility functions and declerations for cdc decoder. * * Copyright (c) Citus Data, Inc. @@ -14,14 +14,21 @@ #include "postgres.h" #include "fmgr.h" #include "replication/logical.h" +#include "c.h" - -void PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change); - -void InitCDCDecoder(OutputPluginCallbacks *cb, LogicalDecodeChangeCB changeCB); - -/* used in the replication_origin_filter_cb function. 
*/ #define InvalidRepOriginId 0 +#define INVALID_SHARD_ID 0 -#endif /* CITUS_CDC_DECODER_H */ +bool CdcIsCoordinator(void); + +uint64 CdcExtractShardIdFromTableName(const char *tableName, bool missingOk); + +Oid CdcLookupShardRelationFromCatalog(int64 shardId, bool missingOk); + +char CdcPartitionMethodViaCatalog(Oid relationId); + +bool CdcCitusHasBeenLoaded(void); + +char * RemoveCitusDecodersFromPaths(char *paths); + +#endif /* CITUS_CDC_DECODER_UTILS_H */ diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 7693f216e..1386a21b0 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -8,7 +8,6 @@ *------------------------------------------------------------------------- */ #include "postgres.h" -#include "distributed/cdc_decoder.h" #include "distributed/shardinterval_utils.h" #include "distributed/shardsplit_shared_memory.h" #include "distributed/worker_shard_visibility.h" @@ -21,14 +20,18 @@ #include "catalog/pg_namespace.h" extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); -static LogicalDecodeChangeCB ouputPluginChangeCB; +static LogicalDecodeChangeCB pgOutputPluginChangeCB; + +#define InvalidRepOriginId 0 static HTAB *SourceToDestinationShardMap = NULL; +static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId + origin_id); /* Plugin callback */ -static void shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change); +static void shard_split_change_cb(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); /* Helper methods */ static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation, @@ -44,22 +47,6 @@ static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple, TupleDesc sourceTupleDesc, TupleDesc targetTupleDesc); 
-inline static bool IsShardSplitSlot(char *replicationSlotName); - - -#define CITUS_SHARD_SLOT_PREFIX "citus_shard_" -#define CITUS_SHARD_SLOT_PREFIX_SIZE (sizeof(CITUS_SHARD_SLOT_PREFIX) - 1) - -/* build time macro for base decoder plugin name for CDC and Shard Split. */ -#ifndef CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME -#define CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME "pgoutput" -#endif - -/* build time macro for base decoder plugin's initialization function name for CDC and Shard Split. */ -#ifndef CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME -#define CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME "_PG_output_plugin_init" -#endif - /* * Postgres uses 'pgoutput' as default plugin for logical replication. * We want to reuse Postgres pgoutput's functionality as much as possible. @@ -70,8 +57,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) { LogicalOutputPluginInit plugin_init = (LogicalOutputPluginInit) (void *) - load_external_function(CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME, - CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME, + load_external_function("pgoutput", + "_PG_output_plugin_init", false, NULL); if (plugin_init == NULL) @@ -83,31 +70,33 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) plugin_init(cb); /* actual pgoutput callback will be called with the appropriate destination shard */ - ouputPluginChangeCB = cb->change_cb; - cb->change_cb = shard_split_and_cdc_change_cb; - InitCDCDecoder(cb, ouputPluginChangeCB); + pgOutputPluginChangeCB = cb->change_cb; + cb->change_cb = shard_split_change_cb; + cb->filter_by_origin_cb = replication_origin_filter_cb; } /* - * Check if the replication slot is for Shard split by checking for prefix. + * replication_origin_filter_cb call back function filters out publication of changes + * originated from any other node other than the current node. This is + * identified by the "origin_id" of the changes. 
The origin_id is set to + * a non-zero value in the origin node as part of WAL replication for internal + * operations like shard split/moves/create_distributed_table etc. */ -inline static -bool -IsShardSplitSlot(char *replicationSlotName) +static bool +replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id) { - return strncmp(replicationSlotName, CITUS_SHARD_SLOT_PREFIX, - CITUS_SHARD_SLOT_PREFIX_SIZE) == 0; + return (origin_id != InvalidRepOriginId); } /* - * shard_split_and_cdc_change_cb function emits the incoming tuple change + * shard_split_change_cb function emits the incoming tuple change * to the appropriate destination shard. */ static void -shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change) +shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) { /* * If Citus has not been loaded yet, pass the changes @@ -115,7 +104,7 @@ shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn */ if (!CitusHasBeenLoaded()) { - ouputPluginChangeCB(ctx, txn, relation, change); + pgOutputPluginChangeCB(ctx, txn, relation, change); return; } @@ -132,13 +121,6 @@ shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn return; } - /* check for the internal shard split names, if not, assume the slot is for CDC. */ - if (!IsShardSplitSlot(replicationSlotName)) - { - PublishDistributedTableChanges(ctx, txn, relation, change); - return; - } - /* * Initialize SourceToDestinationShardMap if not already initialized. * This gets initialized during the replication of first message. 
@@ -257,7 +239,7 @@ shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn } } - ouputPluginChangeCB(ctx, txn, targetRelation, change); + pgOutputPluginChangeCB(ctx, txn, targetRelation, change); RelationClose(targetRelation); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 23393078b..c779b0a8d 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -136,6 +136,8 @@ ReadColumnarOptions_type extern_ReadColumnarOptions = NULL; CppConcat(extern_, funcname) = \ (typename) (void *) lookup_external_function(handle, # funcname) +#define CDC_DECODER_DYNAMIC_LIB_PATH "$libdir/citus_decoders:$libdir" + DEFINE_COLUMNAR_PASSTHROUGH_FUNC(columnar_handler) DEFINE_COLUMNAR_PASSTHROUGH_FUNC(alter_columnar_table_set) DEFINE_COLUMNAR_PASSTHROUGH_FUNC(alter_columnar_table_reset) @@ -207,7 +209,7 @@ static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSour source); static void CitusAuthHook(Port *port, int status); static bool IsSuperuser(char *userName); - +static void AdjustDynamicLibraryPathForCdcDecoders(void); static ClientAuthentication_hook_type original_client_auth_hook = NULL; @@ -475,6 +477,17 @@ _PG_init(void) InitializeLocallyReservedSharedConnections(); InitializeClusterClockMem(); + /* + * Adjust the Dynamic Library Path to prepend citus_decoders to the dynamic + * library path. This is needed to make sure that the citus decoders are + * loaded before the default decoders for CDC. + */ + if (EnableChangeDataCapture) + { + AdjustDynamicLibraryPathForCdcDecoders(); + } + + /* initialize shard split shared memory handle management */ InitializeShardSplitSMHandleManagement(); @@ -542,6 +555,22 @@ _PG_init(void) } +/* + * AdjustDynamicLibraryPathForCdcDecoders prepends the $libdir/citus_decoders + to the dynamic library path. 
This is needed to make sure that the citus + * decoders are loaded before the default decoders for CDC. + */ +static void +AdjustDynamicLibraryPathForCdcDecoders(void) +{ + if (strcmp(Dynamic_library_path, "$libdir") == 0) + { + SetConfigOption("dynamic_library_path", CDC_DECODER_DYNAMIC_LIB_PATH, + PGC_POSTMASTER, PGC_S_OVERRIDE); + } +} + + #if PG_VERSION_NUM >= PG_VERSION_15 /* diff --git a/src/test/cdc/postgresql.conf b/src/test/cdc/postgresql.conf index d2d6d3efe..1c0e1fad7 100644 --- a/src/test/cdc/postgresql.conf +++ b/src/test/cdc/postgresql.conf @@ -1,2 +1 @@ -shared_preload_libraries=citus shared_preload_libraries='citus' diff --git a/src/test/cdc/t/001_cdc_create_distributed_table_test.pl b/src/test/cdc/t/001_cdc_create_distributed_table_test.pl index 74850e58a..5e57b8a54 100644 --- a/src/test/cdc/t/001_cdc_create_distributed_table_test.pl +++ b/src/test/cdc/t/001_cdc_create_distributed_table_test.pl @@ -34,6 +34,8 @@ my $initial_schema = " CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; $node_coordinator->safe_psql('postgres',$initial_schema); +$node_coordinator->safe_psql('postgres','ALTER TABLE sensors REPLICA IDENTITY FULL;'); + $node_cdc_client->safe_psql('postgres',$initial_schema); create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); @@ -93,5 +95,15 @@ wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers) $result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); is($result, 1, 'CDC basic test - distributed table delete data'); +$node_coordinator->safe_psql('postgres',"TRUNCATE sensors;"); + +# Wait for the data changes to be replicated to the cdc client node. +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +# Compare the data in the coordinator and cdc client nodes. 
+$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC basic test - distributed table delete data'); + + drop_cdc_client_subscriptions($node_cdc_client,\@workers); done_testing(); diff --git a/src/test/cdc/t/006_cdc_schema_change_and_move.pl b/src/test/cdc/t/006_cdc_schema_change_and_move.pl index 7b0c0759d..cf1425a30 100644 --- a/src/test/cdc/t/006_cdc_schema_change_and_move.pl +++ b/src/test/cdc/t/006_cdc_schema_change_and_move.pl @@ -41,6 +41,7 @@ my $initial_schema = " CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; $node_coordinator->safe_psql('postgres',$initial_schema); +$node_coordinator->safe_psql('postgres','ALTER TABLE sensors REPLICA IDENTITY FULL;'); $node_cdc_client->safe_psql('postgres',$initial_schema); create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); diff --git a/src/test/cdc/t/007_cdc_undistributed_table_test.pl b/src/test/cdc/t/007_cdc_undistributed_table_test.pl index c8c87f678..f927b43e2 100644 --- a/src/test/cdc/t/007_cdc_undistributed_table_test.pl +++ b/src/test/cdc/t/007_cdc_undistributed_table_test.pl @@ -14,7 +14,7 @@ my $result = 0; ### Create the citus cluster with coordinator and two worker nodes our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); -my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;"; $node_coordinator->safe_psql('postgres',$command); our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); diff --git a/src/test/cdc/t/008_cdc_shard_split_test.pl b/src/test/cdc/t/008_cdc_shard_split_test.pl index 6348116f7..6875d1855 100644 --- a/src/test/cdc/t/008_cdc_shard_split_test.pl +++ b/src/test/cdc/t/008_cdc_shard_split_test.pl @@ -17,7 +17,7 @@ citus.shard_replication_factor = 1 ### Create the citus cluster with coordinator and two 
worker nodes our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config); -my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;"; $node_coordinator->safe_psql('postgres',$command); our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); diff --git a/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl b/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl index e73637aa1..58077b5a1 100644 --- a/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl +++ b/src/test/cdc/t/009_cdc_shard_split_test_non_blocking.pl @@ -17,7 +17,7 @@ citus.shard_replication_factor = 1 ### Create the citus cluster with coordinator and two worker nodes our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config); -my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;"; $node_coordinator->safe_psql('postgres',$command); our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); diff --git a/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl b/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl index cb00ec328..4ac75244a 100644 --- a/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl +++ b/src/test/cdc/t/010_cdc_shard_split_parallel_insert.pl @@ -19,7 +19,7 @@ citus.shard_replication_factor = 1 ### Create the citus cluster with coordinator and two worker nodes our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config); -my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;"; $node_coordinator->safe_psql('postgres',$command); our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); diff --git 
a/src/test/cdc/t/011_cdc_alter_distributed_table.pl b/src/test/cdc/t/011_cdc_alter_distributed_table.pl index 29fc2d037..2fbcd6429 100644 --- a/src/test/cdc/t/011_cdc_alter_distributed_table.pl +++ b/src/test/cdc/t/011_cdc_alter_distributed_table.pl @@ -18,7 +18,7 @@ citus.shard_replication_factor = 1 ### Create the citus cluster with coordinator and two worker nodes our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config); -my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);"; +my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;"; $node_coordinator->safe_psql('postgres',$command); our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); diff --git a/src/test/cdc/t/013_cdc_drop_last_column_for_one_shard.pl b/src/test/cdc/t/013_cdc_drop_last_column_for_one_shard.pl new file mode 100644 index 000000000..ec8ccb718 --- /dev/null +++ b/src/test/cdc/t/013_cdc_drop_last_column_for_one_shard.pl @@ -0,0 +1,89 @@ +# Schema change CDC test for Citus +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $select_stmt_after_drop = qq(SELECT measureid, eventdatetime, measure_data, meaure_quantity, measure_status FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +print("coordinator port: " . $node_coordinator->port() . "\n"); +print("worker0 port:" . $workers[0]->port() . "\n"); +print("worker1 port:" . $workers[1]->port() . "\n"); +print("cdc_client port:" .$node_cdc_client->port() . "\n"); + +# Create the sensors table and indexes. 
+my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_coordinator->safe_psql('postgres','ALTER TABLE sensors REPLICA IDENTITY FULL;'); +$node_cdc_client->safe_psql('postgres',$initial_schema); + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +#insert data into the sensors table in the coordinator node before distributing the table. 
+$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors +SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' +FROM generate_series(0,100)i;"); + +$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');"); + +#connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +create_cdc_slots_for_workers(\@workers); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - schema change before move'); + + +my $shard_id = $workers[1]->safe_psql('postgres', + "SELECT shardid FROM citus_shards ORDER BY shardid LIMIT 1;"); + +my $shard_to_drop_column = "sensors_" . $shard_id; + + +$workers[1]->safe_psql('postgres',"ALTER TABLE $shard_to_drop_column DROP COLUMN measure_comment;"); + + +$workers[1]->safe_psql('postgres'," + INSERT INTO sensors + SELECT i, '2020-01-05', '{}', 11011.10, 'A' + FROM generate_series(-10,-1)i;"); + + +wait_for_cdc_client_to_catch_up_with_workers(\@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt_after_drop); +is($result, 1, 'CDC create_distributed_table - schema change and move shard'); + + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/014_cdc_with_table_like_shard_name.pl b/src/test/cdc/t/014_cdc_with_table_like_shard_name.pl new file mode 100644 index 000000000..c96fea921 --- /dev/null +++ b/src/test/cdc/t/014_cdc_with_table_like_shard_name.pl @@ -0,0 +1,90 @@ +# Schema change CDC test for Citus +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use cdctestlib; + + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM sensors 
ORDER BY measureid, eventdatetime, measure_data;); +my $select_stmt_after_drop = qq(SELECT measureid, eventdatetime, measure_data, meaure_quantity, measure_status FROM sensors ORDER BY measureid, eventdatetime, measure_data;); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +print("coordinator port: " . $node_coordinator->port() . "\n"); +print("worker0 port:" . $workers[0]->port() . "\n"); +print("worker1 port:" . $workers[1]->port() . "\n"); +print("cdc_client port:" .$node_cdc_client->port() . "\n"); + +# Create the sensors table and indexes. +my $initial_schema = " + CREATE TABLE sensors( + measureid integer, + eventdatetime timestamptz, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + + CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); + ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; + CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); + CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); + CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;"; + +my $shard_like_table_schema = " + CREATE TABLE data_100008( + id integer, + data integer, + PRIMARY KEY (data));"; + +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_coordinator->safe_psql('postgres',$shard_like_table_schema); +$node_coordinator->safe_psql('postgres','ALTER TABLE sensors REPLICA IDENTITY FULL;'); + +$node_cdc_client->safe_psql('postgres',$initial_schema); +$node_cdc_client->safe_psql('postgres',$shard_like_table_schema); + 
+create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors,data_100008'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +#insert data into the sensors table in the coordinator node before distributing the table. +$node_coordinator->safe_psql('postgres'," + INSERT INTO sensors +SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' +FROM generate_series(0,100)i;"); + +$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');"); + +#connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +create_cdc_slots_for_workers(\@workers); +connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers); + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - basic test'); + +$workers[1]->safe_psql('postgres',$shard_like_table_schema); +$workers[1]->safe_psql('postgres','\d'); + +$workers[1]->safe_psql('postgres'," + INSERT INTO data_100008 + SELECT i, i*10 + FROM generate_series(-10,10)i;"); + + +wait_for_cdc_client_to_catch_up_with_workers(\@workers); +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt_after_drop); +is($result, 1, 'CDC create_distributed_table - normal table with name like shard'); + +drop_cdc_client_subscriptions($node_cdc_client,\@workers); +done_testing(); diff --git a/src/test/cdc/t/015_cdc_without_citus.pl b/src/test/cdc/t/015_cdc_without_citus.pl new file mode 100644 index 000000000..4f3db68ca --- /dev/null +++ b/src/test/cdc/t/015_cdc_without_citus.pl @@ -0,0 +1,53 @@ +# Schema change CDC test for Citus +use strict; +use warnings; + +use Test::More; + +use lib './t'; +use 
cdctestlib; + + +# Initialize co-ordinator node +my $select_stmt = qq(SELECT * FROM data_100008 ORDER BY id;); +my $result = 0; + +### Create the citus cluster with coordinator and two worker nodes +our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636); + +our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639); + +print("coordinator port: " . $node_coordinator->port() . "\n"); +print("worker0 port:" . $workers[0]->port() . "\n"); +print("worker1 port:" . $workers[1]->port() . "\n"); +print("cdc_client port:" .$node_cdc_client->port() . "\n"); + +my $initial_schema = " + CREATE TABLE data_100008( + id integer, + data integer, + PRIMARY KEY (data));"; + +$node_coordinator->safe_psql('postgres','DROP extension citus'); +$node_coordinator->safe_psql('postgres',$initial_schema); +$node_coordinator->safe_psql('postgres','ALTER TABLE data_100008 REPLICA IDENTITY FULL;'); + +$node_cdc_client->safe_psql('postgres',$initial_schema); + + +create_cdc_publication_and_slots_for_coordinator($node_coordinator,'data_100008'); +connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client); +wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator); + +#insert data into the sensors table in the coordinator node before distributing the table. 
+$node_coordinator->safe_psql('postgres'," + INSERT INTO data_100008 + SELECT i, i*10 + FROM generate_series(-10,10)i;"); + + +$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt); +is($result, 1, 'CDC create_distributed_table - basic test'); + +$node_cdc_client->safe_psql('postgres',"drop subscription cdc_subscription"); +done_testing(); diff --git a/src/test/cdc/t/cdctestlib.pm b/src/test/cdc/t/cdctestlib.pm index 393a62ac9..782da1198 100644 --- a/src/test/cdc/t/cdctestlib.pm +++ b/src/test/cdc/t/cdctestlib.pm @@ -207,7 +207,7 @@ sub create_cdc_publication_and_slots_for_coordinator { $node_coordinator->safe_psql('postgres',"DROP PUBLICATION IF EXISTS cdc_publication;"); } $node_coordinator->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR TABLE $table_names;"); - $node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,false)"); + $node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','pgoutput',false)"); } sub create_cdc_slots_for_workers { @@ -217,7 +217,7 @@ sub create_cdc_slots_for_workers { if ($slot ne "") { $_->safe_psql('postgres',"SELECT pg_catalog.pg_drop_replication_slot('cdc_replication_slot');"); } - $_->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,true)"); + $_->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','pgoutput',false)"); } }