mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into fix-flaky-insert_select_connection_leak
commit 9c52194e4b
@@ -10,6 +10,10 @@ on:
        required: false
        default: false
        type: boolean
+  push:
+    branches:
+      - "main"
+      - "release-*"
   pull_request:
     types: [opened, reopened,synchronize]
 jobs:
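The four added lines extend the workflow's triggers: besides pull_request events and what appears to be a manual workflow_dispatch input (the required/default/type context lines), the workflow now also runs on direct pushes to main and to release-* branches.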
@@ -90,6 +90,28 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
 }


+/*
+ * IsMetadataSynced checks the workers to see if all workers with metadata are
+ * synced.
+ */
+static bool
+IsMetadataSynced(void)
+{
+    List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
+
+    WorkerNode *workerNode = NULL;
+    foreach_ptr(workerNode, workerList)
+    {
+        if (workerNode->hasMetadata && !workerNode->metadataSynced)
+        {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+
 /*
  * wait_until_metadata_sync waits until the maintenance daemon does a metadata
  * sync, or times out.
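The new IsMetadataSynced() helper factors out the per-worker check (hasMetadata set but metadataSynced not yet set) so that the same test can be used twice below: once for the early return and once when re-checking after a timeout.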
@@ -99,19 +121,10 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
 {
     uint32 timeout = PG_GETARG_UINT32(0);

-    List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
-    bool waitNotifications = false;
-
-    WorkerNode *workerNode = NULL;
-    foreach_ptr(workerNode, workerList)
-    {
-        /* if already has metadata, no need to do it again */
-        if (workerNode->hasMetadata && !workerNode->metadataSynced)
-        {
-            waitNotifications = true;
-            break;
-        }
-    }
+    /* First we start listening. */
+    MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
+                                                    LOCAL_HOST_NAME, PostPortNumber);
+    ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);

     /*
      * If all the metadata nodes have already been synced, we should not wait.
@@ -119,15 +132,12 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
      * the notification and we'd wait unnecessarily here. Worse, the test outputs
      * might be inconsistent across executions due to the warning.
      */
-    if (!waitNotifications)
+    if (IsMetadataSynced())
     {
+        CloseConnection(connection);
         PG_RETURN_VOID();
     }

-    MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
-                                                    LOCAL_HOST_NAME, PostPortNumber);
-    ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);
-
     int waitFlags = WL_SOCKET_READABLE | WL_TIMEOUT | WL_POSTMASTER_DEATH;
     int waitResult = WaitLatchOrSocket(NULL, waitFlags, PQsocket(connection->pgConn),
                                        timeout, 0);
@@ -139,7 +149,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
     {
         ClearResults(connection, true);
     }
-    else if (waitResult & WL_TIMEOUT)
+    else if (waitResult & WL_TIMEOUT && !IsMetadataSynced())
     {
         elog(WARNING, "waiting for metadata sync timed out");
     }
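Taken together, the hunks above restructure wait_until_metadata_sync() in two ways: the LISTEN on METADATA_SYNC_CHANNEL is now issued before the sync-state check, so a notification fired between the check and the LISTEN can no longer be missed, and the WL_TIMEOUT branch re-checks IsMetadataSynced() so the warning is only emitted when metadata is genuinely out of sync. Tests invoke the UDF as below; the 20000 ms timeout matches the test changes later in this diff:

-- wait up to 20 seconds for the maintenance daemon to finish a metadata sync
SELECT public.wait_until_metadata_sync(20000);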
@@ -277,12 +277,12 @@ CONTEXT:  while executing command on localhost:xxxxx
 ERROR:  connection not open
 CONTEXT:  while executing command on localhost:xxxxx
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
  operation_id | object_type | object_name | node_group_id | policy_type
 ---------------------------------------------------------------------
  777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
  777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
+ 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
  777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
  777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
  777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
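The ORDER BY change above, repeated in every pg_dist_cleanup query in this diff, removes a source of flakiness: object_name is not unique (the two table_to_split_8981002 rows differ only in node_group_id), and SQL leaves the relative order of tied rows unspecified, so it could vary across runs. A minimal standalone sketch using a hypothetical scratch table, not part of the test suite:

CREATE TABLE cleanup_demo (object_name text, node_group_id int);
INSERT INTO cleanup_demo VALUES ('table_to_split_8981002', 2), ('table_to_split_8981002', 1);
-- tied on object_name: the two rows may come back in either order
SELECT * FROM cleanup_demo ORDER BY object_name;
-- node_group_id breaks the tie, making the expected output stable
SELECT * FROM cleanup_demo ORDER BY object_name, node_group_id;
DROP TABLE cleanup_demo;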
@@ -336,7 +336,7 @@ CONTEXT:  while executing command on localhost:xxxxx
 (1 row)

 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
  operation_id | object_type | object_name | node_group_id | policy_type
 ---------------------------------------------------------------------
 (0 rows)
@@ -388,7 +388,7 @@ CONTEXT:  while executing command on localhost:xxxxx
 ERROR:  connection not open
 CONTEXT:  while executing command on localhost:xxxxx
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
  operation_id | object_type | object_name | node_group_id | policy_type
 ---------------------------------------------------------------------
  777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
@@ -455,7 +455,7 @@ CONTEXT:  while executing command on localhost:xxxxx
 (1 row)

 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
  operation_id | object_type | object_name | node_group_id | policy_type
 ---------------------------------------------------------------------
 (0 rows)
@@ -507,7 +507,7 @@ CONTEXT:  while executing command on localhost:xxxxx
 ERROR:  connection not open
 CONTEXT:  while executing command on localhost:xxxxx
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
  operation_id | object_type | object_name | node_group_id | policy_type
 ---------------------------------------------------------------------
  777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
@@ -574,7 +574,7 @@ CONTEXT:  while executing command on localhost:xxxxx
 (1 row)

 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
  operation_id | object_type | object_name | node_group_id | policy_type
 ---------------------------------------------------------------------
 (0 rows)
@@ -634,7 +634,7 @@ WARNING:  connection to the remote node localhost:xxxxx failed with the followin
 ERROR:  connection not open
 CONTEXT:  while executing command on localhost:xxxxx
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
  operation_id | object_type | object_name | node_group_id | policy_type
 ---------------------------------------------------------------------
  777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
@@ -701,7 +701,7 @@ CONTEXT:  while executing command on localhost:xxxxx
 (1 row)

 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
  operation_id | object_type | object_name | node_group_id | policy_type
 ---------------------------------------------------------------------
 (0 rows)
@@ -0,0 +1,68 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-begin: BEGIN;
+step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
+step s2-begin: BEGIN;
+step s2-update-node-1:
+    -- update a specific node by address
+    SELECT master_update_node(nodeid, 'localhost', nodeport + 10)
+    FROM pg_dist_node
+    WHERE nodename = 'localhost'
+      AND nodeport = 57637;
+ <waiting ...>
+step s1-abort: ABORT;
+step s2-update-node-1: <... completed>
+master_update_node
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-abort: ABORT;
+master_remove_node
+---------------------------------------------------------------------
+
+
+(2 rows)
+
+
+starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+step s1-begin: BEGIN;
+step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
+step s2-begin: BEGIN;
+step s2-update-node-1-force:
+    -- update a specific node by address (force)
+    SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100)
+    FROM pg_dist_node
+    WHERE nodename = 'localhost'
+      AND nodeport = 57637;
+ <waiting ...>
+step s2-update-node-1-force: <... completed>
+master_update_node
+---------------------------------------------------------------------
+
+(1 row)
+
+step s2-abort: ABORT;
+step s1-abort: ABORT;
+FATAL:  terminating connection due to administrator command
+FATAL:  terminating connection due to administrator command
+SSL connection has been closed unexpectedly
+server closed the connection unexpectedly
+
+master_remove_node
+---------------------------------------------------------------------
+
+
+(2 rows)
+
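The new 68-line expected file above covers two permutations of an isolation test: in the first, master_update_node blocks on the lock held by s1's open INSERT transaction and only completes once s1 aborts; in the second, force => true with lock_cooldown => 100 lets master_update_node take the lock by terminating the conflicting backend after the 100 ms cooldown, which is why the FATAL "terminating connection due to administrator command" lines are part of the expected output.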
@@ -90,7 +90,7 @@ SELECT citus_disable_node('localhost', :worker_2_port);

 (1 row)

-SELECT public.wait_until_metadata_sync(60000);
+SELECT public.wait_until_metadata_sync(20000);
  wait_until_metadata_sync
 ---------------------------------------------------------------------

@@ -812,7 +812,7 @@ SELECT citus_disable_node('localhost', 9999);

 (1 row)

-SELECT public.wait_until_metadata_sync(60000);
+SELECT public.wait_until_metadata_sync(20000);
  wait_until_metadata_sync
 ---------------------------------------------------------------------

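Both hunks above, and the matching .sql hunks below, drop the wait_until_metadata_sync timeout from 60000 ms to 20000 ms; presumably 20 seconds is now ample because the rewritten function returns immediately once metadata is synced and only warns when a sync is genuinely outstanding.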
@@ -136,7 +136,7 @@ SELECT create_distributed_table('table_to_split', 'id');
         ARRAY[:worker_1_node, :worker_2_node],
         'force_logical');
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');

@@ -155,7 +155,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 \c - postgres - :master_port
 SELECT public.wait_for_resource_cleanup();
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;

 \c - - - :worker_2_proxy_port
 SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@@ -182,7 +182,7 @@ SELECT create_distributed_table('table_to_split', 'id');
         ARRAY[:worker_1_node, :worker_2_node],
         'force_logical');
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');

@@ -201,7 +201,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 \c - postgres - :master_port
 SELECT public.wait_for_resource_cleanup();
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;

 \c - - - :worker_2_proxy_port
 SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@@ -228,7 +228,7 @@ SELECT create_distributed_table('table_to_split', 'id');
         ARRAY[:worker_1_node, :worker_2_node],
         'force_logical');
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');

@@ -247,7 +247,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 \c - postgres - :master_port
 SELECT public.wait_for_resource_cleanup();
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;

 \c - - - :worker_2_proxy_port
 SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@@ -275,7 +275,7 @@ SELECT create_distributed_table('table_to_split', 'id');
         'force_logical');

 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');
@@ -295,7 +295,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 \c - postgres - :master_port
 SELECT public.wait_for_resource_cleanup();
 SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;

 \c - - - :worker_2_proxy_port
 SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@@ -39,7 +39,7 @@ SELECT master_get_active_worker_nodes();
 SELECT 1 FROM master_add_node('localhost', :worker_2_port);

 SELECT citus_disable_node('localhost', :worker_2_port);
-SELECT public.wait_until_metadata_sync(60000);
+SELECT public.wait_until_metadata_sync(20000);
 SELECT master_get_active_worker_nodes();

 -- add some shard placements to the cluster
@@ -328,7 +328,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_g
 SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary');
 SELECT master_activate_node('localhost', 9999);
 SELECT citus_disable_node('localhost', 9999);
-SELECT public.wait_until_metadata_sync(60000);
+SELECT public.wait_until_metadata_sync(20000);
 SELECT master_remove_node('localhost', 9999);

 -- check that you can't manually add two primaries to a group