diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index bb9c7d221..1edd48c48 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -131,8 +131,6 @@ static void ExecuteCreateIndexStatisticsCommands(List *logicalRepTargetList); static void ExecuteRemainingPostLoadTableCommands(List *logicalRepTargetList); static char * escape_param_str(const char *str); static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); -static bool RelationSubscriptionsAreReady( - GroupedLogicalRepTargets *groupedLogicalRepTargets); static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition( GroupedLogicalRepTargets *groupedLogicalRepTargets); @@ -152,8 +150,6 @@ static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode, List *shardIntervals); static List * CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList); -static void WaitForGroupedLogicalRepTargetsToBecomeReady( - GroupedLogicalRepTargets *groupedLogicalRepTargets); static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition, GroupedLogicalRepTargets * groupedLogicalRepTargets); @@ -386,18 +382,6 @@ CompleteNonBlockingShardTransfer(List *shardList, sourceConnection->port, PLACEMENT_UPDATE_STATUS_CATCHING_UP); - /* - * The following check is a leftover from when used subscriptions with - * copy_data=true. It's probably not really necessary anymore, but it - * seemed like a nice check to keep. At least for debugging issues it - * seems nice to report differences between the subscription never - * becoming ready and the subscriber not applying WAL. It's also not - * entirely clear if the catchup check handles the case correctly where - * the subscription is not in the ready state yet, because so far it - * never had to. - */ - WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash); - /* * Wait until all the subscriptions are caught up to changes that * happened after the initial COPY on the shards. @@ -2198,116 +2182,6 @@ CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash) } -/* - * WaitForRelationSubscriptionsBecomeReady waits until the states of the - * subsriptions in the groupedLogicalRepTargetsHash becomes ready. This should happen - * very quickly, because we don't use the COPY logic from the subscriptions. So - * all that's needed is to start reading from the replication slot. - * - * The function errors if the subscriptions on one of the nodes don't become - * ready within LogicalReplicationErrorTimeout. The function also reports its - * progress every logicalReplicationProgressReportTimeout. - */ -void -WaitForAllSubscriptionsToBecomeReady(HTAB *groupedLogicalRepTargetsHash) -{ - HASH_SEQ_STATUS status; - GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL; - foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash) - { - WaitForGroupedLogicalRepTargetsToBecomeReady(groupedLogicalRepTargets); - } - elog(LOG, "The states of all subscriptions have become READY"); -} - - -/* - * WaitForRelationSubscriptionsBecomeReady waits until the states of the - * subsriptions for each shard becomes ready. This should happen very quickly, - * because we don't use the COPY logic from the subscriptions. So all that's - * needed is to start reading from the replication slot. 
- * - * The function errors if the subscriptions don't become ready within - * LogicalReplicationErrorTimeout. The function also reports its progress in - * every logicalReplicationProgressReportTimeout. - */ -static void -WaitForGroupedLogicalRepTargetsToBecomeReady( - GroupedLogicalRepTargets *groupedLogicalRepTargets) -{ - TimestampTz previousSizeChangeTime = GetCurrentTimestamp(); - TimestampTz previousReportTime = GetCurrentTimestamp(); - - MultiConnection *superuserConnection = groupedLogicalRepTargets->superuserConnection; - - /* - * We might be in the loop for a while. Since we don't need to preserve - * any memory beyond this function, we can simply switch to a child context - * and reset it on every iteration to make sure we don't slowly build up - * a lot of memory. - */ - MemoryContext loopContext = AllocSetContextCreateInternal(CurrentMemoryContext, - "WaitForRelationSubscriptionsBecomeReady", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - - MemoryContext oldContext = MemoryContextSwitchTo(loopContext); - while (true) - { - /* we're done, all relations are ready */ - if (RelationSubscriptionsAreReady(groupedLogicalRepTargets)) - { - ereport(LOG, (errmsg("The states of the relations belonging to the " - "subscriptions became READY on the " - "target node %s:%d", - superuserConnection->hostname, - superuserConnection->port))); - - break; - } - - /* log the progress if necessary */ - if (TimestampDifferenceExceeds(previousReportTime, - GetCurrentTimestamp(), - logicalReplicationProgressReportTimeout)) - { - ereport(LOG, (errmsg("Not all subscriptions on target node %s:%d " - "are READY yet", - superuserConnection->hostname, - superuserConnection->port))); - - previousReportTime = GetCurrentTimestamp(); - } - - /* Error out if the size does not change within the given time threshold */ - if (TimestampDifferenceExceeds(previousSizeChangeTime, - GetCurrentTimestamp(), - LogicalReplicationTimeout)) - { - ereport(ERROR, (errmsg("The logical replication waiting timeout " - "of %d msec is exceeded", - LogicalReplicationTimeout), - errdetail("The subscribed relations haven't become " - "ready on the target node %s:%d", - superuserConnection->hostname, - superuserConnection->port), - errhint( - "Logical replication has failed to initialize " - "on the target node. If not, consider using " - "higher values for " - "citus.logical_replication_timeout"))); - } - - /* wait for 1 second (1000 miliseconds) and try again */ - WaitForMiliseconds(1000); - MemoryContextReset(loopContext); - } - - MemoryContextSwitchTo(oldContext); -} - - /* * SubscriptionNamesValueList returns a SQL value list containing the * subscription names from the logicalRepTargetList. This value list can @@ -2339,62 +2213,6 @@ SubscriptionNamesValueList(List *logicalRepTargetList) } -/* - * RelationSubscriptionsAreReady gets the subscription status for each - * subscriptions and returns false if at least one of them is not ready. 
- */ -static bool -RelationSubscriptionsAreReady(GroupedLogicalRepTargets *groupedLogicalRepTargets) -{ - bool raiseInterrupts = false; - - List *logicalRepTargetList = groupedLogicalRepTargets->logicalRepTargetList; - MultiConnection *superuserConnection = groupedLogicalRepTargets->superuserConnection; - - char *subscriptionValueList = SubscriptionNamesValueList(logicalRepTargetList); - char *query = psprintf( - "SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription " - "WHERE srsubid = subid AND srsubstate != 'r' AND subname IN %s", - subscriptionValueList); - int querySent = SendRemoteCommand(superuserConnection, query); - if (querySent == 0) - { - ReportConnectionError(superuserConnection, ERROR); - } - - PGresult *result = GetRemoteCommandResult(superuserConnection, raiseInterrupts); - if (!IsResponseOK(result)) - { - ReportResultError(superuserConnection, result, ERROR); - } - - int rowCount = PQntuples(result); - int columnCount = PQnfields(result); - - if (columnCount != 1) - { - ereport(ERROR, (errmsg("unexpected number of columns returned while reading "))); - } - if (rowCount != 1) - { - ereport(ERROR, (errmsg("unexpected number of rows returned while reading "))); - } - - int columnIndex = 0; - int rowIndex = 0; - - /* we're using the pstrdup to copy the data into the current memory context */ - char *resultString = pstrdup(PQgetvalue(result, rowIndex, columnIndex)); - - PQclear(result); - ForgetResults(superuserConnection); - - int64 resultInt = SafeStringToInt64(resultString); - - return resultInt == 0; -} - - /* * WaitForAllSubscriptionToCatchUp waits until the last LSN reported by the * subscription. diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 16ac4ac02..1db36402b 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -163,7 +163,6 @@ extern char * ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t n extern char * SubscriptionName(LogicalRepType type, Oid ownerId); extern char * SubscriptionRoleName(LogicalRepType type, Oid ownerId); -extern void WaitForAllSubscriptionsToBecomeReady(HTAB *groupedLogicalRepTargetsHash); extern void WaitForAllSubscriptionsToCatchUp(MultiConnection *sourceConnection, HTAB *groupedLogicalRepTargetsHash); extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 0ea70a2eb..280936067 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -114,8 +114,8 @@ CONTEXT: while executing command on localhost:xxxxx -- https://www.postgresql.org/message-id/flat/HE1PR8303MB0075BF78AF1BE904050DA16BF7729%40HE1PR8303MB0075.EURPRD83.prod.outlook.com -- SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").cancel(' || :pid || ')'); -- SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); --- failure on polling subscription state -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").kill()'); +-- failure on dropping subscription +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()'); mitmproxy --------------------------------------------------------------------- @@ -124,15 
+124,25 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscrip SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx --- cancellation on polling subscription state -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").cancel(' || :pid || ')'); +-- try again +SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -ERROR: canceling statement due to user request + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port); + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + -- failure on polling last write-ahead log location reported to origin WAL sender SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").kill()'); mitmproxy diff --git a/src/test/regress/expected/failure_split_cleanup.out b/src/test/regress/expected/failure_split_cleanup.out index ec4810a5c..7c9cc2739 100644 --- a/src/test/regress/expected/failure_split_cleanup.out +++ b/src/test/regress/expected/failure_split_cleanup.out @@ -3,9 +3,8 @@ --2. Failure while creating shared memory segment --3. Failure while creating replication slots --4. Failure while enabling subscription ---5. Failure on polling subscription state ---6. Failure on polling last write-ahead log location reported to origin WAL sender ---7. Failure on dropping subscription +--5. Failure on polling last write-ahead log location reported to origin WAL sender +--6. Failure on dropping subscription CREATE SCHEMA "citus_failure_split_cleanup_schema"; SET search_path TO "citus_failure_split_cleanup_schema"; SET citus.next_shard_id TO 8981000; @@ -459,118 +458,7 @@ NOTICE: cleaned up 4 orphaned resources citus_shard_split_subscription_xxxxxxx (1 row) ---5. 
Failure on polling subscription state - \c - postgres - :master_port - SET citus.next_shard_id TO 8981002; - SET citus.next_operation_id TO 777; - SET citus.next_cleanup_record_id TO 11; - SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - - SELECT pg_catalog.citus_split_shard_by_split_points( - 8981000, - ARRAY['-100000'], - ARRAY[:worker_1_node, :worker_2_node], - 'force_logical'); -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open - SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; - operation_id | object_type | object_name | node_group_id | policy_type ---------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 -(4 rows) - - -- we need to allow connection so that we can connect to proxy - SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - - \c - - - :worker_2_proxy_port - SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; - SET citus.show_shards_for_app_name_prefixes = '*'; - -- Left over child shards - SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; - relname ---------------------------------------------------------------------- - table_to_split_8981000 - table_to_split_8981002 - table_to_split_8981003 -(3 rows) - - -- Left over publications - SELECT pubname FROM pg_publication; - pubname ---------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx -(2 rows) - - -- Left over replication slots - SELECT slot_name FROM pg_replication_slots; - slot_name ---------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx -(2 rows) - - -- Left over subscriptions - SELECT subname FROM pg_subscription; - subname ---------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx -(1 row) - - \c - postgres - :master_port - CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 4 orphaned resources - SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; - operation_id | object_type | object_name | node_group_id | policy_type ---------------------------------------------------------------------- -(0 rows) - - \c - - - :worker_2_proxy_port - SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; - SET citus.show_shards_for_app_name_prefixes = '*'; - -- Empty child shards after cleanup - SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; - relname ---------------------------------------------------------------------- - table_to_split_8981000 -(1 row) - - -- Empty publications - SELECT pubname FROM pg_publication; - pubname 
---------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx -(2 rows) - - -- Empty replication slot table - SELECT slot_name FROM pg_replication_slots; - slot_name ---------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx -(2 rows) - - -- Empty subscriptions - SELECT subname FROM pg_subscription; - subname ---------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx -(1 row) - ---6. Failure on polling last write-ahead log location reported to origin WAL sender +--5. Failure on polling last write-ahead log location reported to origin WAL sender \c - postgres - :master_port SET citus.next_shard_id TO 8981002; SET citus.next_operation_id TO 777; @@ -681,7 +569,7 @@ NOTICE: cleaned up 4 orphaned resources citus_shard_split_subscription_xxxxxxx (1 row) ---7. Failure on dropping subscription +--6. Failure on dropping subscription \c - postgres - :master_port SET citus.next_shard_id TO 8981002; SET citus.next_operation_id TO 777; diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index 282a2895c..a8bec48da 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -65,13 +65,13 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' -- SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").cancel(' || :pid || ')'); -- SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); --- failure on polling subscription state -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").kill()'); +-- failure on dropping subscription +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); - --- cancellation on polling subscription state -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").cancel(' || :pid || ')'); +-- try again +SELECT citus.mitmproxy('conn.allow()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port); -- failure on polling last write-ahead log location reported to origin WAL sender SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").kill()'); diff --git a/src/test/regress/sql/failure_split_cleanup.sql b/src/test/regress/sql/failure_split_cleanup.sql index 034bb2c1b..17434917a 100644 --- a/src/test/regress/sql/failure_split_cleanup.sql +++ b/src/test/regress/sql/failure_split_cleanup.sql @@ -3,9 +3,8 @@ --2. Failure while creating shared memory segment --3. Failure while creating replication slots --4. Failure while enabling subscription ---5. Failure on polling subscription state ---6. Failure on polling last write-ahead log location reported to origin WAL sender ---7. Failure on dropping subscription +--5. Failure on polling last write-ahead log location reported to origin WAL sender +--6. 
Failure on dropping subscription CREATE SCHEMA "citus_failure_split_cleanup_schema"; SET search_path TO "citus_failure_split_cleanup_schema"; @@ -208,53 +207,7 @@ SELECT create_distributed_table('table_to_split', 'id'); -- Empty subscriptions SELECT subname FROM pg_subscription; ---5. Failure on polling subscription state - \c - postgres - :master_port - SET citus.next_shard_id TO 8981002; - SET citus.next_operation_id TO 777; - SET citus.next_cleanup_record_id TO 11; - - SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").killall()'); - SELECT pg_catalog.citus_split_shard_by_split_points( - 8981000, - ARRAY['-100000'], - ARRAY[:worker_1_node, :worker_2_node], - 'force_logical'); - SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; - -- we need to allow connection so that we can connect to proxy - SELECT citus.mitmproxy('conn.allow()'); - - \c - - - :worker_2_proxy_port - SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; - SET citus.show_shards_for_app_name_prefixes = '*'; - -- Left over child shards - SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; - -- Left over publications - SELECT pubname FROM pg_publication; - -- Left over replication slots - SELECT slot_name FROM pg_replication_slots; - -- Left over subscriptions - SELECT subname FROM pg_subscription; - - \c - postgres - :master_port - CALL pg_catalog.citus_cleanup_orphaned_resources(); - SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; - - \c - - - :worker_2_proxy_port - SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; - SET citus.show_shards_for_app_name_prefixes = '*'; - -- Empty child shards after cleanup - SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; - -- Empty publications - SELECT pubname FROM pg_publication; - -- Empty replication slot table - SELECT slot_name FROM pg_replication_slots; - -- Empty subscriptions - SELECT subname FROM pg_subscription; - ---6. Failure on polling last write-ahead log location reported to origin WAL sender +--5. Failure on polling last write-ahead log location reported to origin WAL sender \c - postgres - :master_port SET citus.next_shard_id TO 8981002; SET citus.next_operation_id TO 777; @@ -300,7 +253,7 @@ SELECT create_distributed_table('table_to_split', 'id'); -- Empty subscriptions SELECT subname FROM pg_subscription; ---7. Failure on dropping subscription +--6. Failure on dropping subscription \c - postgres - :master_port SET citus.next_shard_id TO 8981002; SET citus.next_operation_id TO 777;
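
Note on the removed readiness check: this patch drops the explicit wait for subscriptions to reach the 'ready' state and keeps only the catch-up wait on the replication LSN. The state that the removed C code polled is still exposed by the PostgreSQL catalogs, so anyone debugging a shard move by hand can run the same check manually. A minimal sketch, using the same query shape as the removed RelationSubscriptionsAreReady(); the subscription name below is a placeholder, not one produced by this patch:

    -- Count subscribed relations that have not yet reached the 'ready' ('r') state
    -- for the given subscription(s); 0 means the subscription is fully ready.
    SELECT count(*)
    FROM pg_subscription_rel
    JOIN pg_stat_subscription ON srsubid = subid
    WHERE srsubstate != 'r'
      AND subname IN ('citus_shard_move_subscription_placeholder');

Run against the target node as a superuser; a non-zero count only indicates that initial table synchronization has not finished, not that replication has failed.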