diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 4185576a6..661bb6be4 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -2019,7 +2019,7 @@ ExecuteSplitShardReleaseSharedMemory(MultiConnection *sourceConnection) * Array[ * ROW(sourceShardId, childFirstShardId, childFirstMinRange, childFirstMaxRange, worker1)::citus.split_shard_info, * ROW(sourceShardId, childSecondShardId, childSecondMinRange, childSecondMaxRange, worker2)::citus.split_shard_info - * ]); + * ], CurrentOperationId); */ StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, @@ -2080,8 +2080,10 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, StringInfo splitShardReplicationUDF = makeStringInfo(); appendStringInfo(splitShardReplicationUDF, - "SELECT * FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[%s]);", - splitChildrenRows->data); + "SELECT * FROM pg_catalog.worker_split_shard_replication_setup(" + "ARRAY[%s], %lu);", + splitChildrenRows->data, + CurrentOperationId); return splitShardReplicationUDF; } diff --git a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c index cd8c58426..85c2328c7 100644 --- a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c +++ b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c @@ -15,6 +15,7 @@ #include "distributed/distribution_column.h" #include "distributed/hash_helpers.h" #include "distributed/shardinterval_utils.h" +#include "distributed/shard_cleaner.h" #include "distributed/shard_utils.h" #include "distributed/shardsplit_shared_memory.h" #include "distributed/connection_management.h" @@ -49,10 +50,12 @@ static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit, int32 maxValue, int32 nodeId); static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo); -static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader); +static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, + OperationId operationId); static void ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, - TupleDesc tupleDescriptor); + TupleDesc tupleDescriptor, + OperationId operationId); /* * worker_split_shard_replication_setup UDF creates in-memory data structures @@ -104,6 +107,8 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) ereport(ERROR, (errmsg("Unexpectedly shard info array contains a null value"))); } + OperationId operationId = DatumGetUInt64(PG_GETARG_DATUM(1)); + /* SetupMap */ ShardInfoHashMap = CreateSimpleHash(NodeAndOwner, GroupedShardSplitInfos); @@ -142,14 +147,14 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) ShardSplitInfoSMHeader *splitShardInfoSMHeader = CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle); - PopulateShardSplitInfoInSM(splitShardInfoSMHeader); + PopulateShardSplitInfoInSM(splitShardInfoSMHeader, operationId); /* store handle in statically allocated shared memory*/ StoreShardSplitSharedMemoryHandle(dsmHandle); TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); - ReturnReplicationSlotInfo(tupleStore, tupleDescriptor); + ReturnReplicationSlotInfo(tupleStore, tupleDescriptor, operationId); PG_RETURN_VOID(); } @@ -270,7 +275,8 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo) * shardSplitInfoSMHeader - Shared memory header */ static void -PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) +PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, + OperationId operationId) { HASH_SEQ_STATUS status; hash_seq_init(&status, ShardInfoHashMap); @@ -281,8 +287,11 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) { uint32_t nodeId = entry->key.nodeId; uint32_t tableOwnerId = entry->key.tableOwnerId; - char *derivedSlotName = ReplicationSlotNameForNodeAndOwner(SHARD_SPLIT, nodeId, - tableOwnerId); + char *derivedSlotName = + ReplicationSlotNameForNodeAndOwnerForOperation(SHARD_SPLIT, + nodeId, + tableOwnerId, + operationId); List *shardSplitInfoList = entry->shardSplitInfoList; ShardSplitInfo *splitShardInfo = NULL; @@ -370,8 +379,9 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, * part of non-blocking split workflow. */ static void -ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc - tupleDescriptor) +ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, + TupleDesc tupleDescriptor, + OperationId operationId) { HASH_SEQ_STATUS status; hash_seq_init(&status, ShardInfoHashMap); @@ -390,9 +400,11 @@ ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc char *tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false); values[1] = CStringGetTextDatum(tableOwnerName); - char *slotName = ReplicationSlotNameForNodeAndOwner(SHARD_SPLIT, - entry->key.nodeId, - entry->key.tableOwnerId); + char *slotName = + ReplicationSlotNameForNodeAndOwnerForOperation(SHARD_SPLIT, + entry->key.nodeId, + entry->key.tableOwnerId, + operationId); values[2] = CStringGetTextDatum(slotName); tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 0157bb545..a301367e1 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -477,9 +477,11 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList) target->newShards = NIL; target->subscriptionOwnerName = SubscriptionRoleName(SHARD_MOVE, ownerId); target->replicationSlot = palloc0(sizeof(ReplicationSlotInfo)); - target->replicationSlot->name = ReplicationSlotNameForNodeAndOwner(SHARD_MOVE, - nodeId, - ownerId); + target->replicationSlot->name = + ReplicationSlotNameForNodeAndOwnerForOperation(SHARD_MOVE, + nodeId, + ownerId, + CurrentOperationId); target->replicationSlot->targetNodeId = nodeId; target->replicationSlot->tableOwnerId = ownerId; logicalRepTargetList = lappend(logicalRepTargetList, target); @@ -1208,23 +1210,25 @@ ConflictWithIsolationTestingAfterCopy(void) char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId) { - return psprintf("%s%u_%u", publicationPrefix[type], nodeId, ownerId); + return psprintf("%s%u_%u_%lu", publicationPrefix[type], + nodeId, ownerId, CurrentOperationId); } /* - * ReplicationSlotNameForNodeAndOwner returns the name of the replication slot for the - * given node and table owner. + * ReplicationSlotNameForNodeAndOwnerForOperation returns the name of the + * replication slot for the given node, table owner and operation id. * * Note that PG15 introduced a new ReplicationSlotName function that caused name conflicts * and we renamed this function. */ char * -ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid ownerId) +ReplicationSlotNameForNodeAndOwnerForOperation(LogicalRepType type, uint32_t nodeId, + Oid ownerId, OperationId operationId) { StringInfo slotName = makeStringInfo(); - appendStringInfo(slotName, "%s%u_%u", replicationSlotPrefix[type], nodeId, - ownerId); + appendStringInfo(slotName, "%s%u_%u_%lu", replicationSlotPrefix[type], nodeId, + ownerId, operationId); if (slotName->len > NAMEDATALEN) { @@ -1243,7 +1247,8 @@ ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid own char * SubscriptionName(LogicalRepType type, Oid ownerId) { - return psprintf("%s%i", subscriptionPrefix[type], ownerId); + return psprintf("%s%i_%lu", subscriptionPrefix[type], + ownerId, CurrentOperationId); } @@ -1254,7 +1259,8 @@ SubscriptionName(LogicalRepType type, Oid ownerId) char * SubscriptionRoleName(LogicalRepType type, Oid ownerId) { - return psprintf("%s%i", subscriptionRolePrefix[type], ownerId); + return psprintf("%s%i_%lu", subscriptionRolePrefix[type], ownerId, + CurrentOperationId); } diff --git a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql index d40336103..1c059dac5 100644 --- a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql +++ b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql @@ -9,3 +9,4 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) #include "udfs/citus_get_transaction_clock/11.2-1.sql" #include "udfs/citus_is_clock_after/11.2-1.sql" #include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql" +#include "udfs/worker_split_shard_replication_setup/11.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql index 7cdd51140..6031c2fd0 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql @@ -10,6 +10,9 @@ DROP SEQUENCE pg_catalog.pg_dist_clock_logical_seq; DROP OPERATOR CLASS pg_catalog.cluster_clock_ops USING btree CASCADE; DROP OPERATOR FAMILY pg_catalog.cluster_clock_ops USING btree CASCADE; DROP TYPE pg_catalog.cluster_clock CASCADE; +DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[], bigint); +DROP TYPE pg_catalog.replication_slot_info; +DROP TYPE pg_catalog.split_shard_info; CREATE FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) RETURNS void @@ -17,3 +20,5 @@ CREATE FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, intege AS 'MODULE_PATHNAME', $$worker_append_table_to_shard$$; COMMENT ON FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) IS 'append a regular table''s contents to the shard'; + +#include "../udfs/worker_split_shard_replication_setup/11.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.2-1.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.2-1.sql new file mode 100644 index 000000000..2aa20d832 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.2-1.sql @@ -0,0 +1,11 @@ +DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( + splitShardInfo pg_catalog.split_shard_info[], operation_id bigint) +RETURNS setof pg_catalog.replication_slot_info +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; +COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[], operation_id bigint) + IS 'Replication setup for splitting a shard'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[], bigint) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql index 3ee131d45..2aa20d832 100644 --- a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql @@ -1,25 +1,11 @@ -CREATE TYPE citus.split_shard_info AS ( - source_shard_id bigint, - distribution_column text, - child_shard_id bigint, - shard_min_value text, - shard_max_value text, - node_id integer); -ALTER TYPE citus.split_shard_info SET SCHEMA pg_catalog; -COMMENT ON TYPE pg_catalog.split_shard_info - IS 'Stores split child shard information'; - -CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text); -ALTER TYPE citus.replication_slot_info SET SCHEMA pg_catalog; -COMMENT ON TYPE pg_catalog.replication_slot_info - IS 'Replication slot information to be used for subscriptions during non blocking shard split'; +DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]); CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( - splitShardInfo pg_catalog.split_shard_info[]) + splitShardInfo pg_catalog.split_shard_info[], operation_id bigint) RETURNS setof pg_catalog.replication_slot_info LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; -COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[]) +COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[], operation_id bigint) IS 'Replication setup for splitting a shard'; -REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[], bigint) FROM PUBLIC; diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 838236c79..f5a9dc342 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -17,6 +17,7 @@ #include "nodes/pg_list.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/shard_cleaner.h" /* Config variables managed via guc.c */ @@ -152,8 +153,10 @@ extern char * CreateReplicationSlots(MultiConnection *sourceConnection, extern void EnableSubscriptions(List *subscriptionInfoList); extern char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId); -extern char * ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid - ownerId); +extern char * ReplicationSlotNameForNodeAndOwnerForOperation(LogicalRepType type, + uint32_t nodeId, + Oid ownerId, + OperationId operationId); extern char * SubscriptionName(LogicalRepType type, Oid ownerId); extern char * SubscriptionRoleName(LogicalRepType type, Oid ownerId); diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index cd57a6673..3d251156c 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -45,8 +45,9 @@ s/truncate_trigger_[0-9]+/truncate_trigger_xxxxxxx/g # shard move subscription and publication names contain the oid of the # table owner, which can change across runs -s/(citus_shard_(move|split)_subscription_)[0-9]+/\1xxxxxxx/g -s/(citus_shard_(move|split)_(slot|publication)_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g +s/(citus_shard_(move|split)_subscription_role_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g +s/(citus_shard_(move|split)_subscription_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g +s/(citus_shard_(move|split)_(slot|publication)_)[0-9]+_[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx_xxxxxxx/g # In foreign_key_restriction_enforcement, normalize shard names s/"(on_update_fkey_table_|fkey_)[0-9]+"/"\1xxxxxxx"/g diff --git a/src/test/regress/expected/cpu_priority.out b/src/test/regress/expected/cpu_priority.out index 008eacafd..0c2412fdd 100644 --- a/src/test/regress/expected/cpu_priority.out +++ b/src/test/regress/expected/cpu_priority.out @@ -89,13 +89,13 @@ SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; -- PG14, beacuse PG13 doesn't support binary logical replication. SET citus.enable_binary_protocol = false; SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -104,13 +104,13 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.cpu_priority_for_logical_replication_senders = 15; SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -119,13 +119,13 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.max_high_priority_background_processes = 3; SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -141,21 +141,21 @@ SELECT pg_catalog.citus_split_shard_by_split_points( ARRAY['-1500000000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx citus_split_shard_by_split_points --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 71a3b2527..1bb9841ab 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -126,8 +126,8 @@ WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx -WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it -DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx +WARNING: role "citus_shard_move_subscription_role_xxxxxxx_xxxxxxx" cannot be dropped because some objects depend on it +DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx_xxxxxxx CONTEXT: while executing command on localhost:xxxxx ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx @@ -202,8 +202,8 @@ WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx -WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it -DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx +WARNING: role "citus_shard_move_subscription_role_xxxxxxx_xxxxxxx" cannot be dropped because some objects depend on it +DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx_xxxxxxx CONTEXT: while executing command on localhost:xxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -316,7 +316,7 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) -- first, manually drop the subscsription object. But the record for it will remain on pg_dist_cleanup -SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_xxxxxxx$$); +SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_xxxxxxx_xxxxxxx$$); run_command_on_workers --------------------------------------------------------------------- (localhost,9060,t,"DROP SUBSCRIPTION") diff --git a/src/test/regress/expected/failure_split_cleanup.out b/src/test/regress/expected/failure_split_cleanup.out index ce06f4978..9e8bb17d2 100644 --- a/src/test/regress/expected/failure_split_cleanup.out +++ b/src/test/regress/expected/failure_split_cleanup.out @@ -57,7 +57,7 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 (3 rows) -- we need to allow connection so that we can connect to proxy @@ -160,8 +160,8 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 (4 rows) -- we need to allow connection so that we can connect to proxy @@ -185,10 +185,10 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots @@ -271,9 +271,9 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 (5 rows) -- we need to allow connection so that we can connect to proxy @@ -297,10 +297,10 @@ CONTEXT: while executing command on localhost:xxxxx -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots @@ -383,12 +383,12 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0 (8 rows) -- we need to allow connection so that we can connect to proxy @@ -412,25 +412,25 @@ CONTEXT: while executing command on localhost:xxxxx -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over subscriptions SELECT subname FROM pg_subscription; - subname + subname --------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx + citus_shard_split_subscription_xxxxxxx_xxxxxxx (1 row) \c - postgres - :master_port @@ -501,12 +501,12 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0 (8 rows) -- we need to allow connection so that we can connect to proxy @@ -530,25 +530,25 @@ CONTEXT: while executing command on localhost:xxxxx -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over subscriptions SELECT subname FROM pg_subscription; - subname + subname --------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx + citus_shard_split_subscription_xxxxxxx_xxxxxxx (1 row) \c - postgres - :master_port @@ -620,12 +620,12 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0 (8 rows) SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; @@ -653,25 +653,25 @@ CONTEXT: while executing command on localhost:xxxxx -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over subscriptions SELECT subname FROM pg_subscription; - subname + subname --------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx + citus_shard_split_subscription_xxxxxxx_xxxxxxx (1 row) \c - postgres - :master_port diff --git a/src/test/regress/expected/isolation_non_blocking_shard_split.out b/src/test/regress/expected/isolation_non_blocking_shard_split.out index 5c8b27213..cd0dbdbe1 100644 --- a/src/test/regress/expected/isolation_non_blocking_shard_split.out +++ b/src/test/regress/expected/isolation_non_blocking_shard_split.out @@ -982,20 +982,20 @@ run_try_drop_marked_resources step s1-show-pg_dist_cleanup: SELECT object_name, object_type, policy_type FROM pg_dist_cleanup; -object_name |object_type|policy_type +object_name |object_type|policy_type --------------------------------------------------------------------- -public.to_split_table_1500002 | 1| 1 -public.to_split_table_1500003 | 1| 1 -public.to_split_table_1500001 | 1| 0 -public.to_split_table_1500003 | 1| 0 -citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 -citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 -citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 -citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 -citus_shard_split_subscription_role_10| 5| 0 -citus_shard_split_subscription_xxxxxxx | 2| 0 -citus_shard_split_subscription_role_10| 5| 0 -citus_shard_split_subscription_xxxxxxx | 2| 0 +public.to_split_table_1500002 | 1| 1 +public.to_split_table_1500003 | 1| 1 +public.to_split_table_1500001 | 1| 0 +public.to_split_table_1500003 | 1| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0 +citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0 +citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0 +citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0 (12 rows) step s1-release-split-advisory-lock: @@ -1060,20 +1060,20 @@ run_try_drop_marked_resources step s1-show-pg_dist_cleanup: SELECT object_name, object_type, policy_type FROM pg_dist_cleanup; -object_name |object_type|policy_type +object_name |object_type|policy_type --------------------------------------------------------------------- -public.to_split_table_1500002 | 1| 1 -public.to_split_table_1500003 | 1| 1 -public.to_split_table_1500001 | 1| 0 -public.to_split_table_1500003 | 1| 0 -citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 -citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 -citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 -citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 -citus_shard_split_subscription_role_10| 5| 0 -citus_shard_split_subscription_xxxxxxx | 2| 0 -citus_shard_split_subscription_role_10| 5| 0 -citus_shard_split_subscription_xxxxxxx | 2| 0 +public.to_split_table_1500002 | 1| 1 +public.to_split_table_1500003 | 1| 1 +public.to_split_table_1500001 | 1| 0 +public.to_split_table_1500003 | 1| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0 +citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0 +citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0 +citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0 (12 rows) step s1-release-split-advisory-lock: diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5d9b3ea5f..a03422e40 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1203,6 +1203,7 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) | function worker_append_table_to_shard(text,text,text,integer) void | + function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info | | function citus_get_node_clock() cluster_clock | function citus_get_transaction_clock() cluster_clock | function citus_internal_adjust_local_clock_to_remote(cluster_clock) void @@ -1220,6 +1221,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function cluster_clock_recv(internal) cluster_clock | function cluster_clock_send(cluster_clock) bytea | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text, source_lsn pg_lsn, target_lsn pg_lsn, status text) + | function worker_split_shard_replication_setup(split_shard_info[],bigint) SETOF replication_slot_info | operator <(cluster_clock,cluster_clock) | operator <=(cluster_clock,cluster_clock) | operator <>(cluster_clock,cluster_clock) @@ -1230,7 +1232,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | operator family cluster_clock_ops for access method btree | sequence pg_dist_clock_logical_seq | type cluster_clock -(29 rows) +(31 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/pg14.out b/src/test/regress/expected/pg14.out index 335a87a5b..7d73284ca 100644 --- a/src/test/regress/expected/pg14.out +++ b/src/test/regress/expected/pg14.out @@ -1472,7 +1472,7 @@ BEGIN; SET LOCAL citus.log_remote_commands TO ON; SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx, binary=true) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx, binary=true) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx citus_move_shard_placement --------------------------------------------------------------------- @@ -1487,7 +1487,7 @@ SET LOCAL citus.log_remote_commands TO ON; SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; SET LOCAL citus.enable_binary_protocol = FALSE; SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx citus_move_shard_placement --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_release_dsm.out b/src/test/regress/expected/split_shard_release_dsm.out index 95cc210bb..f35406b5a 100644 --- a/src/test/regress/expected/split_shard_release_dsm.out +++ b/src/test/regress/expected/split_shard_release_dsm.out @@ -11,7 +11,7 @@ SET client_min_messages TO ERROR; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); count --------------------------------------------------------------------- 1 @@ -21,7 +21,7 @@ SET client_min_messages TO WARNING; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. count --------------------------------------------------------------------- @@ -37,7 +37,7 @@ SELECT pg_catalog.worker_split_shard_release_dsm(); SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); count --------------------------------------------------------------------- 1 diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index eead15e40..ab159bc1f 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -69,7 +69,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. count --------------------------------------------------------------------- @@ -78,8 +78,16 @@ WARNING: Previous split shard worflow was not successfully and could not comple SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 4bdb8c013..84370c983 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -65,13 +65,21 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); count --------------------------------------------------------------------- 1 (1 row) -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out index d14b29552..05431d06e 100644 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -13,13 +13,21 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); count --------------------------------------------------------------------- 1 (1 row) -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' a BEGIN; CREATE SUBSCRIPTION local_subscription diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out index 88df2a5b7..e6e6fda1d 100644 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -11,15 +11,23 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. count --------------------------------------------------------------------- 2 (1 row) -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' CREATE SUBSCRIPTION sub_worker1 CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 14578a976..88d07b38d 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -253,7 +253,7 @@ ORDER BY 1; function worker_save_query_explain_analyze(text,jsonb) function worker_split_copy(bigint,text,split_copy_info[]) function worker_split_shard_release_dsm() - function worker_split_shard_replication_setup(split_shard_info[]) + function worker_split_shard_replication_setup(split_shard_info[],bigint) operator <(cluster_clock,cluster_clock) operator <=(cluster_clock,cluster_clock) operator <>(cluster_clock,cluster_clock) diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index 31b51f1e7..2bc84441c 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -131,7 +131,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.allow()'); -- first, manually drop the subscsription object. But the record for it will remain on pg_dist_cleanup -SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10$$); +SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10_15$$); -- cleanup leftovers -- verify we don't see any error for already dropped subscription SET client_min_messages TO WARNING; diff --git a/src/test/regress/sql/split_shard_release_dsm.sql b/src/test/regress/sql/split_shard_release_dsm.sql index b8fe6bfb6..69394e207 100644 --- a/src/test/regress/sql/split_shard_release_dsm.sql +++ b/src/test/regress/sql/split_shard_release_dsm.sql @@ -13,16 +13,16 @@ SET client_min_messages TO ERROR; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); SET client_min_messages TO WARNING; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); SELECT pg_catalog.worker_split_shard_release_dsm(); SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index a8311bceb..74a82e936 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -71,14 +71,18 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset + +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 20e12ee7b..0d13cf381 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -69,9 +69,13 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql index 6d0091fe2..e0927c659 100644 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -16,9 +16,13 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' a BEGIN; diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql index 21c2b0252..0a6e67a5f 100644 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql @@ -14,10 +14,14 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' CREATE SUBSCRIPTION sub_worker1