From 8ee02b29d0b02c885fb0716f932c1c1f4f7e9640 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Thu, 6 Jan 2022 12:31:00 +0300 Subject: [PATCH] Introduce global PID --- .../connection/connection_configuration.c | 7 +- .../connection/connection_management.c | 3 +- .../distributed/metadata/metadata_cache.c | 61 ++++++ src/backend/distributed/shared_library_init.c | 1 + .../distributed/sql/citus--10.2-4--11.0-1.sql | 19 ++ .../sql/downgrades/citus--11.0-1--10.2-4.sql | 125 ++++++++++++ .../udfs/citus_dist_stat_activity/11.0-1.sql | 19 ++ .../udfs/citus_dist_stat_activity/latest.sql | 19 ++ .../sql/udfs/citus_lock_waits/11.0-1.sql | 44 ++++ .../sql/udfs/citus_lock_waits/latest.sql | 44 ++++ .../citus_worker_stat_activity/11.0-1.sql | 19 ++ .../citus_worker_stat_activity/latest.sql | 19 ++ .../get_all_active_transactions/11.0-1.sql | 12 ++ .../get_all_active_transactions/latest.sql | 12 ++ .../get_global_active_transactions/11.0-1.sql | 9 + .../get_global_active_transactions/latest.sql | 9 + .../distributed/transaction/backend_data.c | 134 ++++++++++++- .../transaction/citus_dist_stat_activity.c | 19 +- src/include/distributed/backend_data.h | 6 + .../distributed/connection_management.h | 2 +- src/include/distributed/metadata_cache.h | 1 + .../failure_connection_establishment.out | 4 +- .../isolation_distributed_transaction_id.out | 3 +- .../isolation_get_all_active_transactions.out | 16 +- .../regress/expected/isolation_global_pid.out | 145 ++++++++++++++ ...licate_reference_tables_to_coordinator.out | 4 +- .../expected/metadata_sync_helpers.out | 188 +++++++++++------- src/test/regress/isolation_schedule | 1 + .../isolation_distributed_transaction_id.spec | 3 +- ...isolation_get_all_active_transactions.spec | 16 +- .../regress/spec/isolation_global_pid.spec | 96 +++++++++ ...icate_reference_tables_to_coordinator.spec | 4 +- .../sql/failure_connection_establishment.sql | 2 +- .../regress/sql/metadata_sync_helpers.sql | 168 +++++++++------- 34 files changed, 1057 insertions(+), 177 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_dist_stat_activity/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_dist_stat_activity/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_worker_stat_activity/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_worker_stat_activity/latest.sql create mode 100644 src/backend/distributed/sql/udfs/get_all_active_transactions/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/get_all_active_transactions/latest.sql create mode 100644 src/backend/distributed/sql/udfs/get_global_active_transactions/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/get_global_active_transactions/latest.sql create mode 100644 src/test/regress/expected/isolation_global_pid.out create mode 100644 src/test/regress/spec/isolation_global_pid.spec diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index 32dc21e40..41017fab9 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -10,6 +10,7 @@ #include "postgres.h" +#include "distributed/backend_data.h" #include "distributed/citus_safe_lib.h" #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" @@ -232,6 +233,10 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, */ char nodePortString[12] = ""; + StringInfo applicationName = makeStringInfo(); + appendStringInfo(applicationName, "%s%ld", CITUS_APPLICATION_NAME_PREFIX, + GetGlobalPID()); + /* * This function has three sections: * - Initialize the keywords and values (to be copied later) of global parameters @@ -260,7 +265,7 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, key->database, key->user, GetDatabaseEncodingName(), - CITUS_APPLICATION_NAME + applicationName->data }; /* diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 89a863109..13b52790a 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -19,6 +19,7 @@ #include "access/hash.h" #include "commands/dbcommands.h" +#include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/errormessage.h" #include "distributed/error_codes.h" @@ -1459,7 +1460,7 @@ IsRebalancerInternalBackend(void) bool IsCitusInternalBackend(void) { - return application_name && strcmp(application_name, CITUS_APPLICATION_NAME) == 0; + return ExtractGlobalPID(application_name) != INVALID_CITUS_INTERNAL_BACKEND_GPID; } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index ac6db8c61..161a56942 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -201,6 +201,9 @@ static bool workerNodeHashValid = false; /* default value is -1, for coordinator it's 0 and for worker nodes > 0 */ static int32 LocalGroupId = -1; +/* default value is -1, increases with every node starting from 1 */ +static int32 LocalNodeId = -1; + /* built first time through in InitializeDistCache */ static ScanKeyData DistPartitionScanKey[1]; static ScanKeyData DistShardScanKey[1]; @@ -3618,6 +3621,62 @@ GetLocalGroupId(void) } +/* + * GetNodeId returns the node identifier of the local node. + */ +int32 +GetLocalNodeId(void) +{ + InitializeCaches(); + + /* + * Already set the node id, no need to read the heap again. + */ + if (LocalNodeId != -1) + { + return LocalNodeId; + } + + uint32 nodeId = -1; + + int32 localGroupId = GetLocalGroupId(); + + bool includeNodesFromOtherClusters = false; + List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters); + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + if (workerNode->groupId == localGroupId && + workerNode->isActive) + { + nodeId = workerNode->nodeId; + break; + } + } + + /* + * nodeId is -1 if we cannot find an active node whose group id is + * localGroupId in pg_dist_node. + */ + if (nodeId == -1) + { + elog(DEBUG4, "there is no active node with group id '%d' on pg_dist_node", + localGroupId); + + /* + * This is expected if the coordinator is not added to the metadata. + * We'll return 0 for this case and for all cases so views can function almost normally + */ + nodeId = 0; + } + + LocalNodeId = nodeId; + + return nodeId; +} + + /* * RegisterLocalGroupIdCacheCallbacks registers the callbacks required to * maintain LocalGroupId at a consistent value. It's separate from @@ -4019,6 +4078,7 @@ InvalidateMetadataSystemCache(void) memset(&MetadataCache, 0, sizeof(MetadataCache)); workerNodeHashValid = false; LocalGroupId = -1; + LocalNodeId = -1; } @@ -4110,6 +4170,7 @@ InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId) if (relationId == InvalidOid || relationId == MetadataCache.distNodeRelationId) { workerNodeHashValid = false; + LocalNodeId = -1; } } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e109bceed..70d3f549c 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -458,6 +458,7 @@ StartupCitusBackend(void) InitializeMaintenanceDaemonBackend(); InitializeBackendData(); RegisterConnectionCleanup(); + AssignGlobalPID(); } diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index 9c23b31c2..c3ffbb1cb 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -14,6 +14,25 @@ #include "udfs/worker_drop_sequence_dependency/11.0-1.sql" #include "udfs/worker_drop_shell_table/11.0-1.sql" +#include "udfs/get_all_active_transactions/11.0-1.sql" +#include "udfs/get_global_active_transactions/11.0-1.sql" + +#include "udfs/citus_worker_stat_activity/11.0-1.sql" + +CREATE VIEW citus.citus_worker_stat_activity AS +SELECT * FROM pg_catalog.citus_worker_stat_activity(); +ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; + +#include "udfs/citus_dist_stat_activity/11.0-1.sql" + +CREATE VIEW citus.citus_dist_stat_activity AS +SELECT * FROM pg_catalog.citus_dist_stat_activity(); +ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; + +-- we have to recreate this view because recreated citus_dist_stat_activity that this view depends +#include "udfs/citus_lock_waits/11.0-1.sql" DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text); DROP FUNCTION pg_catalog.master_get_table_metadata(text); diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 6fbe07ea3..e94ed0bbf 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -84,3 +84,128 @@ DROP FUNCTION pg_catalog.citus_shards_on_worker(); DROP FUNCTION pg_catalog.citus_shard_indexes_on_worker(); #include "../udfs/create_distributed_function/9.0-1.sql" ALTER TABLE citus.pg_dist_object DROP COLUMN force_delegation; + + +SET search_path = 'pg_catalog'; + + +DROP FUNCTION IF EXISTS get_all_active_transactions(); + + +CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$get_all_active_transactions$$; + +COMMENT ON FUNCTION get_all_active_transactions(OUT datid oid, OUT datname text, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz) +IS 'returns distributed transaction ids of active distributed transactions'; + +DROP FUNCTION IF EXISTS get_global_active_transactions(); + +CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz) + RETURNS SETOF RECORD + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$get_global_active_transactions$$; + COMMENT ON FUNCTION get_global_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz) + IS 'returns distributed transaction ids of active distributed transactions from each node of the cluster'; + +RESET search_path; + +DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_dist_stat_activity$$; + +COMMENT ON FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +IS 'returns distributed transaction activity on distributed tables'; + +CREATE VIEW citus.citus_dist_stat_activity AS +SELECT * FROM pg_catalog.citus_dist_stat_activity(); +ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC; + +SET search_path = 'pg_catalog'; + +-- we have to recreate this view because we drop citus_dist_stat_activity that this view depends +CREATE VIEW citus.citus_lock_waits AS + +WITH +citus_dist_stat_activity AS +( + SELECT * FROM citus_dist_stat_activity +), +unique_global_wait_edges AS +( + SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() +), +citus_dist_stat_activity_with_node_id AS +( + SELECT + citus_dist_stat_activity.*, (CASE citus_dist_stat_activity.distributed_query_host_name WHEN 'coordinator_host' THEN 0 ELSE pg_dist_node.nodeid END) as initiator_node_id + FROM + citus_dist_stat_activity LEFT JOIN pg_dist_node + ON + citus_dist_stat_activity.distributed_query_host_name = pg_dist_node.nodename AND + citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport +) +SELECT + waiting.pid AS waiting_pid, + blocking.pid AS blocking_pid, + waiting.query AS blocked_statement, + blocking.query AS current_statement_in_blocking_process, + waiting.initiator_node_id AS waiting_node_id, + blocking.initiator_node_id AS blocking_node_id, + waiting.distributed_query_host_name AS waiting_node_name, + blocking.distributed_query_host_name AS blocking_node_name, + waiting.distributed_query_host_port AS waiting_node_port, + blocking.distributed_query_host_port AS blocking_node_port +FROM + unique_global_wait_edges +JOIN + citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id) +JOIN + citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id); + +ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; + +DROP FUNCTION citus_worker_stat_activity CASCADE; + +CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_worker_stat_activity$$; + +COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text) +IS 'returns distributed transaction activity on shards of distributed tables'; + +CREATE VIEW citus.citus_worker_stat_activity AS +SELECT * FROM pg_catalog.citus_worker_stat_activity(); +ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; + +RESET search_path; diff --git a/src/backend/distributed/sql/udfs/citus_dist_stat_activity/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_dist_stat_activity/11.0-1.sql new file mode 100644 index 000000000..7b38f627d --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_dist_stat_activity/11.0-1.sql @@ -0,0 +1,19 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_dist_stat_activity CASCADE; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_dist_stat_activity$$; + +COMMENT ON FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) +IS 'returns distributed transaction activity on distributed tables'; diff --git a/src/backend/distributed/sql/udfs/citus_dist_stat_activity/latest.sql b/src/backend/distributed/sql/udfs/citus_dist_stat_activity/latest.sql new file mode 100644 index 000000000..7b38f627d --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_dist_stat_activity/latest.sql @@ -0,0 +1,19 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_dist_stat_activity CASCADE; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_dist_stat_activity$$; + +COMMENT ON FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) +IS 'returns distributed transaction activity on distributed tables'; diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql new file mode 100644 index 000000000..2ae40374a --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/11.0-1.sql @@ -0,0 +1,44 @@ +SET search_path = 'pg_catalog'; + +CREATE VIEW citus.citus_lock_waits AS +WITH +citus_dist_stat_activity AS +( + SELECT * FROM citus_dist_stat_activity +), +unique_global_wait_edges AS +( + SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() +), +citus_dist_stat_activity_with_node_id AS +( + SELECT + citus_dist_stat_activity.*, (CASE citus_dist_stat_activity.distributed_query_host_name WHEN 'coordinator_host' THEN 0 ELSE pg_dist_node.nodeid END) as initiator_node_id + FROM + citus_dist_stat_activity LEFT JOIN pg_dist_node + ON + citus_dist_stat_activity.distributed_query_host_name = pg_dist_node.nodename AND + citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport +) +SELECT + waiting.pid AS waiting_pid, + blocking.pid AS blocking_pid, + waiting.query AS blocked_statement, + blocking.query AS current_statement_in_blocking_process, + waiting.initiator_node_id AS waiting_node_id, + blocking.initiator_node_id AS blocking_node_id, + waiting.distributed_query_host_name AS waiting_node_name, + blocking.distributed_query_host_name AS blocking_node_name, + waiting.distributed_query_host_port AS waiting_node_port, + blocking.distributed_query_host_port AS blocking_node_port +FROM + unique_global_wait_edges +JOIN + citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id) +JOIN + citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id); + +ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; + +RESET search_path; diff --git a/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql new file mode 100644 index 000000000..2ae40374a --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_lock_waits/latest.sql @@ -0,0 +1,44 @@ +SET search_path = 'pg_catalog'; + +CREATE VIEW citus.citus_lock_waits AS +WITH +citus_dist_stat_activity AS +( + SELECT * FROM citus_dist_stat_activity +), +unique_global_wait_edges AS +( + SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges() +), +citus_dist_stat_activity_with_node_id AS +( + SELECT + citus_dist_stat_activity.*, (CASE citus_dist_stat_activity.distributed_query_host_name WHEN 'coordinator_host' THEN 0 ELSE pg_dist_node.nodeid END) as initiator_node_id + FROM + citus_dist_stat_activity LEFT JOIN pg_dist_node + ON + citus_dist_stat_activity.distributed_query_host_name = pg_dist_node.nodename AND + citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport +) +SELECT + waiting.pid AS waiting_pid, + blocking.pid AS blocking_pid, + waiting.query AS blocked_statement, + blocking.query AS current_statement_in_blocking_process, + waiting.initiator_node_id AS waiting_node_id, + blocking.initiator_node_id AS blocking_node_id, + waiting.distributed_query_host_name AS waiting_node_name, + blocking.distributed_query_host_name AS blocking_node_name, + waiting.distributed_query_host_port AS waiting_node_port, + blocking.distributed_query_host_port AS blocking_node_port +FROM + unique_global_wait_edges +JOIN + citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id) +JOIN + citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id); + +ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC; + +RESET search_path; diff --git a/src/backend/distributed/sql/udfs/citus_worker_stat_activity/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_worker_stat_activity/11.0-1.sql new file mode 100644 index 000000000..6f585b2e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_worker_stat_activity/11.0-1.sql @@ -0,0 +1,19 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_worker_stat_activity CASCADE; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_worker_stat_activity$$; + +COMMENT ON FUNCTION pg_catalog.citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) +IS 'returns distributed transaction activity on shards of distributed tables'; diff --git a/src/backend/distributed/sql/udfs/citus_worker_stat_activity/latest.sql b/src/backend/distributed/sql/udfs/citus_worker_stat_activity/latest.sql new file mode 100644 index 000000000..6f585b2e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_worker_stat_activity/latest.sql @@ -0,0 +1,19 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_worker_stat_activity CASCADE; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$citus_worker_stat_activity$$; + +COMMENT ON FUNCTION pg_catalog.citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name, + OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET, + OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz, + OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text, + OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8) +IS 'returns distributed transaction activity on shards of distributed tables'; diff --git a/src/backend/distributed/sql/udfs/get_all_active_transactions/11.0-1.sql b/src/backend/distributed/sql/udfs/get_all_active_transactions/11.0-1.sql new file mode 100644 index 000000000..636abb0dd --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_all_active_transactions/11.0-1.sql @@ -0,0 +1,12 @@ +DROP FUNCTION IF EXISTS pg_catalog.get_all_active_transactions(); +CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$get_all_active_transactions$$; + +COMMENT ON FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT datname text, OUT process_id int, OUT initiator_node_identifier int4, + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8) +IS 'returns transaction information for all Citus initiated transactions'; diff --git a/src/backend/distributed/sql/udfs/get_all_active_transactions/latest.sql b/src/backend/distributed/sql/udfs/get_all_active_transactions/latest.sql new file mode 100644 index 000000000..636abb0dd --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_all_active_transactions/latest.sql @@ -0,0 +1,12 @@ +DROP FUNCTION IF EXISTS pg_catalog.get_all_active_transactions(); +CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8) +RETURNS SETOF RECORD +LANGUAGE C STRICT AS 'MODULE_PATHNAME', +$$get_all_active_transactions$$; + +COMMENT ON FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT datname text, OUT process_id int, OUT initiator_node_identifier int4, + OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, + OUT global_pid int8) +IS 'returns transaction information for all Citus initiated transactions'; diff --git a/src/backend/distributed/sql/udfs/get_global_active_transactions/11.0-1.sql b/src/backend/distributed/sql/udfs/get_global_active_transactions/11.0-1.sql new file mode 100644 index 000000000..c0831b521 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_global_active_transactions/11.0-1.sql @@ -0,0 +1,9 @@ +DROP FUNCTION IF EXISTS pg_catalog.get_global_active_transactions(); +CREATE OR REPLACE FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8) + RETURNS SETOF RECORD + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$get_global_active_transactions$$; +COMMENT ON FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8) + IS 'returns transaction information for all Citus initiated transactions from each node of the cluster'; diff --git a/src/backend/distributed/sql/udfs/get_global_active_transactions/latest.sql b/src/backend/distributed/sql/udfs/get_global_active_transactions/latest.sql new file mode 100644 index 000000000..c0831b521 --- /dev/null +++ b/src/backend/distributed/sql/udfs/get_global_active_transactions/latest.sql @@ -0,0 +1,9 @@ +DROP FUNCTION IF EXISTS pg_catalog.get_global_active_transactions(); +CREATE OR REPLACE FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8) + RETURNS SETOF RECORD + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$get_global_active_transactions$$; +COMMENT ON FUNCTION pg_catalog.get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, + OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8) + IS 'returns transaction information for all Citus initiated transactions from each node of the cluster'; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index c76a80460..4e037e30c 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -15,6 +15,9 @@ #include "distributed/pg_version_constants.h" #include "miscadmin.h" +#include "unistd.h" + +#include "safe_lib.h" #include "funcapi.h" #include "access/htup_details.h" @@ -43,7 +46,7 @@ #define GET_ACTIVE_TRANSACTION_QUERY "SELECT * FROM get_all_active_transactions();" -#define ACTIVE_TRANSACTION_COLUMN_COUNT 6 +#define ACTIVE_TRANSACTION_COLUMN_COUNT 7 /* * Each backend's data reside in the shared memory @@ -78,6 +81,7 @@ typedef struct BackendManagementShmemData static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); +static uint64 GenerateGlobalPID(void); static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static BackendManagementShmemData *backendManagementShmemData = NULL; @@ -86,6 +90,7 @@ static BackendData *MyBackendData = NULL; static void BackendManagementShmemInit(void); static size_t BackendManagementShmemSize(void); +static void UnSetGlobalPID(void); PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); @@ -315,6 +320,7 @@ get_global_active_transactions(PG_FUNCTION_ARGS) values[3] = ParseBoolField(result, rowIndex, 3); values[4] = ParseIntField(result, rowIndex, 4); values[5] = ParseTimestampTzField(result, rowIndex, 5); + values[6] = ParseIntField(result, rowIndex, 6); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } @@ -384,8 +390,7 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto SpinLockAcquire(¤tBackend->mutex); - /* we're only interested in backends initiated by Citus */ - if (currentBackend->citusBackend.initiatorNodeIdentifier < 0) + if (currentBackend->globalPID == INVALID_CITUS_INTERNAL_BACKEND_GPID) { SpinLockRelease(¤tBackend->mutex); continue; @@ -427,6 +432,7 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto values[3] = !coordinatorOriginatedQuery; values[4] = UInt64GetDatum(transactionNumber); values[5] = TimestampTzGetDatum(transactionIdTimestamp); + values[6] = UInt64GetDatum(currentBackend->globalPID); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); @@ -631,6 +637,7 @@ InitializeBackendData(void) /* zero out the backend data */ UnSetDistributedTransactionId(); + UnSetGlobalPID(); UnlockBackendSharedMemory(); } @@ -664,6 +671,24 @@ UnSetDistributedTransactionId(void) } +/* + * UnSetGlobalPID resets the global pid for the current backend. + */ +static void +UnSetGlobalPID(void) +{ + /* backend does not exist if the extension is not created */ + if (MyBackendData) + { + SpinLockAcquire(&MyBackendData->mutex); + + MyBackendData->globalPID = 0; + + SpinLockRelease(&MyBackendData->mutex); + } +} + + /* * LockBackendSharedMemory is a simple wrapper around LWLockAcquire on the * shared memory lock. @@ -780,6 +805,109 @@ MarkCitusInitiatedCoordinatorBackend(void) } +/* + * AssignGlobalPID assigns a global process id for the current backend. + * If this is a Citus initiated backend, which means it is distributed part of a distributed + * query, then this function assigns the global pid extracted from the application name. + * If not, this function assigns a new generated global pid. + */ +void +AssignGlobalPID(void) +{ + uint64 globalPID = INVALID_CITUS_INTERNAL_BACKEND_GPID; + + if (!IsCitusInternalBackend()) + { + globalPID = GenerateGlobalPID(); + } + else + { + globalPID = ExtractGlobalPID(application_name); + } + + SpinLockAcquire(&MyBackendData->mutex); + MyBackendData->globalPID = globalPID; + SpinLockRelease(&MyBackendData->mutex); +} + + +/* + * GetGlobalPID returns the global process id of the current backend. + */ +uint64 +GetGlobalPID(void) +{ + uint64 globalPID = INVALID_CITUS_INTERNAL_BACKEND_GPID; + + if (MyBackendData) + { + SpinLockAcquire(&MyBackendData->mutex); + globalPID = MyBackendData->globalPID; + SpinLockRelease(&MyBackendData->mutex); + } + + return globalPID; +} + + +/* + * GenerateGlobalPID generates the global process id for the current backend. + */ +static uint64 +GenerateGlobalPID(void) +{ + /* + * We try to create a human readable global pid that consists of node id and process id. + * By multiplying node id with 10^10 and adding pid we generate a number where the smallest + * 10 digit represent the pid and the remaining digits are the node id. + * + * Both node id and pid are 32 bit. We use 10^10 to fit all possible pids. Some very large + * node ids might cause overflow. But even for the applications that scale around 50 nodes every + * day it'd take about 100K years. So we are not worried. + */ + return (((uint64) GetLocalNodeId()) * 10000000000) + getpid(); +} + + +/* + * ExtractGlobalPID extracts the global process id from the application name and returns it + * if the application name is not compatible with Citus' application names returns 0. + */ +uint64 +ExtractGlobalPID(char *applicationName) +{ + /* does application name exist */ + if (!applicationName) + { + return INVALID_CITUS_INTERNAL_BACKEND_GPID; + } + + /* we create our own copy of application name incase the original changes */ + char *applicationNameCopy = pstrdup(applicationName); + + uint64 prefixLength = strlen(CITUS_APPLICATION_NAME_PREFIX); + + /* does application name start with Citus's application name prefix */ + if (strncmp(applicationNameCopy, CITUS_APPLICATION_NAME_PREFIX, prefixLength) != 0) + { + return INVALID_CITUS_INTERNAL_BACKEND_GPID; + } + + /* are the remaining characters of the application name numbers */ + uint64 numberOfRemainingChars = strlen(applicationNameCopy) - prefixLength; + if (numberOfRemainingChars <= 0 || + !strisdigit_s(applicationNameCopy + prefixLength, numberOfRemainingChars)) + { + return INVALID_CITUS_INTERNAL_BACKEND_GPID; + } + + char *globalPIDString = &applicationNameCopy[prefixLength]; + uint64 globalPID = strtoul(globalPIDString, NULL, 10); + + return globalPID; +} + + /* * CurrentDistributedTransactionNumber returns the transaction number of the * current distributed transaction. The caller must make sure a distributed diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index d85959925..04014693e 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -108,7 +108,7 @@ * showing the initiator_node_id we expand it to initiator_node_host and * initiator_node_port. */ -#define CITUS_DIST_STAT_ACTIVITY_QUERY_COLS 23 +#define CITUS_DIST_STAT_ACTIVITY_QUERY_COLS 24 #define CITUS_DIST_STAT_ADDITIONAL_COLS 3 #define CITUS_DIST_STAT_ACTIVITY_COLS \ CITUS_DIST_STAT_ACTIVITY_QUERY_COLS + CITUS_DIST_STAT_ADDITIONAL_COLS @@ -147,11 +147,12 @@ SELECT \ pg_stat_activity.backend_xid, \ pg_stat_activity.backend_xmin, \ pg_stat_activity.query, \ - pg_stat_activity.backend_type \ + pg_stat_activity.backend_type, \ + dist_txs.global_pid \ FROM \ pg_stat_activity \ INNER JOIN \ - get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp) \ + get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_pid) \ ON pg_stat_activity.pid = dist_txs.process_id \ WHERE \ dist_txs.worker_query = false;" @@ -181,14 +182,15 @@ SELECT \ pg_stat_activity.backend_xid, \ pg_stat_activity.backend_xmin, \ pg_stat_activity.query, \ - pg_stat_activity.backend_type \ + pg_stat_activity.backend_type, \ + dist_txs.global_id \ FROM \ pg_stat_activity \ LEFT JOIN \ - get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp) \ + get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_id) \ ON pg_stat_activity.pid = dist_txs.process_id \ WHERE \ - pg_stat_activity.application_name = 'citus_internal' \ + pg_stat_activity.application_name SIMILAR TO 'citus_internal gpid=\\d+' \ AND \ pg_stat_activity.query NOT ILIKE '%stat_activity%';" @@ -223,6 +225,7 @@ typedef struct CitusDistStat TransactionId backend_xmin; text *query; text *backend_type; + uint64 global_pid; } CitusDistStat; @@ -501,6 +504,7 @@ ParseCitusDistStat(PGresult *result, int64 rowIndex) citusDistStat->backend_xmin = ParseXIDField(result, rowIndex, 20); citusDistStat->query = ParseTextField(result, rowIndex, 21); citusDistStat->backend_type = ParseTextField(result, rowIndex, 22); + citusDistStat->global_pid = ParseIntField(result, rowIndex, 23); return citusDistStat; } @@ -688,6 +692,7 @@ HeapTupleToCitusDistStat(HeapTuple result, TupleDesc rowDescriptor) citusDistStat->backend_xmin = ParseXIDFieldFromHeapTuple(result, rowDescriptor, 21); citusDistStat->query = ParseTextFieldFromHeapTuple(result, rowDescriptor, 22); citusDistStat->backend_type = ParseTextFieldFromHeapTuple(result, rowDescriptor, 23); + citusDistStat->global_pid = ParseIntFieldFromHeapTuple(result, rowDescriptor, 24); return citusDistStat; } @@ -1098,6 +1103,8 @@ ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo) nulls[25] = true; } + values[26] = Int32GetDatum(citusDistStat->global_pid); + tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); } } diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index c68bfcf16..bf9a57fd1 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -50,6 +50,7 @@ typedef struct BackendData Oid userId; slock_t mutex; bool cancelledDueToDeadlock; + uint64 globalPID; CitusInitiatedBackend citusBackend; DistributedTransactionId transactionId; } BackendData; @@ -63,6 +64,9 @@ extern void UnlockBackendSharedMemory(void); extern void UnSetDistributedTransactionId(void); extern void AssignDistributedTransactionId(void); extern void MarkCitusInitiatedCoordinatorBackend(void); +extern void AssignGlobalPID(void); +extern uint64 GetGlobalPID(void); +extern uint64 ExtractGlobalPID(char *applicationName); extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); extern void CancelTransactionDueToDeadlock(PGPROC *proc); extern bool MyBackendGotCancelledDueToDeadlock(bool clearState); @@ -73,4 +77,6 @@ extern int GetAllActiveClientBackendCount(void); extern void IncrementClientBackendCounter(void); extern void DecrementClientBackendCounter(void); +#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0 + #endif /* BACKEND_DATA_H */ diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 721617474..ad575cfe5 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -29,7 +29,7 @@ #define ERROR_BUFFER_SIZE 256 /* application name used for internal connections in Citus */ -#define CITUS_APPLICATION_NAME "citus_internal" +#define CITUS_APPLICATION_NAME_PREFIX "citus_internal gpid=" /* application name used for internal connections in rebalancer */ #define CITUS_REBALANCER_NAME "citus_rebalancer" diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 6dac101af..46ba72a49 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -165,6 +165,7 @@ extern CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId); extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid); extern int32 GetLocalGroupId(void); +extern int32 GetLocalNodeId(void); extern void CitusTableCacheFlushInvalidatedEntries(void); extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok); extern List * ShardPlacementListIncludingOrphanedPlacements(uint64 shardId); diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out index 6284107d2..9c44269a3 100644 --- a/src/test/regress/expected/failure_connection_establishment.out +++ b/src/test/regress/expected/failure_connection_establishment.out @@ -225,8 +225,8 @@ SELECT count(*) FROM single_replicatated WHERE key = 100; RESET client_min_messages; -- verify get_global_active_transactions works when a timeout happens on a connection -SELECT get_global_active_transactions(); - get_global_active_transactions +SELECT * FROM get_global_active_transactions() WHERE transaction_number != 0; + datid | process_id | initiator_node_identifier | worker_query | transaction_number | transaction_stamp | global_pid --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/isolation_distributed_transaction_id.out b/src/test/regress/expected/isolation_distributed_transaction_id.out index 1e41f8957..3d44f0069 100644 --- a/src/test/regress/expected/isolation_distributed_transaction_id.out +++ b/src/test/regress/expected/isolation_distributed_transaction_id.out @@ -94,7 +94,8 @@ step s1-verify-current-xact-is-on-worker: get_current_transaction_id() as xact, run_command_on_workers($$ SELECT row(initiator_node_identifier, transaction_number) - FROM get_all_active_transactions(); + FROM get_all_active_transactions() + WHERE transaction_number != 0; $$) as remote ORDER BY remote.nodeport ASC; diff --git a/src/test/regress/expected/isolation_get_all_active_transactions.out b/src/test/regress/expected/isolation_get_all_active_transactions.out index 9d94470ec..d5c4765b8 100644 --- a/src/test/regress/expected/isolation_get_all_active_transactions.out +++ b/src/test/regress/expected/isolation_get_all_active_transactions.out @@ -35,8 +35,8 @@ step s2-begin-insert: step s3-as-admin: -- Admin should be able to see all transactions - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; count --------------------------------------------------------------------- @@ -51,8 +51,8 @@ count step s3-as-user-1: -- User should only be able to see its own transactions SET ROLE test_user_1; - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; count --------------------------------------------------------------------- @@ -67,8 +67,8 @@ count step s3-as-readonly: -- Other user should not see transactions SET ROLE test_readonly; - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; count --------------------------------------------------------------------- @@ -83,8 +83,8 @@ count step s3-as-monitor: -- Monitor should see all transactions SET ROLE test_monitor; - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_global_pid.out b/src/test/regress/expected/isolation_global_pid.out new file mode 100644 index 000000000..19e055079 --- /dev/null +++ b/src/test/regress/expected/isolation_global_pid.out @@ -0,0 +1,145 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-start-session-level-connection s1-worker-begin s1-worker-select s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s1-worker-commit s1-stop-session-level-connection +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-worker-begin: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-worker-select: + SELECT run_commands_on_session_level_connection_to_node('SET citus.enable_local_execution TO off; SET citus.force_max_query_parallelization TO ON; SELECT * FROM dist_table'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s2-coordinator-citus_dist_stat_activity: + SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'; + +?column? +--------------------------------------------------------------------- +t +(1 row) + +step s2-coordinator-citus_worker_stat_activity: + SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN ( + SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + ) + ORDER BY 1; + +query +--------------------------------------------------------------------- +SELECT a, b FROM public.dist_table_12345000 dist_table WHERE true +SELECT a, b FROM public.dist_table_12345001 dist_table WHERE true +SELECT a, b FROM public.dist_table_12345002 dist_table WHERE true +SELECT a, b FROM public.dist_table_12345003 dist_table WHERE true +(4 rows) + +step s1-worker-commit: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +step s1-stop-session-level-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node +--------------------------------------------------------------------- + +(1 row) + +restore_isolation_tester_func +--------------------------------------------------------------------- + +(1 row) + + +starting permutation: s1-coordinator-begin s1-coordinator-select s2-coordinator-citus_dist_stat_activity s2-coordinator-citus_worker_stat_activity s2-coordinator-get_all_active_transactions s2-coordinator-get_global_active_transactions s1-coordinator-commit +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-coordinator-begin: + BEGIN; + +step s1-coordinator-select: + SET citus.enable_local_execution TO off; + SET citus.force_max_query_parallelization TO ON; + SELECT * FROM dist_table; + +a|b +--------------------------------------------------------------------- +(0 rows) + +step s2-coordinator-citus_dist_stat_activity: + SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'; + +?column? +--------------------------------------------------------------------- +t +(1 row) + +step s2-coordinator-citus_worker_stat_activity: + SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN ( + SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + ) + ORDER BY 1; + +query +--------------------------------------------------------------------- +SELECT a, b FROM public.dist_table_12345000 dist_table WHERE true +SELECT a, b FROM public.dist_table_12345001 dist_table WHERE true +SELECT a, b FROM public.dist_table_12345002 dist_table WHERE true +SELECT a, b FROM public.dist_table_12345003 dist_table WHERE true +(4 rows) + +step s2-coordinator-get_all_active_transactions: + SELECT count(*) FROM get_all_active_transactions() WHERE global_pid IN ( + SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + ); + +count +--------------------------------------------------------------------- + 1 +(1 row) + +step s2-coordinator-get_global_active_transactions: + SELECT count(*) FROM get_global_active_transactions() WHERE global_pid IN ( + SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + ) + AND transaction_number != 0; + +count +--------------------------------------------------------------------- + 5 +(1 row) + +step s1-coordinator-commit: + COMMIT; + +restore_isolation_tester_func +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out index d28a6b714..04e427ce6 100644 --- a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out @@ -136,8 +136,8 @@ step s1-update-ref-table: step s2-active-transactions: -- Admin should be able to see all transactions - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/metadata_sync_helpers.out b/src/test/regress/expected/metadata_sync_helpers.out index cb3b113e2..26d300e84 100644 --- a/src/test/regress/expected/metadata_sync_helpers.out +++ b/src/test/regress/expected/metadata_sync_helpers.out @@ -36,7 +36,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); ERROR: This is an internal Citus function can only be used in a distributed transaction ROLLBACK; @@ -73,7 +73,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); ERROR: must be owner of table test ROLLBACK; @@ -85,7 +85,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_update_relation_colocation ('test'::regclass, 10); ERROR: must be owner of table test ROLLBACK; @@ -99,7 +99,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); citus_internal_add_partition_metadata --------------------------------------------------------------------- @@ -112,6 +112,54 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; 1 (1 row) +ROLLBACK; +-- application_name with incorrect gpid +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus_internal gpid=not a correct gpid'; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ERROR: This is an internal Citus function can only be used in a distributed transaction +ROLLBACK; +-- application_name with empty gpid +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus_internal gpid='; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ERROR: This is an internal Citus function can only be used in a distributed transaction +ROLLBACK; +-- empty application_name +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to ''; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ERROR: This is an internal Citus function can only be used in a distributed transaction +ROLLBACK; +-- application_name with incorrect prefix +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus gpid=10000000001'; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ERROR: This is an internal Citus function can only be used in a distributed transaction ROLLBACK; -- fails because there is no X distribution method BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; @@ -121,7 +169,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's'); ERROR: Metadata syncing is only allowed for hash, reference and local tables:X ROLLBACK; @@ -133,7 +181,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'non_existing_col', 0, 's'); ERROR: column "non_existing_col" of relation "test_2" does not exist ROLLBACK; @@ -145,7 +193,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata (NULL, 'h', 'non_existing_col', 0, 's'); ERROR: relation cannot be NULL ROLLBACK; @@ -157,7 +205,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', -1, 's'); ERROR: Metadata syncing is only allowed for valid colocation id values. ROLLBACK; @@ -169,7 +217,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 'X'); ERROR: Metadata syncing is only allowed for hash, reference and local tables:X ROLLBACK; @@ -181,7 +229,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); citus_internal_add_partition_metadata @@ -200,7 +248,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); citus_internal_add_partition_metadata @@ -219,7 +267,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', NULL, 0, 's'); ERROR: Distribution column cannot be NULL for relation "test_2" @@ -252,7 +300,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's'); citus_internal_add_partition_metadata --------------------------------------------------------------------- @@ -268,7 +316,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(1420007, 10000, 11111); ERROR: could not find valid entry for shard xxxxx @@ -298,7 +346,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's'); ERROR: role "non_existing_user" does not exist ROLLBACK; @@ -329,7 +377,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', 'col_1', 0, 's'); ERROR: Reference or local tables cannot have distribution columns ROLLBACK; @@ -341,7 +389,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'A'); ERROR: Metadata syncing is only allowed for known replication models. ROLLBACK; @@ -353,7 +401,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c'); ERROR: Local or references tables can only have 's' or 't' as the replication model. ROLLBACK; @@ -368,7 +416,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('super_user_table'::regclass, 'h', 'col_1', 0, 's'); citus_internal_add_partition_metadata --------------------------------------------------------------------- @@ -387,7 +435,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('super_user_table'::regclass, 1420000::bigint, 't'::"char", '-2147483648'::text, '-1610612737'::text)) @@ -402,7 +450,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '-2147483648'::text, '-1610612737'::text)) @@ -417,7 +465,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 250, 's'); citus_internal_add_partition_metadata --------------------------------------------------------------------- @@ -445,7 +493,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232); citus_internal_update_relation_colocation --------------------------------------------------------------------- @@ -461,7 +509,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, -1, 't'::"char", '-2147483648'::text, '-1610612737'::text)) @@ -476,7 +524,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000, 'X'::"char", '-2147483648'::text, '-1610612737'::text)) @@ -491,7 +539,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000, 't'::"char", NULL, '-1610612737'::text)) @@ -506,7 +554,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", 'non-int'::text, '-1610612737'::text)) @@ -521,7 +569,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '-1610612737'::text, '-2147483648'::text)) @@ -536,7 +584,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '10'::text, '20'::text), @@ -554,7 +602,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('non_existing_type', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false)) @@ -569,7 +617,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -100, 0, false)) @@ -583,7 +631,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -1, -1, false)) @@ -598,7 +646,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false)) @@ -614,7 +662,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], 0, NULL::int, false)) @@ -635,7 +683,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse CREATE TABLE publication_test_table(id int); CREATE PUBLICATION publication_test FOR TABLE publication_test_table; @@ -653,7 +701,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse CREATE FUNCTION distribution_test_function(int) RETURNS int AS $$ SELECT $1 $$ @@ -671,7 +719,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse CREATE TYPE distributed_test_type AS (a int, b int); SET ROLE metadata_sync_helper_role; @@ -690,7 +738,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse UPDATE pg_dist_partition SET partmethod = 'X'; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) @@ -709,7 +757,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '10'::text, '20'::text)) @@ -736,7 +784,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '11'::text, '20'::text), @@ -767,7 +815,7 @@ BEGIN; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); ERROR: cannot colocate tables test_2 and test_3 ROLLBACK; @@ -779,7 +827,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_3'::regclass, 1420009::bigint, 't'::"char", '21'::text, '30'::text), @@ -806,7 +854,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_ref'::regclass, 1420003::bigint, 't'::"char", '-1610612737'::text, NULL)) @@ -821,7 +869,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_ref'::regclass, 1420006::bigint, 't'::"char", NULL, NULL), @@ -837,7 +885,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_ref'::regclass, 1420006::bigint, 't'::"char", NULL, NULL)) @@ -858,7 +906,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('super_user_table'::regclass, 1420007::bigint, 't'::"char", '11'::text, '20'::text)) @@ -880,7 +928,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (-10, 1, 0::bigint, 1::int, 1500000::bigint)) @@ -895,7 +943,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420000, 1, 0::bigint, 1::int, -10)) @@ -910,7 +958,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1430100, 1, 0::bigint, 1::int, 10)) @@ -925,7 +973,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420000, 10, 0::bigint, 1::int, 1500000)) @@ -940,7 +988,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES ( 1420000, 1, 0::bigint, 123123123::int, 1500000)) @@ -968,7 +1016,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420000, 1, 0::bigint, get_node_id(), 1500000), @@ -984,7 +1032,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420007, 1, 0::bigint, get_node_id(), 1500000)) @@ -999,7 +1047,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420000, 1, 0::bigint, get_node_id(), 1500000), @@ -1040,7 +1088,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); citus_internal_update_relation_colocation --------------------------------------------------------------------- @@ -1057,7 +1105,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(1420000, get_node_id(), get_node_id()+1000); ERROR: Node with group id 1014 for shard placement xxxxx does not exist @@ -1070,7 +1118,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(1420000, get_node_id()+10000, get_node_id()); ERROR: Active placement for shard xxxxx is not found on group:14 @@ -1083,7 +1131,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(0, get_node_id(), get_node_id()+1); ERROR: Shard id does not exists: 0 @@ -1096,7 +1144,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(213123123123, get_node_id(), get_node_id()+1); ERROR: Shard id does not exists: 213123123123 @@ -1109,7 +1157,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1); ERROR: must be owner of table super_user_table @@ -1122,7 +1170,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(shardid) AS (VALUES (1420007)) @@ -1131,7 +1179,7 @@ ERROR: must be owner of table super_user_table ROLLBACK; -- the user only allowed to delete shards in a distributed transaction BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(shardid) AS (VALUES (1420007)) @@ -1146,7 +1194,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(shardid) AS (VALUES (1420100)) @@ -1173,7 +1221,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(shardid) AS (VALUES (1420000)) @@ -1207,7 +1255,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; -- with an ugly trick, update the repmodel -- so that making two tables colocated fails UPDATE pg_dist_partition SET repmodel = 't' @@ -1222,7 +1270,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; -- with an ugly trick, update the vartype of table from int to bigint -- so that making two tables colocated fails UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}' @@ -1237,7 +1285,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; -- with an ugly trick, update the partmethod of the table to not-valid -- so that making two tables colocated fails UPDATE pg_dist_partition SET partmethod = '' @@ -1252,7 +1300,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; -- with an ugly trick, update the partmethod of the table to not-valid -- so that making two tables colocated fails UPDATE pg_dist_partition SET partmethod = 'a' @@ -1270,7 +1318,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_5'::regclass, 'h', 'int_col', 500, 's'); citus_internal_add_partition_metadata @@ -1293,7 +1341,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_7'::regclass, 'h', 'text_col', 500, 's'); citus_internal_add_partition_metadata diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index f0700c734..5df2e9a6d 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -63,6 +63,7 @@ test: shared_connection_waits test: isolation_cancellation test: isolation_undistribute_table test: isolation_fix_partition_shard_index_names +test: isolation_global_pid # Rebalancer test: isolation_blocking_move_single_shard_commands diff --git a/src/test/regress/spec/isolation_distributed_transaction_id.spec b/src/test/regress/spec/isolation_distributed_transaction_id.spec index 372cd8f78..f928918ed 100644 --- a/src/test/regress/spec/isolation_distributed_transaction_id.spec +++ b/src/test/regress/spec/isolation_distributed_transaction_id.spec @@ -54,7 +54,8 @@ step "s1-verify-current-xact-is-on-worker" get_current_transaction_id() as xact, run_command_on_workers($$ SELECT row(initiator_node_identifier, transaction_number) - FROM get_all_active_transactions(); + FROM get_all_active_transactions() + WHERE transaction_number != 0; $$) as remote ORDER BY remote.nodeport ASC; } diff --git a/src/test/regress/spec/isolation_get_all_active_transactions.spec b/src/test/regress/spec/isolation_get_all_active_transactions.spec index da0c4553c..fd69c0ac4 100644 --- a/src/test/regress/spec/isolation_get_all_active_transactions.spec +++ b/src/test/regress/spec/isolation_get_all_active_transactions.spec @@ -71,32 +71,32 @@ session "s3" step "s3-as-admin" { -- Admin should be able to see all transactions - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; } step "s3-as-user-1" { -- User should only be able to see its own transactions SET ROLE test_user_1; - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; } step "s3-as-readonly" { -- Other user should not see transactions SET ROLE test_readonly; - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; } step "s3-as-monitor" { -- Monitor should see all transactions SET ROLE test_monitor; - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; } permutation "s1-grant" "s1-begin-insert" "s2-begin-insert" "s3-as-admin" "s3-as-user-1" "s3-as-readonly" "s3-as-monitor" "s1-commit" "s2-commit" diff --git a/src/test/regress/spec/isolation_global_pid.spec b/src/test/regress/spec/isolation_global_pid.spec new file mode 100644 index 000000000..4306d2640 --- /dev/null +++ b/src/test/regress/spec/isolation_global_pid.spec @@ -0,0 +1,96 @@ +#include "isolation_mx_common.include.spec" + +setup +{ + SET citus.next_shard_id TO 12345000; + CREATE TABLE dist_table (a INT, b INT); + SELECT create_distributed_table('dist_table', 'a', shard_count:=4); +} + +teardown +{ + DROP TABLE dist_table; + SELECT citus_internal.restore_isolation_tester_func(); +} + +session "s1" + +step "s1-coordinator-begin" +{ + BEGIN; +} + +step "s1-coordinator-select" +{ + SET citus.enable_local_execution TO off; + SET citus.force_max_query_parallelization TO ON; + SELECT * FROM dist_table; +} + +step "s1-coordinator-commit" +{ + COMMIT; +} + +step "s1-start-session-level-connection" +{ + + SELECT start_session_level_connection_to_node('localhost', 57637); +} + +step "s1-worker-begin" +{ + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); +} + +step "s1-worker-select" +{ + SELECT run_commands_on_session_level_connection_to_node('SET citus.enable_local_execution TO off; SET citus.force_max_query_parallelization TO ON; SELECT * FROM dist_table'); +} + +step "s1-worker-commit" +{ + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +} + +step "s1-stop-session-level-connection" +{ + SELECT stop_session_level_connection_to_node(); +} + +session "s2" + +step "s2-coordinator-citus_dist_stat_activity" +{ + SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'; +} + +step "s2-coordinator-citus_worker_stat_activity" +{ + SELECT query FROM citus_worker_stat_activity() WHERE global_pid IN ( + SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + ) + ORDER BY 1; +} + +step "s2-coordinator-get_all_active_transactions" +{ + SELECT count(*) FROM get_all_active_transactions() WHERE global_pid IN ( + SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + ); +} + +step "s2-coordinator-get_global_active_transactions" +{ + SELECT count(*) FROM get_global_active_transactions() WHERE global_pid IN ( + SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%' + ) + AND transaction_number != 0; +} + + +// worker - coordinator +permutation "s1-start-session-level-connection" "s1-worker-begin" "s1-worker-select" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s1-worker-commit" "s1-stop-session-level-connection" + +// coordinator - coordinator +permutation "s1-coordinator-begin" "s1-coordinator-select" "s2-coordinator-citus_dist_stat_activity" "s2-coordinator-citus_worker_stat_activity" "s2-coordinator-get_all_active_transactions" "s2-coordinator-get_global_active_transactions" "s1-coordinator-commit" diff --git a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec index 752f26399..12f11bef0 100644 --- a/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec +++ b/src/test/regress/spec/isolation_replicate_reference_tables_to_coordinator.spec @@ -106,8 +106,8 @@ step "s2-sleep" step "s2-active-transactions" { -- Admin should be able to see all transactions - SELECT count(*) FROM get_all_active_transactions(); - SELECT count(*) FROM get_global_active_transactions(); + SELECT count(*) FROM get_all_active_transactions() WHERE transaction_number != 0; + SELECT count(*) FROM get_global_active_transactions() WHERE transaction_number != 0; } // we disable the daemon during the regression tests in order to get consistent results diff --git a/src/test/regress/sql/failure_connection_establishment.sql b/src/test/regress/sql/failure_connection_establishment.sql index 43cb97b86..5029d40b7 100644 --- a/src/test/regress/sql/failure_connection_establishment.sql +++ b/src/test/regress/sql/failure_connection_establishment.sql @@ -124,7 +124,7 @@ SELECT count(*) FROM single_replicatated WHERE key = 100; RESET client_min_messages; -- verify get_global_active_transactions works when a timeout happens on a connection -SELECT get_global_active_transactions(); +SELECT * FROM get_global_active_transactions() WHERE transaction_number != 0; -- tests for connectivity checks SET client_min_messages TO ERROR; diff --git a/src/test/regress/sql/metadata_sync_helpers.sql b/src/test/regress/sql/metadata_sync_helpers.sql index 22e337443..8e7f13ce5 100644 --- a/src/test/regress/sql/metadata_sync_helpers.sql +++ b/src/test/regress/sql/metadata_sync_helpers.sql @@ -28,7 +28,7 @@ ROLLBACK; -- but we are on the coordinator, so still not allowed BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); ROLLBACK; @@ -67,14 +67,14 @@ SET search_path TO metadata_sync_helpers; -- owner of the table test BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); ROLLBACK; -- we do not own the relation BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_update_relation_colocation ('test'::regclass, 10); ROLLBACK; @@ -83,50 +83,78 @@ CREATE TABLE test_2(col_1 int, col_2 int); CREATE TABLE test_3(col_1 int, col_2 int); BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); SELECT count(*) FROM pg_dist_partition WHERE logicalrelid = 'metadata_sync_helpers.test_2'::regclass; ROLLBACK; +-- application_name with incorrect gpid +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus_internal gpid=not a correct gpid'; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ROLLBACK; + +-- application_name with empty gpid +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus_internal gpid='; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ROLLBACK; + +-- empty application_name +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to ''; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ROLLBACK; + +-- application_name with incorrect prefix +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus gpid=10000000001'; + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); +ROLLBACK; + -- fails because there is no X distribution method BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's'); ROLLBACK; -- fails because there is the column does not exist BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'non_existing_col', 0, 's'); ROLLBACK; --- fails because we do not allow NULL parameters BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata (NULL, 'h', 'non_existing_col', 0, 's'); ROLLBACK; -- fails because colocationId cannot be negative BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', -1, 's'); ROLLBACK; -- fails because there is no X replication model BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 'X'); ROLLBACK; -- the same table cannot be added twice, that is enforced by a primary key BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); @@ -135,7 +163,7 @@ ROLLBACK; -- the same table cannot be added twice, that is enforced by a primary key even if distribution key changes BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_2', 0, 's'); @@ -144,7 +172,7 @@ ROLLBACK; -- hash distributed table cannot have NULL distribution key BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', NULL, 0, 's'); ROLLBACK; @@ -165,14 +193,14 @@ SET search_path TO metadata_sync_helpers; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's'); ROLLBACK; -- should throw error even if we skip the checks, there are no such nodes BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(1420007, 10000, 11111); ROLLBACK; @@ -189,7 +217,7 @@ SET search_path TO metadata_sync_helpers; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's'); ROLLBACK; @@ -207,21 +235,21 @@ SET search_path TO metadata_sync_helpers; CREATE TABLE test_ref(col_1 int, col_2 int); BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', 'col_1', 0, 's'); ROLLBACK; -- non-valid replication model BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'A'); ROLLBACK; -- not-matching replication model for reference table BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c'); ROLLBACK; @@ -231,7 +259,7 @@ SET search_path TO metadata_sync_helpers; CREATE TABLE super_user_table(col_1 int); BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('super_user_table'::regclass, 'h', 'col_1', 0, 's'); COMMIT; @@ -244,7 +272,7 @@ SET search_path TO metadata_sync_helpers; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('super_user_table'::regclass, 1420000::bigint, 't'::"char", '-2147483648'::text, '-1610612737'::text)) @@ -254,7 +282,7 @@ ROLLBACK; -- the user is only allowed to add a shard for add a table which is in pg_dist_partition BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '-2147483648'::text, '-1610612737'::text)) @@ -264,7 +292,7 @@ ROLLBACK; -- ok, now add the table to the pg_dist_partition BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 250, 's'); SELECT citus_internal_add_partition_metadata ('test_3'::regclass, 'h', 'col_1', 251, 's'); SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 't'); @@ -273,14 +301,14 @@ COMMIT; -- we can update to a non-existing colocation group (e.g., colocate_with:=none) BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232); ROLLBACK; -- invalid shard ids are not allowed BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, -1, 't'::"char", '-2147483648'::text, '-1610612737'::text)) @@ -290,7 +318,7 @@ ROLLBACK; -- invalid storage types are not allowed BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000, 'X'::"char", '-2147483648'::text, '-1610612737'::text)) @@ -300,7 +328,7 @@ ROLLBACK; -- NULL shard ranges are not allowed for hash distributed tables BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000, 't'::"char", NULL, '-1610612737'::text)) @@ -310,7 +338,7 @@ ROLLBACK; -- non-integer shard ranges are not allowed BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", 'non-int'::text, '-1610612737'::text)) @@ -320,7 +348,7 @@ ROLLBACK; -- shardMinValue should be smaller than shardMaxValue BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '-1610612737'::text, '-2147483648'::text)) @@ -330,7 +358,7 @@ ROLLBACK; -- we do not allow overlapping shards for the same table BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '10'::text, '20'::text), @@ -344,7 +372,7 @@ ROLLBACK; -- check with non-existing object type BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('non_existing_type', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false)) @@ -354,7 +382,7 @@ ROLLBACK; -- check the sanity of distributionArgumentIndex and colocationId BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -100, 0, false)) @@ -363,7 +391,7 @@ ROLLBACK; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -1, -1, false)) @@ -373,7 +401,7 @@ ROLLBACK; -- check with non-existing object BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false)) @@ -384,7 +412,7 @@ ROLLBACK; -- if any parameter is NULL BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], 0, NULL::int, false)) @@ -397,7 +425,7 @@ ROLLBACK; -- which is known how to distribute BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse CREATE TABLE publication_test_table(id int); @@ -412,7 +440,7 @@ ROLLBACK; -- Show that citus_internal_add_object_metadata checks the priviliges BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse CREATE FUNCTION distribution_test_function(int) RETURNS int @@ -427,7 +455,7 @@ ROLLBACK; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse CREATE TYPE distributed_test_type AS (a int, b int); @@ -443,7 +471,7 @@ ROLLBACK; SET search_path TO metadata_sync_helpers; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse UPDATE pg_dist_partition SET partmethod = 'X'; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) @@ -457,7 +485,7 @@ ROLLBACK; SET search_path TO metadata_sync_helpers; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '10'::text, '20'::text)) @@ -475,7 +503,7 @@ SET search_path TO metadata_sync_helpers; -- now, add few shards BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '11'::text, '20'::text), @@ -491,14 +519,14 @@ COMMIT; -- we cannot mark these two tables colocated because they are not colocated BEGIN; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); ROLLBACK; -- now, add few more shards for test_3 to make it colocated with test_2 BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_3'::regclass, 1420009::bigint, 't'::"char", '21'::text, '30'::text), @@ -512,7 +540,7 @@ COMMIT; -- shardMin/MaxValues should be NULL for reference tables BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_ref'::regclass, 1420003::bigint, 't'::"char", '-1610612737'::text, NULL)) @@ -522,7 +550,7 @@ ROLLBACK; -- reference tables cannot have multiple shards BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_ref'::regclass, 1420006::bigint, 't'::"char", NULL, NULL), @@ -533,7 +561,7 @@ ROLLBACK; -- finally, add a shard for reference tables BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('test_ref'::regclass, 1420006::bigint, 't'::"char", NULL, NULL)) @@ -546,7 +574,7 @@ SET search_path TO metadata_sync_helpers; -- and a shard for the superuser table BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('super_user_table'::regclass, 1420007::bigint, 't'::"char", '11'::text, '20'::text)) @@ -561,7 +589,7 @@ SET search_path TO metadata_sync_helpers; -- shard does not exist BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (-10, 1, 0::bigint, 1::int, 1500000::bigint)) @@ -571,7 +599,7 @@ ROLLBACK; -- invalid placementid BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420000, 1, 0::bigint, 1::int, -10)) @@ -581,7 +609,7 @@ ROLLBACK; -- non-existing shard BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1430100, 1, 0::bigint, 1::int, 10)) @@ -591,7 +619,7 @@ ROLLBACK; -- invalid shard state BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420000, 10, 0::bigint, 1::int, 1500000)) @@ -601,7 +629,7 @@ ROLLBACK; -- non-existing node with non-existing node-id 123123123 BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES ( 1420000, 1, 0::bigint, 123123123::int, 1500000)) @@ -625,7 +653,7 @@ END; $$ language plpgsql; -- fails because we ingest more placements for the same shards to the same worker node BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420000, 1, 0::bigint, get_node_id(), 1500000), @@ -636,7 +664,7 @@ ROLLBACK; -- shard is not owned by us BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420007, 1, 0::bigint, get_node_id(), 1500000)) @@ -646,7 +674,7 @@ ROLLBACK; -- sucessfully add placements BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1420000, 1, 0::bigint, get_node_id(), 1500000), @@ -667,7 +695,7 @@ COMMIT; -- we should be able to colocate both tables now BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); ROLLBACK; @@ -676,7 +704,7 @@ ROLLBACK; -- fails because we are trying to update it to non-existing node BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(1420000, get_node_id(), get_node_id()+1000); COMMIT; @@ -684,7 +712,7 @@ COMMIT; -- fails because the source node doesn't contain the shard BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(1420000, get_node_id()+10000, get_node_id()); COMMIT; @@ -692,7 +720,7 @@ COMMIT; -- fails because shard does not exist BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(0, get_node_id(), get_node_id()+1); COMMIT; @@ -700,7 +728,7 @@ COMMIT; -- fails because none-existing shard BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(213123123123, get_node_id(), get_node_id()+1); COMMIT; @@ -708,7 +736,7 @@ COMMIT; -- fails because we do not own the shard BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1); COMMIT; @@ -716,7 +744,7 @@ COMMIT; -- the user only allowed to delete their own shards BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(shardid) AS (VALUES (1420007)) @@ -725,7 +753,7 @@ ROLLBACK; -- the user only allowed to delete shards in a distributed transaction BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(shardid) AS (VALUES (1420007)) @@ -735,7 +763,7 @@ ROLLBACK; -- the user cannot delete non-existing shards BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(shardid) AS (VALUES (1420100)) @@ -750,7 +778,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse WITH shard_data(shardid) AS (VALUES (1420000)) @@ -767,7 +795,7 @@ ROLLBACK; SET search_path TO metadata_sync_helpers; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; -- with an ugly trick, update the repmodel -- so that making two tables colocated fails UPDATE pg_dist_partition SET repmodel = 't' @@ -778,7 +806,7 @@ ROLLBACK; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; -- with an ugly trick, update the vartype of table from int to bigint -- so that making two tables colocated fails UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}' @@ -788,7 +816,7 @@ ROLLBACK; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; -- with an ugly trick, update the partmethod of the table to not-valid -- so that making two tables colocated fails UPDATE pg_dist_partition SET partmethod = '' @@ -798,7 +826,7 @@ ROLLBACK; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; -- with an ugly trick, update the partmethod of the table to not-valid -- so that making two tables colocated fails UPDATE pg_dist_partition SET partmethod = 'a' @@ -812,7 +840,7 @@ CREATE TABLE test_6(int_col int, text_col text); BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_5'::regclass, 'h', 'int_col', 500, 's'); SELECT citus_internal_add_partition_metadata ('test_6'::regclass, 'h', 'text_col', 500, 's'); @@ -828,7 +856,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; CREATE TABLE test_8(int_col int, text_col text COLLATE "caseinsensitive"); SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); - SET application_name to 'citus_internal'; + SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse SELECT citus_internal_add_partition_metadata ('test_7'::regclass, 'h', 'text_col', 500, 's'); SELECT citus_internal_add_partition_metadata ('test_8'::regclass, 'h', 'text_col', 500, 's');