Add failure test for shard move (#6325)

DESCRIPTION: Adds failure test for shard move
DESCRIPTION: Remove function `WaitForAllSubscriptionsToBecomeReady` and
related tests

Adds failure tests for shard moves.
Drops the no-longer-needed function
`WaitForAllSubscriptionsToBecomeReady` and its related tests: the
subscriptions now start out as ready, because we no longer use logical
replication table sync workers.

fixes: #6260
Ahmet Gedemenli 2022-10-19 15:25:26 +03:00 committed by GitHub
parent 56da3cf6aa
commit cdbda9ea6b
6 changed files with 28 additions and 360 deletions
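The new shard-move failure tests follow the mitmproxy pattern already used in this regression suite: break the connection when the target node runs a particular command, check that master_move_shard_placement errors out, then re-allow traffic and verify the same move succeeds. A condensed sketch of the flow added in the diff below (shard id and port variables are the ones the existing test already defines; comments are editorial):

-- failure on dropping subscription: kill the target connection when it runs
-- ALTER SUBSCRIPTION ... ENABLE/DISABLE and expect the move to fail
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- allow traffic again and confirm the same move now completes
SELECT citus.mitmproxy('conn.allow()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- move the shard back to its original placement
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);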


@@ -131,8 +131,6 @@ static void ExecuteCreateIndexStatisticsCommands(List *logicalRepTargetList);
static void ExecuteRemainingPostLoadTableCommands(List *logicalRepTargetList);
static char * escape_param_str(const char *str);
static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
static bool RelationSubscriptionsAreReady(
GroupedLogicalRepTargets *groupedLogicalRepTargets);
static void WaitForMiliseconds(long timeout);
static XLogRecPtr GetSubscriptionPosition(
GroupedLogicalRepTargets *groupedLogicalRepTargets);
@@ -152,8 +150,6 @@ static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode,
List *shardIntervals);
static List * CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash,
List *shardList);
static void WaitForGroupedLogicalRepTargetsToBecomeReady(
GroupedLogicalRepTargets *groupedLogicalRepTargets);
static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition,
GroupedLogicalRepTargets *
groupedLogicalRepTargets);
@@ -386,18 +382,6 @@ CompleteNonBlockingShardTransfer(List *shardList,
sourceConnection->port,
PLACEMENT_UPDATE_STATUS_CATCHING_UP);
/*
* The following check is a leftover from when we used subscriptions with
* copy_data=true. It's probably not really necessary anymore, but it
* seemed like a nice check to keep. At least for debugging issues it
* seems nice to report differences between the subscription never
* becoming ready and the subscriber not applying WAL. It's also not
* entirely clear if the catchup check handles the case correctly where
* the subscription is not in the ready state yet, because so far it
* never had to.
*/
WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash);
/*
* Wait until all the subscriptions are caught up to changes that
* happened after the initial COPY on the shards.
@@ -2198,116 +2182,6 @@ CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash)
}
/*
* WaitForAllSubscriptionsToBecomeReady waits until the states of the
* subscriptions in the groupedLogicalRepTargetsHash become ready. This should happen
* very quickly, because we don't use the COPY logic from the subscriptions. So
* all that's needed is to start reading from the replication slot.
*
* The function errors if the subscriptions on one of the nodes don't become
* ready within LogicalReplicationErrorTimeout. The function also reports its
* progress every logicalReplicationProgressReportTimeout.
*/
void
WaitForAllSubscriptionsToBecomeReady(HTAB *groupedLogicalRepTargetsHash)
{
HASH_SEQ_STATUS status;
GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL;
foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash)
{
WaitForGroupedLogicalRepTargetsToBecomeReady(groupedLogicalRepTargets);
}
elog(LOG, "The states of all subscriptions have become READY");
}
/*
* WaitForGroupedLogicalRepTargetsToBecomeReady waits until the states of the
* subscriptions for each shard become ready. This should happen very quickly,
* because we don't use the COPY logic from the subscriptions. So all that's
* needed is to start reading from the replication slot.
*
* The function errors if the subscriptions don't become ready within
* LogicalReplicationErrorTimeout. The function also reports its progress
* every logicalReplicationProgressReportTimeout.
*/
static void
WaitForGroupedLogicalRepTargetsToBecomeReady(
GroupedLogicalRepTargets *groupedLogicalRepTargets)
{
TimestampTz previousSizeChangeTime = GetCurrentTimestamp();
TimestampTz previousReportTime = GetCurrentTimestamp();
MultiConnection *superuserConnection = groupedLogicalRepTargets->superuserConnection;
/*
* We might be in the loop for a while. Since we don't need to preserve
* any memory beyond this function, we can simply switch to a child context
* and reset it on every iteration to make sure we don't slowly build up
* a lot of memory.
*/
MemoryContext loopContext = AllocSetContextCreateInternal(CurrentMemoryContext,
"WaitForRelationSubscriptionsBecomeReady",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContext oldContext = MemoryContextSwitchTo(loopContext);
while (true)
{
/* we're done, all relations are ready */
if (RelationSubscriptionsAreReady(groupedLogicalRepTargets))
{
ereport(LOG, (errmsg("The states of the relations belonging to the "
"subscriptions became READY on the "
"target node %s:%d",
superuserConnection->hostname,
superuserConnection->port)));
break;
}
/* log the progress if necessary */
if (TimestampDifferenceExceeds(previousReportTime,
GetCurrentTimestamp(),
logicalReplicationProgressReportTimeout))
{
ereport(LOG, (errmsg("Not all subscriptions on target node %s:%d "
"are READY yet",
superuserConnection->hostname,
superuserConnection->port)));
previousReportTime = GetCurrentTimestamp();
}
/* Error out if the size does not change within the given time threshold */
if (TimestampDifferenceExceeds(previousSizeChangeTime,
GetCurrentTimestamp(),
LogicalReplicationTimeout))
{
ereport(ERROR, (errmsg("The logical replication waiting timeout "
"of %d msec is exceeded",
LogicalReplicationTimeout),
errdetail("The subscribed relations haven't become "
"ready on the target node %s:%d",
superuserConnection->hostname,
superuserConnection->port),
errhint(
"Logical replication has failed to initialize "
"on the target node. If not, consider using "
"higher values for "
"citus.logical_replication_timeout")));
}
/* wait for 1 second (1000 milliseconds) and try again */
WaitForMiliseconds(1000);
MemoryContextReset(loopContext);
}
MemoryContextSwitchTo(oldContext);
}
/*
* SubscriptionNamesValueList returns a SQL value list containing the
* subscription names from the logicalRepTargetList. This value list can
@@ -2339,62 +2213,6 @@ SubscriptionNamesValueList(List *logicalRepTargetList)
}
/*
* RelationSubscriptionsAreReady gets the subscription status for each
* subscription and returns false if at least one of them is not ready.
*/
static bool
RelationSubscriptionsAreReady(GroupedLogicalRepTargets *groupedLogicalRepTargets)
{
bool raiseInterrupts = false;
List *logicalRepTargetList = groupedLogicalRepTargets->logicalRepTargetList;
MultiConnection *superuserConnection = groupedLogicalRepTargets->superuserConnection;
char *subscriptionValueList = SubscriptionNamesValueList(logicalRepTargetList);
char *query = psprintf(
"SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription "
"WHERE srsubid = subid AND srsubstate != 'r' AND subname IN %s",
subscriptionValueList);
int querySent = SendRemoteCommand(superuserConnection, query);
if (querySent == 0)
{
ReportConnectionError(superuserConnection, ERROR);
}
PGresult *result = GetRemoteCommandResult(superuserConnection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(superuserConnection, result, ERROR);
}
int rowCount = PQntuples(result);
int columnCount = PQnfields(result);
if (columnCount != 1)
{
ereport(ERROR, (errmsg("unexpected number of columns returned while reading ")));
}
if (rowCount != 1)
{
ereport(ERROR, (errmsg("unexpected number of rows returned while reading ")));
}
int columnIndex = 0;
int rowIndex = 0;
/* we're using the pstrdup to copy the data into the current memory context */
char *resultString = pstrdup(PQgetvalue(result, rowIndex, columnIndex));
PQclear(result);
ForgetResults(superuserConnection);
int64 resultInt = SafeStringToInt64(resultString);
return resultInt == 0;
}
/*
* WaitForAllSubscriptionsToCatchUp waits until the last LSN reported by the
* subscription.

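The removed helper polled pg_subscription_rel on the target node until every subscribed relation reached the ready ('r') state. Since the subscriptions are no longer created with copy_data=true, there is no table sync phase to wait out, so the poll, its helper functions, and the failure tests that intercepted its query are all dropped. For reference, this is a sketch of the query the helper ran (the subscription name here is only illustrative):

SELECT count(*)
FROM pg_subscription_rel, pg_stat_subscription
WHERE srsubid = subid
  AND srsubstate != 'r'
  AND subname IN ('citus_shard_move_subscription_10');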

@@ -163,7 +163,6 @@ extern char * ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t n
extern char * SubscriptionName(LogicalRepType type, Oid ownerId);
extern char * SubscriptionRoleName(LogicalRepType type, Oid ownerId);
extern void WaitForAllSubscriptionsToBecomeReady(HTAB *groupedLogicalRepTargetsHash);
extern void WaitForAllSubscriptionsToCatchUp(MultiConnection *sourceConnection,
HTAB *groupedLogicalRepTargetsHash);
extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection,


@@ -114,8 +114,8 @@ CONTEXT: while executing command on localhost:xxxxx
-- https://www.postgresql.org/message-id/flat/HE1PR8303MB0075BF78AF1BE904050DA16BF7729%40HE1PR8303MB0075.EURPRD83.prod.outlook.com
-- SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").cancel(' || :pid || ')');
-- SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- failure on polling subscription state
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").kill()');
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()');
mitmproxy
---------------------------------------------------------------------
@@ -124,15 +124,25 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscrip
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cancellation on polling subscription state
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").cancel(' || :pid || ')');
-- try again
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- failure on polling last write-ahead log location reported to origin WAL sender
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").kill()');
mitmproxy


@@ -3,9 +3,8 @@
--2. Failure while creating shared memory segment
--3. Failure while creating replication slots
--4. Failure while enabling subscription
--5. Failure on polling subscription state
--6. Failure on polling last write-ahead log location reported to origin WAL sender
--7. Failure on dropping subscription
--5. Failure on polling last write-ahead log location reported to origin WAL sender
--6. Failure on dropping subscription
CREATE SCHEMA "citus_failure_split_cleanup_schema";
SET search_path TO "citus_failure_split_cleanup_schema";
SET citus.next_shard_id TO 8981000;
@@ -459,118 +458,7 @@ NOTICE: cleaned up 4 orphaned resources
citus_shard_split_subscription_xxxxxxx
(1 row)
--5. Failure on polling subscription state
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
SET citus.next_cleanup_record_id TO 11;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").killall()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Left over child shards
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
relname
---------------------------------------------------------------------
table_to_split_8981000
table_to_split_8981002
table_to_split_8981003
(3 rows)
-- Left over publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Left over replication slots
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Left over subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Empty child shards after cleanup
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
--6. Failure on polling last write-ahead log location reported to origin WAL sender
--5. Failure on polling last write-ahead log location reported to origin WAL sender
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
@@ -681,7 +569,7 @@ NOTICE: cleaned up 4 orphaned resources
citus_shard_split_subscription_xxxxxxx
(1 row)
--7. Failure on dropping subscription
--6. Failure on dropping subscription
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;


@@ -65,13 +65,13 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
-- SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").cancel(' || :pid || ')');
-- SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- failure on polling subscription state
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").kill()');
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- cancellation on polling subscription state
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").cancel(' || :pid || ')');
-- try again
SELECT citus.mitmproxy('conn.allow()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);
-- failure on polling last write-ahead log location reported to origin WAL sender
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").kill()');


@@ -3,9 +3,8 @@
--2. Failure while creating shared memory segment
--3. Failure while creating replication slots
--4. Failure while enabling subscription
--5. Failure on polling subscription state
--6. Failure on polling last write-ahead log location reported to origin WAL sender
--7. Failure on dropping subscription
--5. Failure on polling last write-ahead log location reported to origin WAL sender
--6. Failure on dropping subscription
CREATE SCHEMA "citus_failure_split_cleanup_schema";
SET search_path TO "citus_failure_split_cleanup_schema";
@@ -208,53 +207,7 @@ SELECT create_distributed_table('table_to_split', 'id');
-- Empty subscriptions
SELECT subname FROM pg_subscription;
--5. Failure on polling subscription state
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
SET citus.next_cleanup_record_id TO 11;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").killall()');
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Left over child shards
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
-- Left over publications
SELECT pubname FROM pg_publication;
-- Left over replication slots
SELECT slot_name FROM pg_replication_slots;
-- Left over subscriptions
SELECT subname FROM pg_subscription;
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Empty child shards after cleanup
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
-- Empty publications
SELECT pubname FROM pg_publication;
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
-- Empty subscriptions
SELECT subname FROM pg_subscription;
--6. Failure on polling last write-ahead log location reported to origin WAL sender
--5. Failure on polling last write-ahead log location reported to origin WAL sender
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
@@ -300,7 +253,7 @@ SELECT create_distributed_table('table_to_split', 'id');
-- Empty subscriptions
SELECT subname FROM pg_subscription;
--7. Failure on dropping subscription
--6. Failure on dropping subscription
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;