From cb02d623690a268cd05e5d325cc4be535033db2e Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Tue, 6 Dec 2022 15:48:16 +0300 Subject: [PATCH] Unique names for replication artifacts (#6529) DESCRIPTION: Create replication artifacts with unique names We're creating replication objects with generic names. This disallows us to enable parallel shard moves, as two operations might use the same objects. With this PR, we'll create below objects with operation specific names, by appending OparationId to the names. * Subscriptions * Publications * Replication Slots * Users created for subscriptions --- .../distributed/operations/shard_split.c | 8 +- ...worker_split_shard_replication_setup_udf.c | 36 ++++-- .../replication/multi_logical_replication.c | 28 +++-- .../distributed/sql/citus--11.1-1--11.2-1.sql | 1 + .../sql/downgrades/citus--11.2-1--11.1-1.sql | 5 + .../11.2-1.sql | 11 ++ .../latest.sql | 22 +--- .../distributed/multi_logical_replication.h | 7 +- src/test/regress/bin/normalize.sed | 5 +- src/test/regress/expected/cpu_priority.out | 40 +++---- .../failure_online_move_shard_placement.out | 10 +- .../expected/failure_split_cleanup.out | 108 +++++++++--------- .../isolation_non_blocking_shard_split.out | 52 ++++----- src/test/regress/expected/multi_extension.out | 4 +- src/test/regress/expected/pg14.out | 4 +- .../expected/split_shard_release_dsm.out | 6 +- ...plit_shard_replication_colocated_setup.out | 14 ++- .../split_shard_replication_setup.out | 12 +- .../split_shard_replication_setup_local.out | 12 +- ...t_shard_replication_setup_remote_local.out | 14 ++- .../expected/upgrade_list_citus_objects.out | 2 +- .../failure_online_move_shard_placement.sql | 2 +- .../regress/sql/split_shard_release_dsm.sql | 6 +- ...plit_shard_replication_colocated_setup.sql | 10 +- .../sql/split_shard_replication_setup.sql | 8 +- .../split_shard_replication_setup_local.sql | 8 +- ...t_shard_replication_setup_remote_local.sql | 10 +- 27 files changed, 261 insertions(+), 184 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.2-1.sql diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 4185576a6..661bb6be4 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -2019,7 +2019,7 @@ ExecuteSplitShardReleaseSharedMemory(MultiConnection *sourceConnection) * Array[ * ROW(sourceShardId, childFirstShardId, childFirstMinRange, childFirstMaxRange, worker1)::citus.split_shard_info, * ROW(sourceShardId, childSecondShardId, childSecondMinRange, childSecondMaxRange, worker2)::citus.split_shard_info - * ]); + * ], CurrentOperationId); */ StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, @@ -2080,8 +2080,10 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, StringInfo splitShardReplicationUDF = makeStringInfo(); appendStringInfo(splitShardReplicationUDF, - "SELECT * FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[%s]);", - splitChildrenRows->data); + "SELECT * FROM pg_catalog.worker_split_shard_replication_setup(" + "ARRAY[%s], %lu);", + splitChildrenRows->data, + CurrentOperationId); return splitShardReplicationUDF; } diff --git a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c index cd8c58426..85c2328c7 100644 --- a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c +++ b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c @@ -15,6 +15,7 @@ #include "distributed/distribution_column.h" #include "distributed/hash_helpers.h" #include "distributed/shardinterval_utils.h" +#include "distributed/shard_cleaner.h" #include "distributed/shard_utils.h" #include "distributed/shardsplit_shared_memory.h" #include "distributed/connection_management.h" @@ -49,10 +50,12 @@ static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit, int32 maxValue, int32 nodeId); static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo); -static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader); +static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, + OperationId operationId); static void ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, - TupleDesc tupleDescriptor); + TupleDesc tupleDescriptor, + OperationId operationId); /* * worker_split_shard_replication_setup UDF creates in-memory data structures @@ -104,6 +107,8 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) ereport(ERROR, (errmsg("Unexpectedly shard info array contains a null value"))); } + OperationId operationId = DatumGetUInt64(PG_GETARG_DATUM(1)); + /* SetupMap */ ShardInfoHashMap = CreateSimpleHash(NodeAndOwner, GroupedShardSplitInfos); @@ -142,14 +147,14 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) ShardSplitInfoSMHeader *splitShardInfoSMHeader = CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle); - PopulateShardSplitInfoInSM(splitShardInfoSMHeader); + PopulateShardSplitInfoInSM(splitShardInfoSMHeader, operationId); /* store handle in statically allocated shared memory*/ StoreShardSplitSharedMemoryHandle(dsmHandle); TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); - ReturnReplicationSlotInfo(tupleStore, tupleDescriptor); + ReturnReplicationSlotInfo(tupleStore, tupleDescriptor, operationId); PG_RETURN_VOID(); } @@ -270,7 +275,8 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo) * shardSplitInfoSMHeader - Shared memory header */ static void -PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) +PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, + OperationId operationId) { HASH_SEQ_STATUS status; hash_seq_init(&status, ShardInfoHashMap); @@ -281,8 +287,11 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) { uint32_t nodeId = entry->key.nodeId; uint32_t tableOwnerId = entry->key.tableOwnerId; - char *derivedSlotName = ReplicationSlotNameForNodeAndOwner(SHARD_SPLIT, nodeId, - tableOwnerId); + char *derivedSlotName = + ReplicationSlotNameForNodeAndOwnerForOperation(SHARD_SPLIT, + nodeId, + tableOwnerId, + operationId); List *shardSplitInfoList = entry->shardSplitInfoList; ShardSplitInfo *splitShardInfo = NULL; @@ -370,8 +379,9 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, * part of non-blocking split workflow. */ static void -ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc - tupleDescriptor) +ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, + TupleDesc tupleDescriptor, + OperationId operationId) { HASH_SEQ_STATUS status; hash_seq_init(&status, ShardInfoHashMap); @@ -390,9 +400,11 @@ ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc char *tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false); values[1] = CStringGetTextDatum(tableOwnerName); - char *slotName = ReplicationSlotNameForNodeAndOwner(SHARD_SPLIT, - entry->key.nodeId, - entry->key.tableOwnerId); + char *slotName = + ReplicationSlotNameForNodeAndOwnerForOperation(SHARD_SPLIT, + entry->key.nodeId, + entry->key.tableOwnerId, + operationId); values[2] = CStringGetTextDatum(slotName); tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 0157bb545..a301367e1 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -477,9 +477,11 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList) target->newShards = NIL; target->subscriptionOwnerName = SubscriptionRoleName(SHARD_MOVE, ownerId); target->replicationSlot = palloc0(sizeof(ReplicationSlotInfo)); - target->replicationSlot->name = ReplicationSlotNameForNodeAndOwner(SHARD_MOVE, - nodeId, - ownerId); + target->replicationSlot->name = + ReplicationSlotNameForNodeAndOwnerForOperation(SHARD_MOVE, + nodeId, + ownerId, + CurrentOperationId); target->replicationSlot->targetNodeId = nodeId; target->replicationSlot->tableOwnerId = ownerId; logicalRepTargetList = lappend(logicalRepTargetList, target); @@ -1208,23 +1210,25 @@ ConflictWithIsolationTestingAfterCopy(void) char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId) { - return psprintf("%s%u_%u", publicationPrefix[type], nodeId, ownerId); + return psprintf("%s%u_%u_%lu", publicationPrefix[type], + nodeId, ownerId, CurrentOperationId); } /* - * ReplicationSlotNameForNodeAndOwner returns the name of the replication slot for the - * given node and table owner. + * ReplicationSlotNameForNodeAndOwnerForOperation returns the name of the + * replication slot for the given node, table owner and operation id. * * Note that PG15 introduced a new ReplicationSlotName function that caused name conflicts * and we renamed this function. */ char * -ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid ownerId) +ReplicationSlotNameForNodeAndOwnerForOperation(LogicalRepType type, uint32_t nodeId, + Oid ownerId, OperationId operationId) { StringInfo slotName = makeStringInfo(); - appendStringInfo(slotName, "%s%u_%u", replicationSlotPrefix[type], nodeId, - ownerId); + appendStringInfo(slotName, "%s%u_%u_%lu", replicationSlotPrefix[type], nodeId, + ownerId, operationId); if (slotName->len > NAMEDATALEN) { @@ -1243,7 +1247,8 @@ ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid own char * SubscriptionName(LogicalRepType type, Oid ownerId) { - return psprintf("%s%i", subscriptionPrefix[type], ownerId); + return psprintf("%s%i_%lu", subscriptionPrefix[type], + ownerId, CurrentOperationId); } @@ -1254,7 +1259,8 @@ SubscriptionName(LogicalRepType type, Oid ownerId) char * SubscriptionRoleName(LogicalRepType type, Oid ownerId) { - return psprintf("%s%i", subscriptionRolePrefix[type], ownerId); + return psprintf("%s%i_%lu", subscriptionRolePrefix[type], ownerId, + CurrentOperationId); } diff --git a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql index d40336103..1c059dac5 100644 --- a/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql +++ b/src/backend/distributed/sql/citus--11.1-1--11.2-1.sql @@ -9,3 +9,4 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) #include "udfs/citus_get_transaction_clock/11.2-1.sql" #include "udfs/citus_is_clock_after/11.2-1.sql" #include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql" +#include "udfs/worker_split_shard_replication_setup/11.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql index 7cdd51140..6031c2fd0 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.2-1--11.1-1.sql @@ -10,6 +10,9 @@ DROP SEQUENCE pg_catalog.pg_dist_clock_logical_seq; DROP OPERATOR CLASS pg_catalog.cluster_clock_ops USING btree CASCADE; DROP OPERATOR FAMILY pg_catalog.cluster_clock_ops USING btree CASCADE; DROP TYPE pg_catalog.cluster_clock CASCADE; +DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[], bigint); +DROP TYPE pg_catalog.replication_slot_info; +DROP TYPE pg_catalog.split_shard_info; CREATE FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) RETURNS void @@ -17,3 +20,5 @@ CREATE FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, intege AS 'MODULE_PATHNAME', $$worker_append_table_to_shard$$; COMMENT ON FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) IS 'append a regular table''s contents to the shard'; + +#include "../udfs/worker_split_shard_replication_setup/11.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.2-1.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.2-1.sql new file mode 100644 index 000000000..2aa20d832 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.2-1.sql @@ -0,0 +1,11 @@ +DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( + splitShardInfo pg_catalog.split_shard_info[], operation_id bigint) +RETURNS setof pg_catalog.replication_slot_info +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; +COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[], operation_id bigint) + IS 'Replication setup for splitting a shard'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[], bigint) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql index 3ee131d45..2aa20d832 100644 --- a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql @@ -1,25 +1,11 @@ -CREATE TYPE citus.split_shard_info AS ( - source_shard_id bigint, - distribution_column text, - child_shard_id bigint, - shard_min_value text, - shard_max_value text, - node_id integer); -ALTER TYPE citus.split_shard_info SET SCHEMA pg_catalog; -COMMENT ON TYPE pg_catalog.split_shard_info - IS 'Stores split child shard information'; - -CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text); -ALTER TYPE citus.replication_slot_info SET SCHEMA pg_catalog; -COMMENT ON TYPE pg_catalog.replication_slot_info - IS 'Replication slot information to be used for subscriptions during non blocking shard split'; +DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]); CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( - splitShardInfo pg_catalog.split_shard_info[]) + splitShardInfo pg_catalog.split_shard_info[], operation_id bigint) RETURNS setof pg_catalog.replication_slot_info LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; -COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[]) +COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[], operation_id bigint) IS 'Replication setup for splitting a shard'; -REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[], bigint) FROM PUBLIC; diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 838236c79..f5a9dc342 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -17,6 +17,7 @@ #include "nodes/pg_list.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/shard_cleaner.h" /* Config variables managed via guc.c */ @@ -152,8 +153,10 @@ extern char * CreateReplicationSlots(MultiConnection *sourceConnection, extern void EnableSubscriptions(List *subscriptionInfoList); extern char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId); -extern char * ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid - ownerId); +extern char * ReplicationSlotNameForNodeAndOwnerForOperation(LogicalRepType type, + uint32_t nodeId, + Oid ownerId, + OperationId operationId); extern char * SubscriptionName(LogicalRepType type, Oid ownerId); extern char * SubscriptionRoleName(LogicalRepType type, Oid ownerId); diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index cd57a6673..3d251156c 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -45,8 +45,9 @@ s/truncate_trigger_[0-9]+/truncate_trigger_xxxxxxx/g # shard move subscription and publication names contain the oid of the # table owner, which can change across runs -s/(citus_shard_(move|split)_subscription_)[0-9]+/\1xxxxxxx/g -s/(citus_shard_(move|split)_(slot|publication)_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g +s/(citus_shard_(move|split)_subscription_role_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g +s/(citus_shard_(move|split)_subscription_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g +s/(citus_shard_(move|split)_(slot|publication)_)[0-9]+_[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx_xxxxxxx/g # In foreign_key_restriction_enforcement, normalize shard names s/"(on_update_fkey_table_|fkey_)[0-9]+"/"\1xxxxxxx"/g diff --git a/src/test/regress/expected/cpu_priority.out b/src/test/regress/expected/cpu_priority.out index 008eacafd..0c2412fdd 100644 --- a/src/test/regress/expected/cpu_priority.out +++ b/src/test/regress/expected/cpu_priority.out @@ -89,13 +89,13 @@ SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; -- PG14, beacuse PG13 doesn't support binary logical replication. SET citus.enable_binary_protocol = false; SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -104,13 +104,13 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.cpu_priority_for_logical_replication_senders = 15; SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -119,13 +119,13 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.max_high_priority_background_processes = 3; SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -141,21 +141,21 @@ SELECT pg_catalog.citus_split_shard_by_split_points( ARRAY['-1500000000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx citus_split_shard_by_split_points --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 71a3b2527..1bb9841ab 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -126,8 +126,8 @@ WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx -WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it -DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx +WARNING: role "citus_shard_move_subscription_role_xxxxxxx_xxxxxxx" cannot be dropped because some objects depend on it +DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx_xxxxxxx CONTEXT: while executing command on localhost:xxxxx ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx @@ -202,8 +202,8 @@ WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx -WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it -DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx +WARNING: role "citus_shard_move_subscription_role_xxxxxxx_xxxxxxx" cannot be dropped because some objects depend on it +DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx_xxxxxxx CONTEXT: while executing command on localhost:xxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -316,7 +316,7 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) -- first, manually drop the subscsription object. But the record for it will remain on pg_dist_cleanup -SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_xxxxxxx$$); +SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_xxxxxxx_xxxxxxx$$); run_command_on_workers --------------------------------------------------------------------- (localhost,9060,t,"DROP SUBSCRIPTION") diff --git a/src/test/regress/expected/failure_split_cleanup.out b/src/test/regress/expected/failure_split_cleanup.out index ce06f4978..9e8bb17d2 100644 --- a/src/test/regress/expected/failure_split_cleanup.out +++ b/src/test/regress/expected/failure_split_cleanup.out @@ -57,7 +57,7 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 (3 rows) -- we need to allow connection so that we can connect to proxy @@ -160,8 +160,8 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 (4 rows) -- we need to allow connection so that we can connect to proxy @@ -185,10 +185,10 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots @@ -271,9 +271,9 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 (5 rows) -- we need to allow connection so that we can connect to proxy @@ -297,10 +297,10 @@ CONTEXT: while executing command on localhost:xxxxx -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots @@ -383,12 +383,12 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0 (8 rows) -- we need to allow connection so that we can connect to proxy @@ -412,25 +412,25 @@ CONTEXT: while executing command on localhost:xxxxx -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over subscriptions SELECT subname FROM pg_subscription; - subname + subname --------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx + citus_shard_split_subscription_xxxxxxx_xxxxxxx (1 row) \c - postgres - :master_port @@ -501,12 +501,12 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0 (8 rows) -- we need to allow connection so that we can connect to proxy @@ -530,25 +530,25 @@ CONTEXT: while executing command on localhost:xxxxx -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over subscriptions SELECT subname FROM pg_subscription; - subname + subname --------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx + citus_shard_split_subscription_xxxxxxx_xxxxxxx (1 row) \c - postgres - :master_port @@ -620,12 +620,12 @@ CONTEXT: while executing command on localhost:xxxxx --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0 (8 rows) SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; @@ -653,25 +653,25 @@ CONTEXT: while executing command on localhost:xxxxx -- Left over publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over replication slots SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx (2 rows) -- Left over subscriptions SELECT subname FROM pg_subscription; - subname + subname --------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx + citus_shard_split_subscription_xxxxxxx_xxxxxxx (1 row) \c - postgres - :master_port diff --git a/src/test/regress/expected/isolation_non_blocking_shard_split.out b/src/test/regress/expected/isolation_non_blocking_shard_split.out index 5c8b27213..cd0dbdbe1 100644 --- a/src/test/regress/expected/isolation_non_blocking_shard_split.out +++ b/src/test/regress/expected/isolation_non_blocking_shard_split.out @@ -982,20 +982,20 @@ run_try_drop_marked_resources step s1-show-pg_dist_cleanup: SELECT object_name, object_type, policy_type FROM pg_dist_cleanup; -object_name |object_type|policy_type +object_name |object_type|policy_type --------------------------------------------------------------------- -public.to_split_table_1500002 | 1| 1 -public.to_split_table_1500003 | 1| 1 -public.to_split_table_1500001 | 1| 0 -public.to_split_table_1500003 | 1| 0 -citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 -citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 -citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 -citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 -citus_shard_split_subscription_role_10| 5| 0 -citus_shard_split_subscription_xxxxxxx | 2| 0 -citus_shard_split_subscription_role_10| 5| 0 -citus_shard_split_subscription_xxxxxxx | 2| 0 +public.to_split_table_1500002 | 1| 1 +public.to_split_table_1500003 | 1| 1 +public.to_split_table_1500001 | 1| 0 +public.to_split_table_1500003 | 1| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0 +citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0 +citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0 +citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0 (12 rows) step s1-release-split-advisory-lock: @@ -1060,20 +1060,20 @@ run_try_drop_marked_resources step s1-show-pg_dist_cleanup: SELECT object_name, object_type, policy_type FROM pg_dist_cleanup; -object_name |object_type|policy_type +object_name |object_type|policy_type --------------------------------------------------------------------- -public.to_split_table_1500002 | 1| 1 -public.to_split_table_1500003 | 1| 1 -public.to_split_table_1500001 | 1| 0 -public.to_split_table_1500003 | 1| 0 -citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 -citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 -citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 -citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 -citus_shard_split_subscription_role_10| 5| 0 -citus_shard_split_subscription_xxxxxxx | 2| 0 -citus_shard_split_subscription_role_10| 5| 0 -citus_shard_split_subscription_xxxxxxx | 2| 0 +public.to_split_table_1500002 | 1| 1 +public.to_split_table_1500003 | 1| 1 +public.to_split_table_1500001 | 1| 0 +public.to_split_table_1500003 | 1| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0 +citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0 +citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0 +citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0 (12 rows) step s1-release-split-advisory-lock: diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5d9b3ea5f..a03422e40 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1203,6 +1203,7 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) | function worker_append_table_to_shard(text,text,text,integer) void | + function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info | | function citus_get_node_clock() cluster_clock | function citus_get_transaction_clock() cluster_clock | function citus_internal_adjust_local_clock_to_remote(cluster_clock) void @@ -1220,6 +1221,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function cluster_clock_recv(internal) cluster_clock | function cluster_clock_send(cluster_clock) bytea | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text, source_lsn pg_lsn, target_lsn pg_lsn, status text) + | function worker_split_shard_replication_setup(split_shard_info[],bigint) SETOF replication_slot_info | operator <(cluster_clock,cluster_clock) | operator <=(cluster_clock,cluster_clock) | operator <>(cluster_clock,cluster_clock) @@ -1230,7 +1232,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | operator family cluster_clock_ops for access method btree | sequence pg_dist_clock_logical_seq | type cluster_clock -(29 rows) +(31 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/pg14.out b/src/test/regress/expected/pg14.out index 335a87a5b..7d73284ca 100644 --- a/src/test/regress/expected/pg14.out +++ b/src/test/regress/expected/pg14.out @@ -1472,7 +1472,7 @@ BEGIN; SET LOCAL citus.log_remote_commands TO ON; SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx, binary=true) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx, binary=true) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx citus_move_shard_placement --------------------------------------------------------------------- @@ -1487,7 +1487,7 @@ SET LOCAL citus.log_remote_commands TO ON; SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; SET LOCAL citus.enable_binary_protocol = FALSE; SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx citus_move_shard_placement --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_release_dsm.out b/src/test/regress/expected/split_shard_release_dsm.out index 95cc210bb..f35406b5a 100644 --- a/src/test/regress/expected/split_shard_release_dsm.out +++ b/src/test/regress/expected/split_shard_release_dsm.out @@ -11,7 +11,7 @@ SET client_min_messages TO ERROR; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); count --------------------------------------------------------------------- 1 @@ -21,7 +21,7 @@ SET client_min_messages TO WARNING; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. count --------------------------------------------------------------------- @@ -37,7 +37,7 @@ SELECT pg_catalog.worker_split_shard_release_dsm(); SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); count --------------------------------------------------------------------- 1 diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index eead15e40..ab159bc1f 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -69,7 +69,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. count --------------------------------------------------------------------- @@ -78,8 +78,16 @@ WARNING: Previous split shard worflow was not successfully and could not comple SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 4bdb8c013..84370c983 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -65,13 +65,21 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); count --------------------------------------------------------------------- 1 (1 row) -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out index d14b29552..05431d06e 100644 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -13,13 +13,21 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); count --------------------------------------------------------------------- 1 (1 row) -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' a BEGIN; CREATE SUBSCRIPTION local_subscription diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out index 88df2a5b7..e6e6fda1d 100644 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -11,15 +11,23 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. count --------------------------------------------------------------------- 2 (1 row) -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + citus.next_operation_id +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' CREATE SUBSCRIPTION sub_worker1 CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 14578a976..88d07b38d 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -253,7 +253,7 @@ ORDER BY 1; function worker_save_query_explain_analyze(text,jsonb) function worker_split_copy(bigint,text,split_copy_info[]) function worker_split_shard_release_dsm() - function worker_split_shard_replication_setup(split_shard_info[]) + function worker_split_shard_replication_setup(split_shard_info[],bigint) operator <(cluster_clock,cluster_clock) operator <=(cluster_clock,cluster_clock) operator <>(cluster_clock,cluster_clock) diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index 31b51f1e7..2bc84441c 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -131,7 +131,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.allow()'); -- first, manually drop the subscsription object. But the record for it will remain on pg_dist_cleanup -SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10$$); +SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10_15$$); -- cleanup leftovers -- verify we don't see any error for already dropped subscription SET client_min_messages TO WARNING; diff --git a/src/test/regress/sql/split_shard_release_dsm.sql b/src/test/regress/sql/split_shard_release_dsm.sql index b8fe6bfb6..69394e207 100644 --- a/src/test/regress/sql/split_shard_release_dsm.sql +++ b/src/test/regress/sql/split_shard_release_dsm.sql @@ -13,16 +13,16 @@ SET client_min_messages TO ERROR; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); SET client_min_messages TO WARNING; SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); SELECT pg_catalog.worker_split_shard_release_dsm(); SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index a8311bceb..74a82e936 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -71,14 +71,18 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset + +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 20e12ee7b..0d13cf381 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -69,9 +69,13 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql index 6d0091fe2..e0927c659 100644 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -16,9 +16,13 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info - ]); + ], 0); -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' a BEGIN; diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql index 21c2b0252..0a6e67a5f 100644 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql @@ -14,10 +14,14 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info - ]); + ], 0); -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset +-- we create replication slots with a name including the next_operation_id as a suffix +-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command +SHOW citus.next_operation_id; + +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' CREATE SUBSCRIPTION sub_worker1