Unique names for replication artifacts (#6529)

DESCRIPTION: Create replication artifacts with unique names

We're creating replication objects with generic names. This disallows us
to enable parallel shard moves, as two operations might use the same
objects. With this PR, we'll create below objects with operation
specific names, by appending OparationId to the names.

* Subscriptions
* Publications
* Replication Slots
* Users created for subscriptions
reproduce-flaky-background
Ahmet Gedemenli 2022-12-06 15:48:16 +03:00 committed by GitHub
parent e14dc5d45d
commit cb02d62369
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 261 additions and 184 deletions

View File

@ -2019,7 +2019,7 @@ ExecuteSplitShardReleaseSharedMemory(MultiConnection *sourceConnection)
* Array[ * Array[
* ROW(sourceShardId, childFirstShardId, childFirstMinRange, childFirstMaxRange, worker1)::citus.split_shard_info, * ROW(sourceShardId, childFirstShardId, childFirstMinRange, childFirstMaxRange, worker1)::citus.split_shard_info,
* ROW(sourceShardId, childSecondShardId, childSecondMinRange, childSecondMaxRange, worker2)::citus.split_shard_info * ROW(sourceShardId, childSecondShardId, childSecondMinRange, childSecondMaxRange, worker2)::citus.split_shard_info
* ]); * ], CurrentOperationId);
*/ */
StringInfo StringInfo
CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
@ -2080,8 +2080,10 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
StringInfo splitShardReplicationUDF = makeStringInfo(); StringInfo splitShardReplicationUDF = makeStringInfo();
appendStringInfo(splitShardReplicationUDF, appendStringInfo(splitShardReplicationUDF,
"SELECT * FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[%s]);", "SELECT * FROM pg_catalog.worker_split_shard_replication_setup("
splitChildrenRows->data); "ARRAY[%s], %lu);",
splitChildrenRows->data,
CurrentOperationId);
return splitShardReplicationUDF; return splitShardReplicationUDF;
} }

View File

@ -15,6 +15,7 @@
#include "distributed/distribution_column.h" #include "distributed/distribution_column.h"
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_utils.h" #include "distributed/shard_utils.h"
#include "distributed/shardsplit_shared_memory.h" #include "distributed/shardsplit_shared_memory.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
@ -49,10 +50,12 @@ static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
int32 maxValue, int32 maxValue,
int32 nodeId); int32 nodeId);
static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo); static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader); static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
OperationId operationId);
static void ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, static void ReturnReplicationSlotInfo(Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor); TupleDesc tupleDescriptor,
OperationId operationId);
/* /*
* worker_split_shard_replication_setup UDF creates in-memory data structures * worker_split_shard_replication_setup UDF creates in-memory data structures
@ -104,6 +107,8 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
ereport(ERROR, (errmsg("Unexpectedly shard info array contains a null value"))); ereport(ERROR, (errmsg("Unexpectedly shard info array contains a null value")));
} }
OperationId operationId = DatumGetUInt64(PG_GETARG_DATUM(1));
/* SetupMap */ /* SetupMap */
ShardInfoHashMap = CreateSimpleHash(NodeAndOwner, GroupedShardSplitInfos); ShardInfoHashMap = CreateSimpleHash(NodeAndOwner, GroupedShardSplitInfos);
@ -142,14 +147,14 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
ShardSplitInfoSMHeader *splitShardInfoSMHeader = ShardSplitInfoSMHeader *splitShardInfoSMHeader =
CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle); CreateSharedMemoryForShardSplitInfo(shardSplitInfoCount, &dsmHandle);
PopulateShardSplitInfoInSM(splitShardInfoSMHeader); PopulateShardSplitInfoInSM(splitShardInfoSMHeader, operationId);
/* store handle in statically allocated shared memory*/ /* store handle in statically allocated shared memory*/
StoreShardSplitSharedMemoryHandle(dsmHandle); StoreShardSplitSharedMemoryHandle(dsmHandle);
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
ReturnReplicationSlotInfo(tupleStore, tupleDescriptor); ReturnReplicationSlotInfo(tupleStore, tupleDescriptor, operationId);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -270,7 +275,8 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
* shardSplitInfoSMHeader - Shared memory header * shardSplitInfoSMHeader - Shared memory header
*/ */
static void static void
PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
OperationId operationId)
{ {
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
hash_seq_init(&status, ShardInfoHashMap); hash_seq_init(&status, ShardInfoHashMap);
@ -281,8 +287,11 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader)
{ {
uint32_t nodeId = entry->key.nodeId; uint32_t nodeId = entry->key.nodeId;
uint32_t tableOwnerId = entry->key.tableOwnerId; uint32_t tableOwnerId = entry->key.tableOwnerId;
char *derivedSlotName = ReplicationSlotNameForNodeAndOwner(SHARD_SPLIT, nodeId, char *derivedSlotName =
tableOwnerId); ReplicationSlotNameForNodeAndOwnerForOperation(SHARD_SPLIT,
nodeId,
tableOwnerId,
operationId);
List *shardSplitInfoList = entry->shardSplitInfoList; List *shardSplitInfoList = entry->shardSplitInfoList;
ShardSplitInfo *splitShardInfo = NULL; ShardSplitInfo *splitShardInfo = NULL;
@ -370,8 +379,9 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
* part of non-blocking split workflow. * part of non-blocking split workflow.
*/ */
static void static void
ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc ReturnReplicationSlotInfo(Tuplestorestate *tupleStore,
tupleDescriptor) TupleDesc tupleDescriptor,
OperationId operationId)
{ {
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
hash_seq_init(&status, ShardInfoHashMap); hash_seq_init(&status, ShardInfoHashMap);
@ -390,9 +400,11 @@ ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc
char *tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false); char *tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false);
values[1] = CStringGetTextDatum(tableOwnerName); values[1] = CStringGetTextDatum(tableOwnerName);
char *slotName = ReplicationSlotNameForNodeAndOwner(SHARD_SPLIT, char *slotName =
entry->key.nodeId, ReplicationSlotNameForNodeAndOwnerForOperation(SHARD_SPLIT,
entry->key.tableOwnerId); entry->key.nodeId,
entry->key.tableOwnerId,
operationId);
values[2] = CStringGetTextDatum(slotName); values[2] = CStringGetTextDatum(slotName);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls);

View File

@ -477,9 +477,11 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList)
target->newShards = NIL; target->newShards = NIL;
target->subscriptionOwnerName = SubscriptionRoleName(SHARD_MOVE, ownerId); target->subscriptionOwnerName = SubscriptionRoleName(SHARD_MOVE, ownerId);
target->replicationSlot = palloc0(sizeof(ReplicationSlotInfo)); target->replicationSlot = palloc0(sizeof(ReplicationSlotInfo));
target->replicationSlot->name = ReplicationSlotNameForNodeAndOwner(SHARD_MOVE, target->replicationSlot->name =
nodeId, ReplicationSlotNameForNodeAndOwnerForOperation(SHARD_MOVE,
ownerId); nodeId,
ownerId,
CurrentOperationId);
target->replicationSlot->targetNodeId = nodeId; target->replicationSlot->targetNodeId = nodeId;
target->replicationSlot->tableOwnerId = ownerId; target->replicationSlot->tableOwnerId = ownerId;
logicalRepTargetList = lappend(logicalRepTargetList, target); logicalRepTargetList = lappend(logicalRepTargetList, target);
@ -1208,23 +1210,25 @@ ConflictWithIsolationTestingAfterCopy(void)
char * char *
PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId) PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId)
{ {
return psprintf("%s%u_%u", publicationPrefix[type], nodeId, ownerId); return psprintf("%s%u_%u_%lu", publicationPrefix[type],
nodeId, ownerId, CurrentOperationId);
} }
/* /*
* ReplicationSlotNameForNodeAndOwner returns the name of the replication slot for the * ReplicationSlotNameForNodeAndOwnerForOperation returns the name of the
* given node and table owner. * replication slot for the given node, table owner and operation id.
* *
* Note that PG15 introduced a new ReplicationSlotName function that caused name conflicts * Note that PG15 introduced a new ReplicationSlotName function that caused name conflicts
* and we renamed this function. * and we renamed this function.
*/ */
char * char *
ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid ownerId) ReplicationSlotNameForNodeAndOwnerForOperation(LogicalRepType type, uint32_t nodeId,
Oid ownerId, OperationId operationId)
{ {
StringInfo slotName = makeStringInfo(); StringInfo slotName = makeStringInfo();
appendStringInfo(slotName, "%s%u_%u", replicationSlotPrefix[type], nodeId, appendStringInfo(slotName, "%s%u_%u_%lu", replicationSlotPrefix[type], nodeId,
ownerId); ownerId, operationId);
if (slotName->len > NAMEDATALEN) if (slotName->len > NAMEDATALEN)
{ {
@ -1243,7 +1247,8 @@ ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid own
char * char *
SubscriptionName(LogicalRepType type, Oid ownerId) SubscriptionName(LogicalRepType type, Oid ownerId)
{ {
return psprintf("%s%i", subscriptionPrefix[type], ownerId); return psprintf("%s%i_%lu", subscriptionPrefix[type],
ownerId, CurrentOperationId);
} }
@ -1254,7 +1259,8 @@ SubscriptionName(LogicalRepType type, Oid ownerId)
char * char *
SubscriptionRoleName(LogicalRepType type, Oid ownerId) SubscriptionRoleName(LogicalRepType type, Oid ownerId)
{ {
return psprintf("%s%i", subscriptionRolePrefix[type], ownerId); return psprintf("%s%i_%lu", subscriptionRolePrefix[type], ownerId,
CurrentOperationId);
} }

View File

@ -9,3 +9,4 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
#include "udfs/citus_get_transaction_clock/11.2-1.sql" #include "udfs/citus_get_transaction_clock/11.2-1.sql"
#include "udfs/citus_is_clock_after/11.2-1.sql" #include "udfs/citus_is_clock_after/11.2-1.sql"
#include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql" #include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql"
#include "udfs/worker_split_shard_replication_setup/11.2-1.sql"

View File

@ -10,6 +10,9 @@ DROP SEQUENCE pg_catalog.pg_dist_clock_logical_seq;
DROP OPERATOR CLASS pg_catalog.cluster_clock_ops USING btree CASCADE; DROP OPERATOR CLASS pg_catalog.cluster_clock_ops USING btree CASCADE;
DROP OPERATOR FAMILY pg_catalog.cluster_clock_ops USING btree CASCADE; DROP OPERATOR FAMILY pg_catalog.cluster_clock_ops USING btree CASCADE;
DROP TYPE pg_catalog.cluster_clock CASCADE; DROP TYPE pg_catalog.cluster_clock CASCADE;
DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[], bigint);
DROP TYPE pg_catalog.replication_slot_info;
DROP TYPE pg_catalog.split_shard_info;
CREATE FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) CREATE FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
RETURNS void RETURNS void
@ -17,3 +20,5 @@ CREATE FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, intege
AS 'MODULE_PATHNAME', $$worker_append_table_to_shard$$; AS 'MODULE_PATHNAME', $$worker_append_table_to_shard$$;
COMMENT ON FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer) COMMENT ON FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
IS 'append a regular table''s contents to the shard'; IS 'append a regular table''s contents to the shard';
#include "../udfs/worker_split_shard_replication_setup/11.1-1.sql"

View File

@ -0,0 +1,11 @@
DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]);
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
splitShardInfo pg_catalog.split_shard_info[], operation_id bigint)
RETURNS setof pg_catalog.replication_slot_info
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[], operation_id bigint)
IS 'Replication setup for splitting a shard';
REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[], bigint) FROM PUBLIC;

View File

@ -1,25 +1,11 @@
CREATE TYPE citus.split_shard_info AS ( DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]);
source_shard_id bigint,
distribution_column text,
child_shard_id bigint,
shard_min_value text,
shard_max_value text,
node_id integer);
ALTER TYPE citus.split_shard_info SET SCHEMA pg_catalog;
COMMENT ON TYPE pg_catalog.split_shard_info
IS 'Stores split child shard information';
CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text);
ALTER TYPE citus.replication_slot_info SET SCHEMA pg_catalog;
COMMENT ON TYPE pg_catalog.replication_slot_info
IS 'Replication slot information to be used for subscriptions during non blocking shard split';
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
splitShardInfo pg_catalog.split_shard_info[]) splitShardInfo pg_catalog.split_shard_info[], operation_id bigint)
RETURNS setof pg_catalog.replication_slot_info RETURNS setof pg_catalog.replication_slot_info
LANGUAGE C STRICT LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[]) COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[], operation_id bigint)
IS 'Replication setup for splitting a shard'; IS 'Replication setup for splitting a shard';
REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC; REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[], bigint) FROM PUBLIC;

View File

@ -17,6 +17,7 @@
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/shard_cleaner.h"
/* Config variables managed via guc.c */ /* Config variables managed via guc.c */
@ -152,8 +153,10 @@ extern char * CreateReplicationSlots(MultiConnection *sourceConnection,
extern void EnableSubscriptions(List *subscriptionInfoList); extern void EnableSubscriptions(List *subscriptionInfoList);
extern char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId); extern char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId);
extern char * ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid extern char * ReplicationSlotNameForNodeAndOwnerForOperation(LogicalRepType type,
ownerId); uint32_t nodeId,
Oid ownerId,
OperationId operationId);
extern char * SubscriptionName(LogicalRepType type, Oid ownerId); extern char * SubscriptionName(LogicalRepType type, Oid ownerId);
extern char * SubscriptionRoleName(LogicalRepType type, Oid ownerId); extern char * SubscriptionRoleName(LogicalRepType type, Oid ownerId);

View File

@ -45,8 +45,9 @@ s/truncate_trigger_[0-9]+/truncate_trigger_xxxxxxx/g
# shard move subscription and publication names contain the oid of the # shard move subscription and publication names contain the oid of the
# table owner, which can change across runs # table owner, which can change across runs
s/(citus_shard_(move|split)_subscription_)[0-9]+/\1xxxxxxx/g s/(citus_shard_(move|split)_subscription_role_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g
s/(citus_shard_(move|split)_(slot|publication)_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g s/(citus_shard_(move|split)_subscription_)[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx/g
s/(citus_shard_(move|split)_(slot|publication)_)[0-9]+_[0-9]+_[0-9]+/\1xxxxxxx_xxxxxxx_xxxxxxx/g
# In foreign_key_restriction_enforcement, normalize shard names # In foreign_key_restriction_enforcement, normalize shard names
s/"(on_update_fkey_table_|fkey_)[0-9]+"/"\1xxxxxxx"/g s/"(on_update_fkey_table_|fkey_)[0-9]+"/"\1xxxxxxx"/g

View File

@ -89,13 +89,13 @@ SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
-- PG14, beacuse PG13 doesn't support binary logical replication. -- PG14, beacuse PG13 doesn't support binary logical replication.
SET citus.enable_binary_protocol = false; SET citus.enable_binary_protocol = false;
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
master_move_shard_placement master_move_shard_placement
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -104,13 +104,13 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.cpu_priority_for_logical_replication_senders = 15; SET citus.cpu_priority_for_logical_replication_senders = 15;
SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical'); SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
master_move_shard_placement master_move_shard_placement
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -119,13 +119,13 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
SET citus.max_high_priority_background_processes = 3; SET citus.max_high_priority_background_processes = 3;
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
master_move_shard_placement master_move_shard_placement
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -141,21 +141,21 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
ARRAY['-1500000000'], ARRAY['-1500000000'],
ARRAY[:worker_1_node, :worker_2_node], ARRAY[:worker_1_node, :worker_2_node],
'force_logical'); 'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
citus_split_shard_by_split_points citus_split_shard_by_split_points
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -126,8 +126,8 @@ WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it WARNING: role "citus_shard_move_subscription_role_xxxxxxx_xxxxxxx" cannot be dropped because some objects depend on it
DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx_xxxxxxx
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
ERROR: connection not open ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
@ -202,8 +202,8 @@ WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it WARNING: role "citus_shard_move_subscription_role_xxxxxxx_xxxxxxx" cannot be dropped because some objects depend on it
DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx_xxxxxxx
CONTEXT: while executing command on localhost:xxxxx CONTEXT: while executing command on localhost:xxxxx
master_move_shard_placement master_move_shard_placement
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -316,7 +316,7 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
-- first, manually drop the subscsription object. But the record for it will remain on pg_dist_cleanup -- first, manually drop the subscsription object. But the record for it will remain on pg_dist_cleanup
SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_xxxxxxx$$); SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_xxxxxxx_xxxxxxx$$);
run_command_on_workers run_command_on_workers
--------------------------------------------------------------------- ---------------------------------------------------------------------
(localhost,9060,t,"DROP SUBSCRIPTION") (localhost,9060,t,"DROP SUBSCRIPTION")

View File

@ -57,7 +57,7 @@ CONTEXT: while executing command on localhost:xxxxx
--------------------------------------------------------------------- ---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
(3 rows) (3 rows)
-- we need to allow connection so that we can connect to proxy -- we need to allow connection so that we can connect to proxy
@ -160,8 +160,8 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes
--------------------------------------------------------------------- ---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
(4 rows) (4 rows)
-- we need to allow connection so that we can connect to proxy -- we need to allow connection so that we can connect to proxy
@ -185,10 +185,10 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes
-- Left over publications -- Left over publications
SELECT pubname FROM pg_publication; SELECT pubname FROM pg_publication;
pubname pubname
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
(2 rows) (2 rows)
-- Left over replication slots -- Left over replication slots
@ -271,9 +271,9 @@ CONTEXT: while executing command on localhost:xxxxx
--------------------------------------------------------------------- ---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
(5 rows) (5 rows)
-- we need to allow connection so that we can connect to proxy -- we need to allow connection so that we can connect to proxy
@ -297,10 +297,10 @@ CONTEXT: while executing command on localhost:xxxxx
-- Left over publications -- Left over publications
SELECT pubname FROM pg_publication; SELECT pubname FROM pg_publication;
pubname pubname
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
(2 rows) (2 rows)
-- Left over replication slots -- Left over replication slots
@ -383,12 +383,12 @@ CONTEXT: while executing command on localhost:xxxxx
--------------------------------------------------------------------- ---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0
777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0
(8 rows) (8 rows)
-- we need to allow connection so that we can connect to proxy -- we need to allow connection so that we can connect to proxy
@ -412,25 +412,25 @@ CONTEXT: while executing command on localhost:xxxxx
-- Left over publications -- Left over publications
SELECT pubname FROM pg_publication; SELECT pubname FROM pg_publication;
pubname pubname
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
(2 rows) (2 rows)
-- Left over replication slots -- Left over replication slots
SELECT slot_name FROM pg_replication_slots; SELECT slot_name FROM pg_replication_slots;
slot_name slot_name
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx
(2 rows) (2 rows)
-- Left over subscriptions -- Left over subscriptions
SELECT subname FROM pg_subscription; SELECT subname FROM pg_subscription;
subname subname
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx citus_shard_split_subscription_xxxxxxx_xxxxxxx
(1 row) (1 row)
\c - postgres - :master_port \c - postgres - :master_port
@ -501,12 +501,12 @@ CONTEXT: while executing command on localhost:xxxxx
--------------------------------------------------------------------- ---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0
777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0
(8 rows) (8 rows)
-- we need to allow connection so that we can connect to proxy -- we need to allow connection so that we can connect to proxy
@ -530,25 +530,25 @@ CONTEXT: while executing command on localhost:xxxxx
-- Left over publications -- Left over publications
SELECT pubname FROM pg_publication; SELECT pubname FROM pg_publication;
pubname pubname
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
(2 rows) (2 rows)
-- Left over replication slots -- Left over replication slots
SELECT slot_name FROM pg_replication_slots; SELECT slot_name FROM pg_replication_slots;
slot_name slot_name
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx
(2 rows) (2 rows)
-- Left over subscriptions -- Left over subscriptions
SELECT subname FROM pg_subscription; SELECT subname FROM pg_subscription;
subname subname
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx citus_shard_split_subscription_xxxxxxx_xxxxxxx
(1 row) (1 row)
\c - postgres - :master_port \c - postgres - :master_port
@ -620,12 +620,12 @@ CONTEXT: while executing command on localhost:xxxxx
--------------------------------------------------------------------- ---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0
777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0
(8 rows) (8 rows)
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
@ -653,25 +653,25 @@ CONTEXT: while executing command on localhost:xxxxx
-- Left over publications -- Left over publications
SELECT pubname FROM pg_publication; SELECT pubname FROM pg_publication;
pubname pubname
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx
(2 rows) (2 rows)
-- Left over replication slots -- Left over replication slots
SELECT slot_name FROM pg_replication_slots; SELECT slot_name FROM pg_replication_slots;
slot_name slot_name
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx
(2 rows) (2 rows)
-- Left over subscriptions -- Left over subscriptions
SELECT subname FROM pg_subscription; SELECT subname FROM pg_subscription;
subname subname
--------------------------------------------------------------------- ---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx citus_shard_split_subscription_xxxxxxx_xxxxxxx
(1 row) (1 row)
\c - postgres - :master_port \c - postgres - :master_port

View File

@ -982,20 +982,20 @@ run_try_drop_marked_resources
step s1-show-pg_dist_cleanup: step s1-show-pg_dist_cleanup:
SELECT object_name, object_type, policy_type FROM pg_dist_cleanup; SELECT object_name, object_type, policy_type FROM pg_dist_cleanup;
object_name |object_type|policy_type object_name |object_type|policy_type
--------------------------------------------------------------------- ---------------------------------------------------------------------
public.to_split_table_1500002 | 1| 1 public.to_split_table_1500002 | 1| 1
public.to_split_table_1500003 | 1| 1 public.to_split_table_1500003 | 1| 1
public.to_split_table_1500001 | 1| 0 public.to_split_table_1500001 | 1| 0
public.to_split_table_1500003 | 1| 0 public.to_split_table_1500003 | 1| 0
citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0
citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0
citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0
citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0
citus_shard_split_subscription_role_10| 5| 0 citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0
citus_shard_split_subscription_xxxxxxx | 2| 0 citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0
citus_shard_split_subscription_role_10| 5| 0 citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0
citus_shard_split_subscription_xxxxxxx | 2| 0 citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0
(12 rows) (12 rows)
step s1-release-split-advisory-lock: step s1-release-split-advisory-lock:
@ -1060,20 +1060,20 @@ run_try_drop_marked_resources
step s1-show-pg_dist_cleanup: step s1-show-pg_dist_cleanup:
SELECT object_name, object_type, policy_type FROM pg_dist_cleanup; SELECT object_name, object_type, policy_type FROM pg_dist_cleanup;
object_name |object_type|policy_type object_name |object_type|policy_type
--------------------------------------------------------------------- ---------------------------------------------------------------------
public.to_split_table_1500002 | 1| 1 public.to_split_table_1500002 | 1| 1
public.to_split_table_1500003 | 1| 1 public.to_split_table_1500003 | 1| 1
public.to_split_table_1500001 | 1| 0 public.to_split_table_1500001 | 1| 0
public.to_split_table_1500003 | 1| 0 public.to_split_table_1500003 | 1| 0
citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0
citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 4| 0
citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0
citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 3| 0
citus_shard_split_subscription_role_10| 5| 0 citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0
citus_shard_split_subscription_xxxxxxx | 2| 0 citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0
citus_shard_split_subscription_role_10| 5| 0 citus_shard_split_subscription_role_xxxxxxx_xxxxxxx| 5| 0
citus_shard_split_subscription_xxxxxxx | 2| 0 citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2| 0
(12 rows) (12 rows)
step s1-release-split-advisory-lock: step s1-release-split-advisory-lock:

View File

@ -1203,6 +1203,7 @@ SELECT * FROM multi_extension.print_extension_changes();
--------------------------------------------------------------------- ---------------------------------------------------------------------
function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text) |
function worker_append_table_to_shard(text,text,text,integer) void | function worker_append_table_to_shard(text,text,text,integer) void |
function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info |
| function citus_get_node_clock() cluster_clock | function citus_get_node_clock() cluster_clock
| function citus_get_transaction_clock() cluster_clock | function citus_get_transaction_clock() cluster_clock
| function citus_internal_adjust_local_clock_to_remote(cluster_clock) void | function citus_internal_adjust_local_clock_to_remote(cluster_clock) void
@ -1220,6 +1221,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| function cluster_clock_recv(internal) cluster_clock | function cluster_clock_recv(internal) cluster_clock
| function cluster_clock_send(cluster_clock) bytea | function cluster_clock_send(cluster_clock) bytea
| function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text, source_lsn pg_lsn, target_lsn pg_lsn, status text) | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint, operation_type text, source_lsn pg_lsn, target_lsn pg_lsn, status text)
| function worker_split_shard_replication_setup(split_shard_info[],bigint) SETOF replication_slot_info
| operator <(cluster_clock,cluster_clock) | operator <(cluster_clock,cluster_clock)
| operator <=(cluster_clock,cluster_clock) | operator <=(cluster_clock,cluster_clock)
| operator <>(cluster_clock,cluster_clock) | operator <>(cluster_clock,cluster_clock)
@ -1230,7 +1232,7 @@ SELECT * FROM multi_extension.print_extension_changes();
| operator family cluster_clock_ops for access method btree | operator family cluster_clock_ops for access method btree
| sequence pg_dist_clock_logical_seq | sequence pg_dist_clock_logical_seq
| type cluster_clock | type cluster_clock
(29 rows) (31 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -1472,7 +1472,7 @@ BEGIN;
SET LOCAL citus.log_remote_commands TO ON; SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx, binary=true) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx, binary=true)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
citus_move_shard_placement citus_move_shard_placement
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1487,7 +1487,7 @@ SET LOCAL citus.log_remote_commands TO ON;
SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
SET LOCAL citus.enable_binary_protocol = FALSE; SET LOCAL citus.enable_binary_protocol = FALSE;
SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical');
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx_xxxxxxx)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
citus_move_shard_placement citus_move_shard_placement
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -11,7 +11,7 @@ SET client_min_messages TO ERROR;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]); ], 0);
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -21,7 +21,7 @@ SET client_min_messages TO WARNING;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]); ], 0);
WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -37,7 +37,7 @@ SELECT pg_catalog.worker_split_shard_release_dsm();
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]); ], 0);
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1

View File

@ -69,7 +69,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info,
ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
]); ], 0);
WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -78,8 +78,16 @@ WARNING: Previous split shard worflow was not successfully and could not comple
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset -- we create replication slots with a name including the next_operation_id as a suffix
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset -- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;
citus.next_operation_id
---------------------------------------------------------------------
0
(1 row)
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false' -- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port \c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema; SET search_path TO split_shard_replication_setup_schema;

View File

@ -65,13 +65,21 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
]); ], 0);
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
(1 row) (1 row)
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset -- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;
citus.next_operation_id
---------------------------------------------------------------------
0
(1 row)
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema; SET search_path TO split_shard_replication_setup_schema;

View File

@ -13,13 +13,21 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]); ], 0);
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
(1 row) (1 row)
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;
citus.next_operation_id
---------------------------------------------------------------------
0
(1 row)
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' a -- Create subscription at worker1 with copy_data to 'false' a
BEGIN; BEGIN;
CREATE SUBSCRIPTION local_subscription CREATE SUBSCRIPTION local_subscription

View File

@ -11,15 +11,23 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
]); ], 0);
WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
2 2
(1 row) (1 row)
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -- we create replication slots with a name including the next_operation_id as a suffix
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset -- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;
citus.next_operation_id
---------------------------------------------------------------------
0
(1 row)
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1 CREATE SUBSCRIPTION sub_worker1
CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression'

View File

@ -253,7 +253,7 @@ ORDER BY 1;
function worker_save_query_explain_analyze(text,jsonb) function worker_save_query_explain_analyze(text,jsonb)
function worker_split_copy(bigint,text,split_copy_info[]) function worker_split_copy(bigint,text,split_copy_info[])
function worker_split_shard_release_dsm() function worker_split_shard_release_dsm()
function worker_split_shard_replication_setup(split_shard_info[]) function worker_split_shard_replication_setup(split_shard_info[],bigint)
operator <(cluster_clock,cluster_clock) operator <(cluster_clock,cluster_clock)
operator <=(cluster_clock,cluster_clock) operator <=(cluster_clock,cluster_clock)
operator <>(cluster_clock,cluster_clock) operator <>(cluster_clock,cluster_clock)

View File

@ -131,7 +131,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
-- first, manually drop the subscsription object. But the record for it will remain on pg_dist_cleanup -- first, manually drop the subscsription object. But the record for it will remain on pg_dist_cleanup
SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10$$); SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10_15$$);
-- cleanup leftovers -- cleanup leftovers
-- verify we don't see any error for already dropped subscription -- verify we don't see any error for already dropped subscription
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;

View File

@ -13,16 +13,16 @@ SET client_min_messages TO ERROR;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]); ], 0);
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]); ], 0);
SELECT pg_catalog.worker_split_shard_release_dsm(); SELECT pg_catalog.worker_split_shard_release_dsm();
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]); ], 0);

View File

@ -71,14 +71,18 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info,
ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
]); ], 0);
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset
SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset -- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset
SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false' -- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port \c - postgres - :worker_2_port

View File

@ -69,9 +69,13 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
]); ], 0);
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset -- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset
-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port \c - - - :worker_2_port

View File

@ -16,9 +16,13 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
]); ], 0);
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' a -- Create subscription at worker1 with copy_data to 'false' a
BEGIN; BEGIN;

View File

@ -14,10 +14,14 @@ CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_s
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
]); ], 0);
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -- we create replication slots with a name including the next_operation_id as a suffix
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset -- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset
-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1 CREATE SUBSCRIPTION sub_worker1