Make CDC decoder an independent extension (#6810)

DESCRIPTION: 

- The CDC decoder is refactored into a separate extension that can be loaded dynamically, without having to reload citus.
- The CDC decoder code can be compiled with the DECODER flag to work with different base decoders such as pgoutput and wal2json;
   by default the base decoder is "pgoutput".
- The dynamic_library_path config is adjusted during citus init to prefer the decoders in the citus_decoders directory,
  so that users can use the replication subscription commands without having to make any config changes (see the sketch below).
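As a minimal sketch (assuming the stock configuration, and mirroring the AdjustDynamicLibraryPathForCdcDecoders hunk further down; the wrapper function name here is hypothetical), the init-time adjustment amounts to:

#include "postgres.h"
#include "fmgr.h"
#include "utils/guc.h"

static void
PreferCitusDecoders(void)
{
	/* only adjust the stock value; a user-customized path is respected */
	if (strcmp(Dynamic_library_path, "$libdir") == 0)
	{
		/* "pgoutput"/"wal2json" now resolve to $libdir/citus_decoders first */
		SetConfigOption("dynamic_library_path", "$libdir/citus_decoders:$libdir",
						PGC_POSTMASTER, PGC_S_OVERRIDE);
	}
}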
rajeshkt78 2023-04-03 21:32:15 +05:30 committed by GitHub
parent 697bb55fc5
commit d5df892394
19 changed files with 948 additions and 101 deletions

View File

@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$(
DATA_built = $(generated_sql_files)
# directories with source files
SUBDIRS = . commands cdc connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
# enterprise modules
SUBDIRS += replication
@ -32,7 +32,13 @@ OBJS += \
$(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c))))
# be explicit about the default target
all:
.PHONY: cdc
all: cdc
cdc:
echo "running cdc make"
$(MAKE) DECODER=pgoutput -C cdc all
NO_PGXS = 1
@ -81,11 +87,19 @@ endif
.PHONY: clean-full install install-downgrades install-all
clean: clean-cdc
clean-cdc:
$(MAKE) DECODER=pgoutput -C cdc clean
cleanup-before-install:
rm -f $(DESTDIR)$(datadir)/$(datamoduledir)/citus.control
rm -f $(DESTDIR)$(datadir)/$(datamoduledir)/citus--*
install: cleanup-before-install
install: cleanup-before-install install-cdc
install-cdc:
$(MAKE) DECODER=pgoutput -C cdc install
# install and install-downgrades should be run sequentially
install-all: install
@ -96,4 +110,5 @@ install-downgrades: $(generated_downgrade_sql_files)
clean-full:
$(MAKE) clean
$(MAKE) -C cdc clean-full
rm -rf $(safestringlib_builddir)

View File

@ -0,0 +1,26 @@
ifndef DECODER
DECODER = pgoutput
endif
MODULE_big = citus_$(DECODER)
citus_subdir = src/backend/distributed/cdc
citus_top_builddir = ../../../..
citus_decoders_dir = $(DESTDIR)$(pkglibdir)/citus_decoders
OBJS += cdc_decoder.o cdc_decoder_utils.o
include $(citus_top_builddir)/Makefile.global
override CFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
override CPPFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
install: install-cdc
clean: clean-cdc
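# The wrapper is installed as citus_decoders/$(DECODER).so, i.e. under the
# stock decoder's name, so that dynamic_library_path resolution prefers it.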
install-cdc:
mkdir -p '$(citus_decoders_dir)'
$(INSTALL_SHLIB) citus_$(DECODER).so '$(citus_decoders_dir)/$(DECODER).so'
clean-cdc:
	rm -f '$(citus_decoders_dir)/$(DECODER).so'

View File

@ -8,18 +8,31 @@
*-------------------------------------------------------------------------
*/
#include "cdc_decoder_utils.h"
#include "postgres.h"
#include "common/hashfn.h"
#include "utils/typcache.h"
#include "utils/lsyscache.h"
#include "catalog/pg_namespace.h"
#include "distributed/cdc_decoder.h"
#include "distributed/relay_utility.h"
#include "distributed/worker_protocol.h"
#include "distributed/metadata_cache.h"
#include "fmgr.h"
#include "access/genam.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_publication.h"
#include "commands/extension.h"
#include "common/hashfn.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/typcache.h"
PG_MODULE_MAGIC;
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB ouputPluginChangeCB;
static void InitShardToDistributedTableMap(void);
static void PublishDistributedTableChanges(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId
origin_id);
@ -43,6 +56,124 @@ typedef struct
static HTAB *shardToDistributedTableMap = NULL;
static void cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
/* build time macro for base decoder plugin name for CDC and Shard Split. */
#ifndef DECODER
#define DECODER "pgoutput"
#endif
#define DECODER_INIT_FUNCTION_NAME "_PG_output_plugin_init"
#define CITUS_SHARD_TRANSFER_SLOT_PREFIX "citus_shard_"
#define CITUS_SHARD_TRANSFER_SLOT_PREFIX_SIZE (sizeof(CITUS_SHARD_TRANSFER_SLOT_PREFIX) - \
1)
/*
* Postgres uses 'pgoutput' as default plugin for logical replication.
* We want to reuse Postgres pgoutput's functionality as much as possible.
* Hence we load all the functions of this plugin and override as required.
*/
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
elog(LOG, "Initializing CDC decoder");
/*
* We build custom .so files whose name matches common decoders (pgoutput, wal2json)
* and place them in $libdir/citus_decoders/ such that administrators can configure
* dynamic_library_path to include this directory, and users can then use the
 * regular decoder names when creating replication slots.
*
* To load the original decoder, we need to remove citus_decoders/ from the
* dynamic_library_path.
*/
char *originalDLP = Dynamic_library_path;
Dynamic_library_path = RemoveCitusDecodersFromPaths(Dynamic_library_path);
LogicalOutputPluginInit plugin_init =
(LogicalOutputPluginInit) (void *)
load_external_function(DECODER,
DECODER_INIT_FUNCTION_NAME,
false, NULL);
if (plugin_init == NULL)
{
elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
}
/* in case this session is used for different replication slots */
Dynamic_library_path = originalDLP;
/* ask the output plugin to fill the callback struct */
plugin_init(cb);
/* Initialize the Shard Id to Distributed Table id mapping hash table.*/
InitShardToDistributedTableMap();
/* actual pgoutput callback function will be called */
ouputPluginChangeCB = cb->change_cb;
cb->change_cb = cdc_change_cb;
cb->filter_by_origin_cb = replication_origin_filter_cb;
}
/*
* Check if the replication slot is for Shard transfer by checking for prefix.
*/
inline static
bool
IsShardTransferSlot(char *replicationSlotName)
{
return strncmp(replicationSlotName, CITUS_SHARD_TRANSFER_SLOT_PREFIX,
CITUS_SHARD_TRANSFER_SLOT_PREFIX_SIZE) == 0;
}
/*
 * cdc_change_cb function translates the incoming tuple change from the
 * shard to the distributed table and publishes it, delegating internal
 * shard-transfer slots to the base plugin.
*/
static void
cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
/*
* If Citus has not been loaded yet, pass the changes
	 * through to the underlying decoder plugin.
*/
if (!CdcCitusHasBeenLoaded())
{
ouputPluginChangeCB(ctx, txn, relation, change);
return;
}
/* check if the relation is publishable.*/
if (!is_publishable_relation(relation))
{
return;
}
char *replicationSlotName = ctx->slot->data.name.data;
if (replicationSlotName == NULL)
{
elog(ERROR, "Replication slot name is NULL!");
return;
}
/* If the slot is for internal shard operations, call the base plugin's call back. */
if (IsShardTransferSlot(replicationSlotName))
{
ouputPluginChangeCB(ctx, txn, relation, change);
return;
}
	/* Translate the changes from shard to distributed table and publish. */
PublishDistributedTableChanges(ctx, txn, relation, change);
}
/*
* InitShardToDistributedTableMap initializes the hash table that is used to
@ -68,23 +199,24 @@ InitShardToDistributedTableMap()
* AddShardIdToHashTable adds the shardId to the hash table.
*/
static Oid
AddShardIdToHashTable(Oid shardId, ShardIdHashEntry *entry)
AddShardIdToHashTable(uint64 shardId, ShardIdHashEntry *entry)
{
entry->shardId = shardId;
entry->distributedTableId = LookupShardRelationFromCatalog(shardId, true);
entry->isReferenceTable = PartitionMethodViaCatalog(entry->distributedTableId) == 'n';
entry->distributedTableId = CdcLookupShardRelationFromCatalog(shardId, true);
entry->isReferenceTable = CdcPartitionMethodViaCatalog(entry->distributedTableId) ==
'n';
return entry->distributedTableId;
}
static Oid
LookupDistributedTableIdForShardId(Oid shardId, bool *isReferenceTable)
LookupDistributedTableIdForShardId(uint64 shardId, bool *isReferenceTable)
{
bool found;
Oid distributedTableId = InvalidOid;
ShardIdHashEntry *entry = (ShardIdHashEntry *) hash_search(shardToDistributedTableMap,
&shardId,
HASH_FIND | HASH_ENTER,
HASH_ENTER,
&found);
if (found)
{
@ -99,24 +231,6 @@ LookupDistributedTableIdForShardId(Oid shardId, bool *isReferenceTable)
}
/*
* InitCDCDecoder is called by from the shard split decoder plugin's init function.
* It sets the call back function for filtering out changes originated from other nodes.
* It also sets the call back function for processing the changes in ouputPluginChangeCB.
* This function is common for both CDC and shard split decoder plugins.
*/
void
InitCDCDecoder(OutputPluginCallbacks *cb, LogicalDecodeChangeCB changeCB)
{
elog(LOG, "Initializing CDC decoder");
cb->filter_by_origin_cb = replication_origin_filter_cb;
ouputPluginChangeCB = changeCB;
/* Initialize the hash table used for mapping shard to shell tables. */
InitShardToDistributedTableMap();
}
/*
* replication_origin_filter_cb call back function filters out publication of changes
* originated from any other node other than the current node. This is
@ -166,7 +280,7 @@ TranslateAndPublishRelationForCDC(LogicalDecodingContext *ctx, ReorderBufferTXN
* the changes as the change for the distributed table instead of shard.
* If not, it returns false. It also skips the Citus metadata tables.
*/
void
static void
PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
@ -179,7 +293,7 @@ PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *tx
}
/* Check if the relation is a distributed table by checking for shard name. */
uint64 shardId = ExtractShardIdFromTableName(shardRelationName, true);
uint64 shardId = CdcExtractShardIdFromTableName(shardRelationName, true);
/* If this relation is not distributed, call the pgoutput's callback and return. */
if (shardId == INVALID_SHARD_ID)
@ -197,7 +311,7 @@ PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *tx
}
/* Publish changes for reference table only from the coordinator node. */
if (isReferenceTable && !IsCoordinator())
if (isReferenceTable && !CdcIsCoordinator())
{
return;
}
@ -247,14 +361,12 @@ GetTupleForTargetSchemaForCdc(HeapTuple sourceRelationTuple,
targetNulls[targetIndex] = true;
targetIndex++;
}
/* If this source attribute has been dropped, just skip this source attribute.*/
else if (TupleDescAttr(sourceRelDesc, sourceIndex)->attisdropped)
{
sourceIndex++;
continue;
}
/* If both source and target attributes are not dropped, add the attribute field to targetValues. */
else if (sourceIndex < sourceRelDesc->natts)
{

View File

@ -0,0 +1,432 @@
/*-------------------------------------------------------------------------
*
* cdc_decoder_utils.c
* CDC Decoder plugin utility functions for Citus
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "commands/extension.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "common/hashfn.h"
#include "common/string.h"
#include "utils/fmgroids.h"
#include "utils/typcache.h"
#include "utils/lsyscache.h"
#include "catalog/pg_namespace.h"
#include "cdc_decoder_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/relay_utility.h"
static int32 LocalGroupId = -1;
static Oid PgDistLocalGroupRelationId = InvalidOid;
static Oid PgDistShardRelationId = InvalidOid;
static Oid PgDistShardShardidIndexId = InvalidOid;
static Oid PgDistPartitionRelationId = InvalidOid;
static Oid PgDistPartitionLogicalrelidIndexId = InvalidOid;
static bool IsCitusExtensionLoaded = false;
#define COORDINATOR_GROUP_ID 0
#define InvalidRepOriginId 0
#define Anum_pg_dist_local_groupid 1
#define GROUP_ID_UPGRADING -2
static Oid DistLocalGroupIdRelationId(void);
static int32 CdcGetLocalGroupId(void);
static HeapTuple CdcPgDistPartitionTupleViaCatalog(Oid relationId);
/*
* DistLocalGroupIdRelationId returns the relation id of the pg_dist_local_group
*/
static Oid
DistLocalGroupIdRelationId(void)
{
if (PgDistLocalGroupRelationId == InvalidOid)
{
PgDistLocalGroupRelationId = get_relname_relid("pg_dist_local_group",
PG_CATALOG_NAMESPACE);
}
return PgDistLocalGroupRelationId;
}
/*
* DistShardRelationId returns the relation id of the pg_dist_shard
*/
static Oid
DistShardRelationId(void)
{
if (PgDistShardRelationId == InvalidOid)
{
PgDistShardRelationId = get_relname_relid("pg_dist_shard", PG_CATALOG_NAMESPACE);
}
return PgDistShardRelationId;
}
/*
 * DistShardShardidIndexId returns the relation id of the pg_dist_shard_shardid_index
*/
static Oid
DistShardShardidIndexId(void)
{
if (PgDistShardShardidIndexId == InvalidOid)
{
PgDistShardShardidIndexId = get_relname_relid("pg_dist_shard_shardid_index",
PG_CATALOG_NAMESPACE);
}
return PgDistShardShardidIndexId;
}
/*
 * DistPartitionRelationId returns the relation id of the pg_dist_partition
*/
static Oid
DistPartitionRelationId(void)
{
if (PgDistPartitionRelationId == InvalidOid)
{
PgDistPartitionRelationId = get_relname_relid("pg_dist_partition",
PG_CATALOG_NAMESPACE);
}
return PgDistPartitionRelationId;
}
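/*
 * DistPartitionLogicalRelidIndexId returns the relation id of the
 * pg_dist_partition_logicalrelid_index.
 */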
static Oid
DistPartitionLogicalRelidIndexId(void)
{
if (PgDistPartitionLogicalrelidIndexId == InvalidOid)
{
PgDistPartitionLogicalrelidIndexId = get_relname_relid(
"pg_dist_partition_logicalrelid_index", PG_CATALOG_NAMESPACE);
}
return PgDistPartitionLogicalrelidIndexId;
}
/*
* CdcIsCoordinator function returns true if this node is identified as the
* schema/coordinator/master node of the cluster.
*/
bool
CdcIsCoordinator(void)
{
return (CdcGetLocalGroupId() == COORDINATOR_GROUP_ID);
}
/*
* CdcCitusHasBeenLoaded function returns true if the citus extension has been loaded.
*/
bool
CdcCitusHasBeenLoaded()
{
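	/* cached once the extension is found; not re-checked afterwards */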
if (!IsCitusExtensionLoaded)
{
IsCitusExtensionLoaded = (get_extension_oid("citus", true) != InvalidOid);
}
return IsCitusExtensionLoaded;
}
/*
 * CdcExtractShardIdFromTableName tries to extract shard id from the given table name,
* and returns the shard id if table name is formatted as shard name.
* Else, the function returns INVALID_SHARD_ID.
*/
uint64
CdcExtractShardIdFromTableName(const char *tableName, bool missingOk)
{
char *shardIdStringEnd = NULL;
/* find the last underscore and increment for shardId string */
char *shardIdString = strrchr(tableName, SHARD_NAME_SEPARATOR);
if (shardIdString == NULL && !missingOk)
{
ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"",
tableName)));
}
else if (shardIdString == NULL && missingOk)
{
return INVALID_SHARD_ID;
}
shardIdString++;
errno = 0;
uint64 shardId = strtoull(shardIdString, &shardIdStringEnd, 0);
if (errno != 0 || (*shardIdStringEnd != '\0'))
{
if (!missingOk)
{
ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"",
tableName)));
}
else
{
return INVALID_SHARD_ID;
}
}
return shardId;
}
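/*
 * A hedged illustration of the parsing above (hypothetical shard names;
 * this helper is for exposition only and is not part of the commit).
 */
static void
CdcExtractShardIdExamples(void)
{
	/* "<table>_<shardid>" parses to the trailing shard id */
	Assert(CdcExtractShardIdFromTableName("sensors_102008", true) == 102008);

	/* no separator, or a non-numeric suffix, yields INVALID_SHARD_ID */
	Assert(CdcExtractShardIdFromTableName("sensors", true) == INVALID_SHARD_ID);
	Assert(CdcExtractShardIdFromTableName("sensors_x", true) == INVALID_SHARD_ID);
}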
/*
* CdcGetLocalGroupId returns the group identifier of the local node. The function assumes
 * that pg_dist_local_group has exactly one row and has at least one column.
* Otherwise, the function errors out.
*/
static int32
CdcGetLocalGroupId(void)
{
ScanKeyData scanKey[1];
int scanKeyCount = 0;
int32 groupId = 0;
/*
* Already set the group id, no need to read the heap again.
*/
if (LocalGroupId != -1)
{
return LocalGroupId;
}
Oid localGroupTableOid = DistLocalGroupIdRelationId();
if (localGroupTableOid == InvalidOid)
{
return 0;
}
Relation pgDistLocalGroupId = table_open(localGroupTableOid, AccessShareLock);
SysScanDesc scanDescriptor = systable_beginscan(pgDistLocalGroupId,
InvalidOid, false,
NULL, scanKeyCount, scanKey);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistLocalGroupId);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
bool isNull = false;
Datum groupIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_local_groupid,
tupleDescriptor, &isNull);
groupId = DatumGetInt32(groupIdDatum);
/* set the local cache variable */
LocalGroupId = groupId;
}
else
{
/*
* Upgrade is happening. When upgrading postgres, pg_dist_local_group is
* temporarily empty before citus_finish_pg_upgrade() finishes execution.
*/
groupId = GROUP_ID_UPGRADING;
}
systable_endscan(scanDescriptor);
table_close(pgDistLocalGroupId, AccessShareLock);
return groupId;
}
/*
* CdcLookupShardRelationFromCatalog returns the logical relation oid a shard belongs to.
*
* Errors out if the shardId does not exist and missingOk is false.
* Returns InvalidOid if the shardId does not exist and missingOk is true.
*/
Oid
CdcLookupShardRelationFromCatalog(int64 shardId, bool missingOk)
{
ScanKeyData scanKey[1];
int scanKeyCount = 1;
Form_pg_dist_shard shardForm = NULL;
Relation pgDistShard = table_open(DistShardRelationId(), AccessShareLock);
Oid relationId = InvalidOid;
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistShard,
DistShardShardidIndexId(), true,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple) && !missingOk)
{
ereport(ERROR, (errmsg("could not find valid entry for shard "
UINT64_FORMAT, shardId)));
}
if (!HeapTupleIsValid(heapTuple))
{
relationId = InvalidOid;
}
else
{
shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple);
relationId = shardForm->logicalrelid;
}
systable_endscan(scanDescriptor);
table_close(pgDistShard, NoLock);
return relationId;
}
/*
* CdcPgDistPartitionTupleViaCatalog is a helper function that searches
* pg_dist_partition for the given relationId. The caller is responsible
* for ensuring that the returned heap tuple is valid before accessing
* its fields.
*/
static HeapTuple
CdcPgDistPartitionTupleViaCatalog(Oid relationId)
{
const int scanKeyCount = 1;
ScanKeyData scanKey[1];
bool indexOK = true;
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionLogicalRelidIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
HeapTuple partitionTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(partitionTuple))
{
/* callers should have the tuple in their memory contexts */
partitionTuple = heap_copytuple(partitionTuple);
}
systable_endscan(scanDescriptor);
table_close(pgDistPartition, AccessShareLock);
return partitionTuple;
}
/*
* CdcPartitionMethodViaCatalog gets a relationId and returns the partition
* method column from pg_dist_partition via reading from catalog.
*/
char
CdcPartitionMethodViaCatalog(Oid relationId)
{
HeapTuple partitionTuple = CdcPgDistPartitionTupleViaCatalog(relationId);
if (!HeapTupleIsValid(partitionTuple))
{
return DISTRIBUTE_BY_INVALID;
}
Datum datumArray[Natts_pg_dist_partition];
bool isNullArray[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
if (isNullArray[Anum_pg_dist_partition_partmethod - 1])
{
/* partition method cannot be NULL, still let's make sure */
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
return DISTRIBUTE_BY_INVALID;
}
Datum partitionMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
char partitionMethodChar = DatumGetChar(partitionMethodDatum);
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
return partitionMethodChar;
}
/*
* RemoveCitusDecodersFromPaths removes a path ending in citus_decoders
* from the given input paths.
*/
char *
RemoveCitusDecodersFromPaths(char *paths)
{
if (strlen(paths) == 0)
{
/* dynamic_library_path is empty */
return paths;
}
StringInfo newPaths = makeStringInfo();
char *remainingPaths = paths;
for (;;)
{
int pathLength = 0;
char *pathStart = first_path_var_separator(remainingPaths);
if (pathStart == remainingPaths)
{
/*
* This will error out in find_in_dynamic_libpath, return
* original value here.
*/
return paths;
}
else if (pathStart == NULL)
{
/* final path */
pathLength = strlen(remainingPaths);
}
else
{
/* more paths remaining */
pathLength = pathStart - remainingPaths;
}
char *currentPath = palloc(pathLength + 1);
strlcpy(currentPath, remainingPaths, pathLength + 1);
canonicalize_path(currentPath);
if (!pg_str_endswith(currentPath, "/citus_decoders"))
{
appendStringInfo(newPaths, "%s%s", newPaths->len > 0 ? ":" : "", currentPath);
}
if (remainingPaths[pathLength] == '\0')
{
/* end of string */
break;
}
remainingPaths += pathLength + 1;
}
return newPaths->data;
}
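/*
 * A hedged illustration of the filtering above (assumes ':' as the path
 * separator, i.e. a non-Windows build; for exposition only).
 */
static void
RemoveCitusDecodersExamples(void)
{
	/* a citus_decoders entry is dropped from the list */
	Assert(strcmp(RemoveCitusDecodersFromPaths("$libdir/citus_decoders:$libdir"),
				  "$libdir") == 0);

	/* a path without a citus_decoders entry comes back unchanged */
	Assert(strcmp(RemoveCitusDecodersFromPaths("$libdir"), "$libdir") == 0);
}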

View File

@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* cdc_decoder..h
* cdc_decoder_utils.h
 * Utility functions and declarations for cdc decoder.
*
* Copyright (c) Citus Data, Inc.
@ -14,14 +14,21 @@
#include "postgres.h"
#include "fmgr.h"
#include "replication/logical.h"
#include "c.h"
void PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
void InitCDCDecoder(OutputPluginCallbacks *cb, LogicalDecodeChangeCB changeCB);
/* used in the replication_origin_filter_cb function. */
#define InvalidRepOriginId 0
#define INVALID_SHARD_ID 0
#endif /* CITUS_CDC_DECODER_H */
bool CdcIsCoordinator(void);
uint64 CdcExtractShardIdFromTableName(const char *tableName, bool missingOk);
Oid CdcLookupShardRelationFromCatalog(int64 shardId, bool missingOk);
char CdcPartitionMethodViaCatalog(Oid relationId);
bool CdcCitusHasBeenLoaded(void);
char * RemoveCitusDecodersFromPaths(char *paths);
#endif /* CITUS_CDC_DECODER_UTILS_H */

View File

@ -8,7 +8,6 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/cdc_decoder.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/worker_shard_visibility.h"
@ -21,14 +20,18 @@
#include "catalog/pg_namespace.h"
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB ouputPluginChangeCB;
static LogicalDecodeChangeCB pgOutputPluginChangeCB;
#define InvalidRepOriginId 0
static HTAB *SourceToDestinationShardMap = NULL;
static bool replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId
origin_id);
/* Plugin callback */
static void shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
static void shard_split_change_cb(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
/* Helper methods */
static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,
@ -44,22 +47,6 @@ static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
TupleDesc sourceTupleDesc,
TupleDesc targetTupleDesc);
inline static bool IsShardSplitSlot(char *replicationSlotName);
#define CITUS_SHARD_SLOT_PREFIX "citus_shard_"
#define CITUS_SHARD_SLOT_PREFIX_SIZE (sizeof(CITUS_SHARD_SLOT_PREFIX) - 1)
/* build time macro for base decoder plugin name for CDC and Shard Split. */
#ifndef CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME
#define CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME "pgoutput"
#endif
/* build time macro for base decoder plugin's initialization function name for CDC and Shard Split. */
#ifndef CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME
#define CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME "_PG_output_plugin_init"
#endif
/*
* Postgres uses 'pgoutput' as default plugin for logical replication.
* We want to reuse Postgres pgoutput's functionality as much as possible.
@ -70,8 +57,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
{
LogicalOutputPluginInit plugin_init =
(LogicalOutputPluginInit) (void *)
load_external_function(CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_NAME,
CDC_SHARD_SPLIT_BASE_DECODER_PLUGIN_INIT_FUNCTION_NAME,
load_external_function("pgoutput",
"_PG_output_plugin_init",
false, NULL);
if (plugin_init == NULL)
@ -83,31 +70,33 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
plugin_init(cb);
/* actual pgoutput callback will be called with the appropriate destination shard */
ouputPluginChangeCB = cb->change_cb;
cb->change_cb = shard_split_and_cdc_change_cb;
InitCDCDecoder(cb, ouputPluginChangeCB);
pgOutputPluginChangeCB = cb->change_cb;
cb->change_cb = shard_split_change_cb;
cb->filter_by_origin_cb = replication_origin_filter_cb;
}
/*
* Check if the replication slot is for Shard split by checking for prefix.
* replication_origin_filter_cb call back function filters out publication of changes
* originated from any other node other than the current node. This is
* identified by the "origin_id" of the changes. The origin_id is set to
* a non-zero value in the origin node as part of WAL replication for internal
* operations like shard split/moves/create_distributed_table etc.
*/
inline static
bool
IsShardSplitSlot(char *replicationSlotName)
static bool
replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
return strncmp(replicationSlotName, CITUS_SHARD_SLOT_PREFIX,
CITUS_SHARD_SLOT_PREFIX_SIZE) == 0;
return (origin_id != InvalidRepOriginId);
}
/*
* shard_split_and_cdc_change_cb function emits the incoming tuple change
* shard_split_change_cb function emits the incoming tuple change
* to the appropriate destination shard.
*/
static void
shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
/*
* If Citus has not been loaded yet, pass the changes
@ -115,7 +104,7 @@ shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn
*/
if (!CitusHasBeenLoaded())
{
ouputPluginChangeCB(ctx, txn, relation, change);
pgOutputPluginChangeCB(ctx, txn, relation, change);
return;
}
@ -132,13 +121,6 @@ shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn
return;
}
/* check for the internal shard split names, if not, assume the slot is for CDC. */
if (!IsShardSplitSlot(replicationSlotName))
{
PublishDistributedTableChanges(ctx, txn, relation, change);
return;
}
/*
* Initialize SourceToDestinationShardMap if not already initialized.
* This gets initialized during the replication of first message.
@ -257,7 +239,7 @@ shard_split_and_cdc_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn
}
}
ouputPluginChangeCB(ctx, txn, targetRelation, change);
pgOutputPluginChangeCB(ctx, txn, targetRelation, change);
RelationClose(targetRelation);
}

View File

@ -136,6 +136,8 @@ ReadColumnarOptions_type extern_ReadColumnarOptions = NULL;
CppConcat(extern_, funcname) = \
(typename) (void *) lookup_external_function(handle, # funcname)
#define CDC_DECODER_DYNAMIC_LIB_PATH "$libdir/citus_decoders:$libdir"
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(columnar_handler)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(alter_columnar_table_set)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(alter_columnar_table_reset)
@ -207,7 +209,7 @@ static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSour
source);
static void CitusAuthHook(Port *port, int status);
static bool IsSuperuser(char *userName);
static void AdjustDynamicLibraryPathForCdcDecoders(void);
static ClientAuthentication_hook_type original_client_auth_hook = NULL;
@ -475,6 +477,17 @@ _PG_init(void)
InitializeLocallyReservedSharedConnections();
InitializeClusterClockMem();
/*
	 * Adjust dynamic_library_path to prepend $libdir/citus_decoders to it.
	 * This is needed to make sure that the citus decoders are
* loaded before the default decoders for CDC.
*/
if (EnableChangeDataCapture)
{
AdjustDynamicLibraryPathForCdcDecoders();
}
/* initialize shard split shared memory handle management */
InitializeShardSplitSMHandleManagement();
@ -542,6 +555,22 @@ _PG_init(void)
}
/*
 * AdjustDynamicLibraryPathForCdcDecoders prepends the $libdir/citus_decoders
* to the dynamic library path. This is needed to make sure that the citus
* decoders are loaded before the default decoders for CDC.
*/
static void
AdjustDynamicLibraryPathForCdcDecoders(void)
{
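	/* only adjust the built-in default; a user-customized path is left as-is */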
if (strcmp(Dynamic_library_path, "$libdir") == 0)
{
SetConfigOption("dynamic_library_path", CDC_DECODER_DYNAMIC_LIB_PATH,
PGC_POSTMASTER, PGC_S_OVERRIDE);
}
}
#if PG_VERSION_NUM >= PG_VERSION_15
/*

View File

@ -1,2 +1 @@
shared_preload_libraries=citus
shared_preload_libraries='citus'

View File

@ -34,6 +34,8 @@ my $initial_schema = "
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
$node_coordinator->safe_psql('postgres',$initial_schema);
$node_coordinator->safe_psql('postgres','ALTER TABLE sensors REPLICA IDENTITY FULL;');
$node_cdc_client->safe_psql('postgres',$initial_schema);
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
@ -93,5 +95,15 @@ wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers)
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
is($result, 1, 'CDC basic test - distributed table delete data');
$node_coordinator->safe_psql('postgres',"TRUNCATE sensors;");
# Wait for the data changes to be replicated to the cdc client node.
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
# Compare the data in the coordinator and cdc client nodes.
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
is($result, 1, 'CDC basic test - distributed table truncate data');
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
done_testing();

View File

@ -41,6 +41,7 @@ my $initial_schema = "
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
$node_coordinator->safe_psql('postgres',$initial_schema);
$node_coordinator->safe_psql('postgres','ALTER TABLE sensors REPLICA IDENTITY FULL;');
$node_cdc_client->safe_psql('postgres',$initial_schema);
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');

View File

@ -14,7 +14,7 @@ my $result = 0;
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;";
$node_coordinator->safe_psql('postgres',$command);
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);

View File

@ -17,7 +17,7 @@ citus.shard_replication_factor = 1
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config);
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;";
$node_coordinator->safe_psql('postgres',$command);
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);

View File

@ -17,7 +17,7 @@ citus.shard_replication_factor = 1
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config);
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;";
$node_coordinator->safe_psql('postgres',$command);
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);

View File

@ -19,7 +19,7 @@ citus.shard_replication_factor = 1
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config);
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;";
$node_coordinator->safe_psql('postgres',$command);
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);

View File

@ -18,7 +18,7 @@ citus.shard_replication_factor = 1
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(1,"localhost",57636, $citus_config);
my $command = "SELECT citus_set_node_property('localhost', 57636, 'shouldhaveshards', true);";
my $command = "UPDATE pg_dist_node SET shouldhaveshards = true;";
$node_coordinator->safe_psql('postgres',$command);
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);

View File

@ -0,0 +1,89 @@
# Schema change CDC test for Citus
use strict;
use warnings;
use Test::More;
use lib './t';
use cdctestlib;
# Initialize co-ordinator node
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
my $select_stmt_after_drop = qq(SELECT measureid, eventdatetime, measure_data, meaure_quantity, measure_status FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
my $result = 0;
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
print("coordinator port: " . $node_coordinator->port() . "\n");
print("worker0 port:" . $workers[0]->port() . "\n");
print("worker1 port:" . $workers[1]->port() . "\n");
print("cdc_client port:" .$node_cdc_client->port() . "\n");
# Create the sensors table and indexes.
my $initial_schema = "
CREATE TABLE sensors(
measureid integer,
eventdatetime timestamptz,
measure_data jsonb,
meaure_quantity decimal(15, 2),
measure_status char(1),
measure_comment varchar(44),
PRIMARY KEY (measureid, eventdatetime, measure_data));
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
$node_coordinator->safe_psql('postgres',$initial_schema);
$node_coordinator->safe_psql('postgres','ALTER TABLE sensors REPLICA IDENTITY FULL;');
$node_cdc_client->safe_psql('postgres',$initial_schema);
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
#insert data into the sensors table in the coordinator node before distributing the table.
$node_coordinator->safe_psql('postgres',"
INSERT INTO sensors
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
FROM generate_series(0,100)i;");
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
#connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
create_cdc_slots_for_workers(\@workers);
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
is($result, 1, 'CDC create_distributed_table - schema change before move');
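# Drop a column directly on one shard so its schema diverges from the
# distributed table, then insert rows that lack the dropped column.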
my $shard_id = $workers[1]->safe_psql('postgres',
"SELECT shardid FROM citus_shards ORDER BY shardid LIMIT 1;");
my $shard_to_drop_column = "sensors_" . $shard_id;
$workers[1]->safe_psql('postgres',"ALTER TABLE $shard_to_drop_column DROP COLUMN measure_comment;");
$workers[1]->safe_psql('postgres',"
INSERT INTO sensors
SELECT i, '2020-01-05', '{}', 11011.10, 'A'
FROM generate_series(-10,-1)i;");
wait_for_cdc_client_to_catch_up_with_workers(\@workers);
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt_after_drop);
is($result, 1, 'CDC create_distributed_table - schema change and move shard');
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
done_testing();

View File

@ -0,0 +1,90 @@
# CDC test for a regular table with a shard-like name
use strict;
use warnings;
use Test::More;
use lib './t';
use cdctestlib;
# Initialize co-ordinator node
my $select_stmt = qq(SELECT * FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
my $select_stmt_after_drop = qq(SELECT measureid, eventdatetime, measure_data, meaure_quantity, measure_status FROM sensors ORDER BY measureid, eventdatetime, measure_data;);
my $result = 0;
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
print("coordinator port: " . $node_coordinator->port() . "\n");
print("worker0 port:" . $workers[0]->port() . "\n");
print("worker1 port:" . $workers[1]->port() . "\n");
print("cdc_client port:" .$node_cdc_client->port() . "\n");
# Create the sensors table and indexes.
my $initial_schema = "
CREATE TABLE sensors(
measureid integer,
eventdatetime timestamptz,
measure_data jsonb,
meaure_quantity decimal(15, 2),
measure_status char(1),
measure_comment varchar(44),
PRIMARY KEY (measureid, eventdatetime, measure_data));
CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;";
my $shard_like_table_schema = "
CREATE TABLE data_100008(
id integer,
data integer,
PRIMARY KEY (data));";
$node_coordinator->safe_psql('postgres',$initial_schema);
$node_coordinator->safe_psql('postgres',$shard_like_table_schema);
$node_coordinator->safe_psql('postgres','ALTER TABLE sensors REPLICA IDENTITY FULL;');
$node_cdc_client->safe_psql('postgres',$initial_schema);
$node_cdc_client->safe_psql('postgres',$shard_like_table_schema);
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'sensors,data_100008');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
#insert data into the sensors table in the coordinator node before distributing the table.
$node_coordinator->safe_psql('postgres',"
INSERT INTO sensors
SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus'
FROM generate_series(0,100)i;");
$node_coordinator->safe_psql('postgres',"SET citus.shard_count = 2; SELECT create_distributed_table_concurrently('sensors', 'measureid');");
#connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
create_cdc_slots_for_workers(\@workers);
connect_cdc_client_to_workers_publication(\@workers, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_citus_cluster($node_coordinator, \@workers);
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
is($result, 1, 'CDC create_distributed_table - basic test');
$workers[1]->safe_psql('postgres',$shard_like_table_schema);
$workers[1]->safe_psql('postgres','\d');
$workers[1]->safe_psql('postgres',"
INSERT INTO data_100008
SELECT i, i*10
FROM generate_series(-10,10)i;");
wait_for_cdc_client_to_catch_up_with_workers(\@workers);
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt_after_drop);
is($result, 1, 'CDC create_distributed_table - normal table with name like shard');
drop_cdc_client_subscriptions($node_cdc_client,\@workers);
done_testing();

View File

@ -0,0 +1,53 @@
# CDC test for a plain PostgreSQL table without the Citus extension
use strict;
use warnings;
use Test::More;
use lib './t';
use cdctestlib;
# Initialize co-ordinator node
my $select_stmt = qq(SELECT * FROM data_100008 ORDER BY id;);
my $result = 0;
### Create the citus cluster with coordinator and two worker nodes
our ($node_coordinator, @workers) = create_citus_cluster(2,"localhost",57636);
our $node_cdc_client = create_node('cdc_client', 0, "localhost", 57639);
print("coordinator port: " . $node_coordinator->port() . "\n");
print("worker0 port:" . $workers[0]->port() . "\n");
print("worker1 port:" . $workers[1]->port() . "\n");
print("cdc_client port:" .$node_cdc_client->port() . "\n");
my $initial_schema = "
CREATE TABLE data_100008(
id integer,
data integer,
PRIMARY KEY (data));";
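# Drop the citus extension so the wrapper decoder passes all changes straight
# through to the base pgoutput plugin.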
$node_coordinator->safe_psql('postgres','DROP extension citus');
$node_coordinator->safe_psql('postgres',$initial_schema);
$node_coordinator->safe_psql('postgres','ALTER TABLE data_100008 REPLICA IDENTITY FULL;');
$node_cdc_client->safe_psql('postgres',$initial_schema);
create_cdc_publication_and_slots_for_coordinator($node_coordinator,'data_100008');
connect_cdc_client_to_coordinator_publication($node_coordinator, $node_cdc_client);
wait_for_cdc_client_to_catch_up_with_coordinator($node_coordinator);
#insert data into the data_100008 table in the coordinator node.
$node_coordinator->safe_psql('postgres',"
INSERT INTO data_100008
SELECT i, i*10
FROM generate_series(-10,10)i;");
$result = compare_tables_in_different_nodes($node_coordinator,$node_cdc_client,'postgres',$select_stmt);
is($result, 1, 'CDC create_distributed_table - basic test');
$node_cdc_client->safe_psql('postgres',"drop subscription cdc_subscription");
done_testing();

View File

@ -207,7 +207,7 @@ sub create_cdc_publication_and_slots_for_coordinator {
$node_coordinator->safe_psql('postgres',"DROP PUBLICATION IF EXISTS cdc_publication;");
}
$node_coordinator->safe_psql('postgres',"CREATE PUBLICATION cdc_publication FOR TABLE $table_names;");
$node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,false)");
$node_coordinator->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','pgoutput',false)");
}
sub create_cdc_slots_for_workers {
@ -217,7 +217,7 @@ sub create_cdc_slots_for_workers {
if ($slot ne "") {
$_->safe_psql('postgres',"SELECT pg_catalog.pg_drop_replication_slot('cdc_replication_slot');");
}
$_->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','citus',false,true)");
$_->safe_psql('postgres',"SELECT pg_catalog.pg_create_logical_replication_slot('cdc_replication_slot','pgoutput',false)");
}
}