Merge branch 'main' into fix-flaky-failure_split_cleanup

pull/7299/head
Jelte Fennema-Nio 2023-11-01 11:59:03 +01:00 committed by GitHub
commit 97ca794e97
4 changed files with 35 additions and 24 deletions

@@ -90,6 +90,28 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * IsMetadataSynced checks the workers to see if all workers with metadata are
+ * synced.
+ */
+static bool
+IsMetadataSynced(void)
+{
+    List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
+
+    WorkerNode *workerNode = NULL;
+    foreach_ptr(workerNode, workerList)
+    {
+        if (workerNode->hasMetadata && !workerNode->metadataSynced)
+        {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+
 /*
  * wait_until_metadata_sync waits until the maintenance daemon does a metadata
  * sync, or times out.
@@ -99,19 +121,10 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
 {
     uint32 timeout = PG_GETARG_UINT32(0);
 
-    List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
-    bool waitNotifications = false;
-
-    WorkerNode *workerNode = NULL;
-    foreach_ptr(workerNode, workerList)
-    {
-        /* if already has metadata, no need to do it again */
-        if (workerNode->hasMetadata && !workerNode->metadataSynced)
-        {
-            waitNotifications = true;
-            break;
-        }
-    }
+    /* First we start listening. */
+    MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
+                                                    LOCAL_HOST_NAME, PostPortNumber);
+    ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);
 
     /*
      * If all the metadata nodes have already been synced, we should not wait.
@@ -119,15 +132,12 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
      * the notification and we'd wait unnecessarily here. Worse, the test outputs
      * might be inconsistent across executions due to the warning.
      */
-    if (!waitNotifications)
+    if (IsMetadataSynced())
     {
+        CloseConnection(connection);
         PG_RETURN_VOID();
     }
 
-    MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
-                                                    LOCAL_HOST_NAME, PostPortNumber);
-    ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);
-
     int waitFlags = WL_SOCKET_READABLE | WL_TIMEOUT | WL_POSTMASTER_DEATH;
     int waitResult = WaitLatchOrSocket(NULL, waitFlags, PQsocket(connection->pgConn),
                                        timeout, 0);
@@ -139,7 +149,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
     {
         ClearResults(connection, true);
     }
-    else if (waitResult & WL_TIMEOUT)
+    else if (waitResult & WL_TIMEOUT && !IsMetadataSynced())
    {
        elog(WARNING, "waiting for metadata sync timed out");
    }

@@ -90,7 +90,7 @@ SELECT citus_disable_node('localhost', :worker_2_port);
 
 (1 row)
 
-SELECT public.wait_until_metadata_sync(60000);
+SELECT public.wait_until_metadata_sync(20000);
  wait_until_metadata_sync
---------------------------------------------------------------------
 
@@ -812,7 +812,7 @@ SELECT citus_disable_node('localhost', 9999);
 
 (1 row)
 
-SELECT public.wait_until_metadata_sync(60000);
+SELECT public.wait_until_metadata_sync(20000);
  wait_until_metadata_sync
---------------------------------------------------------------------
 

@@ -298,7 +298,8 @@ test: replicate_reference_tables_to_coordinator
 test: citus_local_tables
 test: mixed_relkind_tests
 test: multi_row_router_insert create_distributed_table_concurrently
-test: multi_reference_table citus_local_tables_queries
+test: multi_reference_table
+test: citus_local_tables_queries
 test: citus_local_table_triggers
 test: coordinator_shouldhaveshards
 test: local_shard_utility_command_execution

@@ -39,7 +39,7 @@ SELECT master_get_active_worker_nodes();
 SELECT 1 FROM master_add_node('localhost', :worker_2_port);
 SELECT citus_disable_node('localhost', :worker_2_port);
-SELECT public.wait_until_metadata_sync(60000);
+SELECT public.wait_until_metadata_sync(20000);
 SELECT master_get_active_worker_nodes();
 
 -- add some shard placements to the cluster
@@ -328,7 +328,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_g
 SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary');
 SELECT master_activate_node('localhost', 9999);
 SELECT citus_disable_node('localhost', 9999);
-SELECT public.wait_until_metadata_sync(60000);
+SELECT public.wait_until_metadata_sync(20000);
 SELECT master_remove_node('localhost', 9999);
 
 -- check that you can't manually add two primaries to a group