Introduce global PID

global_pid_guc
Halil Ozan Akgul 2022-01-06 12:31:00 +03:00
parent dcb9c71f19
commit 6292662f9d
32 changed files with 848 additions and 176 deletions

View File

@ -10,6 +10,7 @@
#include "postgres.h"
#include "distributed/backend_data.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
@ -232,6 +233,10 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
*/
char nodePortString[12] = "";
StringInfo applicationName = makeStringInfo();
appendStringInfo(applicationName, "%s%ld", CITUS_APPLICATION_NAME_PREFIX,
GetGlobalPID());
/*
* This function has three sections:
* - Initialize the keywords and values (to be copied later) of global parameters
@ -260,7 +265,7 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
key->database,
key->user,
GetDatabaseEncodingName(),
CITUS_APPLICATION_NAME
applicationName->data
};
/*

View File

@ -19,6 +19,7 @@
#include "access/hash.h"
#include "commands/dbcommands.h"
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
#include "distributed/errormessage.h"
#include "distributed/error_codes.h"
@ -1448,7 +1449,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
bool
IsCitusInitiatedRemoteBackend(void)
{
return application_name && strcmp(application_name, CITUS_APPLICATION_NAME) == 0;
return ExtractGlobalPID() != 0;
}

View File

@ -201,6 +201,9 @@ static bool workerNodeHashValid = false;
/* default value is -1, for coordinator it's 0 and for worker nodes > 0 */
static int32 LocalGroupId = -1;
/* default value is -1, increases with every node starting from 1 */
static int32 LocalNodeId = -1;
/* built first time through in InitializeDistCache */
static ScanKeyData DistPartitionScanKey[1];
static ScanKeyData DistShardScanKey[1];
@ -3618,6 +3621,56 @@ GetLocalGroupId(void)
}
/*
* GetNodeId returns the node identifier of the local node.
*/
int32
GetLocalNodeId(void)
{
InitializeCaches();
/*
* Already set the node id, no need to read the heap again.
*/
if (LocalNodeId != -1)
{
return LocalNodeId;
}
uint32 nodeId = -1;
int32 localGroupId = GetLocalGroupId();
bool includeNodesFromOtherClusters = false;
List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
if (workerNode->groupId == localGroupId &&
workerNode->isActive)
{
nodeId = workerNode->nodeId;
break;
}
}
if (nodeId == -1)
{
elog(DEBUG1, "there is no active node with group id '%d' on pg_dist_node",
localGroupId);
if (IsCoordinator())
{
nodeId = 0;
}
}
LocalNodeId = nodeId;
return nodeId;
}
/*
* RegisterLocalGroupIdCacheCallbacks registers the callbacks required to
* maintain LocalGroupId at a consistent value. It's separate from

View File

@ -474,6 +474,7 @@ StartupCitusBackend(void)
InitializeMaintenanceDaemonBackend();
InitializeBackendData();
RegisterConnectionCleanup();
AssignGlobalPID();
}

View File

@ -13,6 +13,56 @@
#include "udfs/citus_run_local_command/11.0-1.sql"
#include "udfs/worker_drop_sequence_dependency/11.0-1.sql"
#include "udfs/get_all_active_transactions/11.0-1.sql"
#include "udfs/get_global_active_transactions/11.0-1.sql"
#include "udfs/citus_worker_stat_activity/11.0-1.sql"
#include "udfs/citus_dist_stat_activity/11.0-1.sql"
SET search_path = 'pg_catalog';
CREATE VIEW citus.citus_lock_waits AS
WITH
citus_dist_stat_activity AS
(
SELECT * FROM citus_dist_stat_activity
),
unique_global_wait_edges AS
(
SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges()
),
citus_dist_stat_activity_with_node_id AS
(
SELECT
citus_dist_stat_activity.*, (CASE citus_dist_stat_activity.distributed_query_host_name WHEN 'coordinator_host' THEN 0 ELSE pg_dist_node.nodeid END) as initiator_node_id
FROM
citus_dist_stat_activity LEFT JOIN pg_dist_node
ON
citus_dist_stat_activity.distributed_query_host_name = pg_dist_node.nodename AND
citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport
)
SELECT
waiting.pid AS waiting_pid,
blocking.pid AS blocking_pid,
waiting.query AS blocked_statement,
blocking.query AS current_statement_in_blocking_process,
waiting.initiator_node_id AS waiting_node_id,
blocking.initiator_node_id AS blocking_node_id,
waiting.distributed_query_host_name AS waiting_node_name,
blocking.distributed_query_host_name AS blocking_node_name,
waiting.distributed_query_host_port AS waiting_node_port,
blocking.distributed_query_host_port AS blocking_node_port
FROM
unique_global_wait_edges
JOIN
citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id)
JOIN
citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id);
ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
RESET search_path;
DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);
DROP FUNCTION pg_catalog.master_get_table_metadata(text);

View File

@ -83,3 +83,127 @@ DROP FUNCTION pg_catalog.citus_shards_on_worker();
DROP FUNCTION pg_catalog.citus_shard_indexes_on_worker();
#include "../udfs/create_distributed_function/9.0-1.sql"
ALTER TABLE citus.pg_dist_object DROP COLUMN force_delegation;
SET search_path = 'pg_catalog';
DROP FUNCTION IF EXISTS get_all_active_transactions();
CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL,
OUT transaction_number int8, OUT transaction_stamp timestamptz)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$get_all_active_transactions$$;
COMMENT ON FUNCTION get_all_active_transactions(OUT datid oid, OUT datname text, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL,
OUT transaction_number int8, OUT transaction_stamp timestamptz)
IS 'returns distributed transaction ids of active distributed transactions';
DROP FUNCTION IF EXISTS get_global_active_transactions();
CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$get_global_active_transactions$$;
COMMENT ON FUNCTION get_global_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
IS 'returns distributed transaction ids of active distributed transactions from each node of the cluster';
RESET search_path;
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$citus_dist_stat_activity$$;
COMMENT ON FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text)
IS 'returns distributed transaction activity on distributed tables';
CREATE VIEW citus.citus_dist_stat_activity AS
SELECT * FROM pg_catalog.citus_dist_stat_activity();
ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC;
SET search_path = 'pg_catalog';
CREATE VIEW citus.citus_lock_waits AS
WITH
citus_dist_stat_activity AS
(
SELECT * FROM citus_dist_stat_activity
),
unique_global_wait_edges AS
(
SELECT DISTINCT ON(waiting_node_id, waiting_transaction_num, blocking_node_id, blocking_transaction_num) * FROM dump_global_wait_edges()
),
citus_dist_stat_activity_with_node_id AS
(
SELECT
citus_dist_stat_activity.*, (CASE citus_dist_stat_activity.distributed_query_host_name WHEN 'coordinator_host' THEN 0 ELSE pg_dist_node.nodeid END) as initiator_node_id
FROM
citus_dist_stat_activity LEFT JOIN pg_dist_node
ON
citus_dist_stat_activity.distributed_query_host_name = pg_dist_node.nodename AND
citus_dist_stat_activity.distributed_query_host_port = pg_dist_node.nodeport
)
SELECT
waiting.pid AS waiting_pid,
blocking.pid AS blocking_pid,
waiting.query AS blocked_statement,
blocking.query AS current_statement_in_blocking_process,
waiting.initiator_node_id AS waiting_node_id,
blocking.initiator_node_id AS blocking_node_id,
waiting.distributed_query_host_name AS waiting_node_name,
blocking.distributed_query_host_name AS blocking_node_name,
waiting.distributed_query_host_port AS waiting_node_port,
blocking.distributed_query_host_port AS blocking_node_port
FROM
unique_global_wait_edges
JOIN
citus_dist_stat_activity_with_node_id waiting ON (unique_global_wait_edges.waiting_transaction_num = waiting.transaction_number AND unique_global_wait_edges.waiting_node_id = waiting.initiator_node_id)
JOIN
citus_dist_stat_activity_with_node_id blocking ON (unique_global_wait_edges.blocking_transaction_num = blocking.transaction_number AND unique_global_wait_edges.blocking_node_id = blocking.initiator_node_id);
ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
DROP FUNCTION citus_worker_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$citus_worker_stat_activity$$;
COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text)
IS 'returns distributed transaction activity on shards of distributed tables';
CREATE VIEW citus.citus_worker_stat_activity AS
SELECT * FROM pg_catalog.citus_worker_stat_activity();
ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC;
RESET search_path;

View File

@ -0,0 +1,24 @@
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$citus_dist_stat_activity$$;
COMMENT ON FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8)
IS 'returns distributed transaction activity on distributed tables';
CREATE VIEW citus.citus_dist_stat_activity AS
SELECT * FROM pg_catalog.citus_dist_stat_activity();
ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC;

View File

@ -0,0 +1,24 @@
DROP FUNCTION pg_catalog.citus_dist_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$citus_dist_stat_activity$$;
COMMENT ON FUNCTION pg_catalog.citus_dist_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8)
IS 'returns distributed transaction activity on distributed tables';
CREATE VIEW citus.citus_dist_stat_activity AS
SELECT * FROM pg_catalog.citus_dist_stat_activity();
ALTER VIEW citus.citus_dist_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_dist_stat_activity TO PUBLIC;

View File

@ -0,0 +1,24 @@
DROP FUNCTION citus_worker_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$citus_worker_stat_activity$$;
COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8)
IS 'returns distributed transaction activity on shards of distributed tables';
CREATE VIEW citus.citus_worker_stat_activity AS
SELECT * FROM pg_catalog.citus_worker_stat_activity();
ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC;

View File

@ -0,0 +1,24 @@
DROP FUNCTION citus_worker_stat_activity CASCADE;
CREATE OR REPLACE FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$citus_worker_stat_activity$$;
COMMENT ON FUNCTION citus_worker_stat_activity(OUT query_hostname text, OUT query_hostport int, OUT distributed_query_host_name text, OUT distributed_query_host_port int,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT datid oid, OUT datname name,
OUT pid int, OUT usesysid oid, OUT usename name, OUT application_name text, OUT client_addr INET,
OUT client_hostname TEXT, OUT client_port int, OUT backend_start timestamptz, OUT xact_start timestamptz,
OUT query_start timestamptz, OUT state_change timestamptz, OUT wait_event_type text, OUT wait_event text,
OUT state text, OUT backend_xid xid, OUT backend_xmin xid, OUT query text, OUT backend_type text, OUT global_pid int8)
IS 'returns distributed transaction activity on shards of distributed tables';
CREATE VIEW citus.citus_worker_stat_activity AS
SELECT * FROM pg_catalog.citus_worker_stat_activity();
ALTER VIEW citus.citus_worker_stat_activity SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC;

View File

@ -0,0 +1,10 @@
DROP FUNCTION IF EXISTS get_all_active_transactions();
CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$get_all_active_transactions$$;
COMMENT ON FUNCTION get_all_active_transactions(OUT datid oid, OUT datname text, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8)
IS 'returns distributed transaction ids of active distributed transactions';

View File

@ -0,0 +1,10 @@
DROP FUNCTION IF EXISTS get_all_active_transactions();
CREATE OR REPLACE FUNCTION get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8)
RETURNS SETOF RECORD
LANGUAGE C STRICT AS 'MODULE_PATHNAME',
$$get_all_active_transactions$$;
COMMENT ON FUNCTION get_all_active_transactions(OUT datid oid, OUT datname text, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL,
OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8)
IS 'returns distributed transaction ids of active distributed transactions';

View File

@ -0,0 +1,7 @@
DROP FUNCTION IF EXISTS get_global_active_transactions();
CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$get_global_active_transactions$$;
COMMENT ON FUNCTION get_global_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8)
IS 'returns distributed transaction ids of active distributed transactions from each node of the cluster';

View File

@ -0,0 +1,7 @@
DROP FUNCTION IF EXISTS get_global_active_transactions();
CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$get_global_active_transactions$$;
COMMENT ON FUNCTION get_global_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8)
IS 'returns distributed transaction ids of active distributed transactions from each node of the cluster';

View File

@ -15,6 +15,9 @@
#include "distributed/pg_version_constants.h"
#include "miscadmin.h"
#include "unistd.h"
#include "safe_lib.h"
#include "funcapi.h"
#include "access/htup_details.h"
@ -43,7 +46,7 @@
#define GET_ACTIVE_TRANSACTION_QUERY "SELECT * FROM get_all_active_transactions();"
#define ACTIVE_TRANSACTION_COLUMN_COUNT 6
#define ACTIVE_TRANSACTION_COLUMN_COUNT 7
/*
* Each backend's data reside in the shared memory
@ -78,6 +81,7 @@ typedef struct BackendManagementShmemData
static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor);
static uint64 GenerateGlobalPID(void);
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static BackendManagementShmemData *backendManagementShmemData = NULL;
@ -86,6 +90,7 @@ static BackendData *MyBackendData = NULL;
static void BackendManagementShmemInit(void);
static size_t BackendManagementShmemSize(void);
static void UnSetGlobalPID(void);
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
@ -315,6 +320,7 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
values[3] = ParseBoolField(result, rowIndex, 3);
values[4] = ParseIntField(result, rowIndex, 4);
values[5] = ParseTimestampTzField(result, rowIndex, 5);
values[6] = ParseIntField(result, rowIndex, 6);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
@ -384,8 +390,7 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
SpinLockAcquire(&currentBackend->mutex);
/* we're only interested in backends initiated by Citus */
if (currentBackend->citusBackend.initiatorNodeIdentifier < 0)
if (currentBackend->globalPID == 0)
{
SpinLockRelease(&currentBackend->mutex);
continue;
@ -427,6 +432,7 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
values[3] = !coordinatorOriginatedQuery;
values[4] = UInt64GetDatum(transactionNumber);
values[5] = TimestampTzGetDatum(transactionIdTimestamp);
values[6] = UInt64GetDatum(currentBackend->globalPID);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
@ -631,6 +637,7 @@ InitializeBackendData(void)
/* zero out the backend data */
UnSetDistributedTransactionId();
UnSetGlobalPID();
UnlockBackendSharedMemory();
}
@ -664,6 +671,24 @@ UnSetDistributedTransactionId(void)
}
/*
* UnSetGlobalPID resets the global pid for the current backend
*/
static void
UnSetGlobalPID(void)
{
/* backend does not exist if the extension is not created */
if (MyBackendData)
{
SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->globalPID = 0;
SpinLockRelease(&MyBackendData->mutex);
}
}
/*
* LockBackendSharedMemory is a simple wrapper around LWLockAcquire on the
* shared memory lock.
@ -780,6 +805,113 @@ MarkCitusInitiatedCoordinatorBackend(void)
}
/*
* AssignGlobalPID assigns a global process id for the current backend if
* it is not already assigned
*/
void
AssignGlobalPID(void)
{
if (MyBackendData->globalPID != 0)
{
return;
}
if (!IsCitusInitiatedRemoteBackend())
{
uint64 globalPID = GenerateGlobalPID();
SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->globalPID = globalPID;
SpinLockRelease(&MyBackendData->mutex);
return;
}
uint64 globalPID = ExtractGlobalPID();
SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->globalPID = globalPID;
SpinLockRelease(&MyBackendData->mutex);
}
/*
* GetGlobalPID returns the global process id of the current backend
*/
uint64
GetGlobalPID(void)
{
/* assign here incase it is not assigned already */
AssignGlobalPID();
uint64 globalPID = 0;
SpinLockAcquire(&MyBackendData->mutex);
globalPID = MyBackendData->globalPID;
SpinLockRelease(&MyBackendData->mutex);
return globalPID;
}
/*
* GenerateGlobalPID generates the global process id for the current backend
*/
static uint64
GenerateGlobalPID(void)
{
/*
* We try to create a human readable global pid that consists of node id and process id.
* By multiplying node id with 10^10 and adding pid we generate a number where the smallest
* 10 digit represent the pid and the remaining digits are the node id
*
* Both node id and pid are 32 bit. We use 10^10 to fit all possible pids. Some very large
* node ids might cause overflow. But even for the applications that scale around 50 nodes every
* day it'd take about 100K years. So we are not worried.
*/
return (((uint64) GetLocalNodeId()) * 10000000000) + getpid();
}
/*
* ExtractGlobalPID extracts the global process id from the application name and returns it
* if the application name is not compatible with Citus' application names returns 0
*/
uint64
ExtractGlobalPID(void)
{
/* does application name exist */
if (!application_name)
{
return 0;
}
uint64 prefixLength = strlen(CITUS_APPLICATION_NAME_PREFIX);
/* does application name start with Citus's application name prefix */
if (strncmp(application_name, CITUS_APPLICATION_NAME_PREFIX, prefixLength) != 0)
{
return 0;
}
/* are the remaining characters of the application name numbers */
uint64 numberOfRemainingChars = strlen(application_name) - prefixLength;
if (!strisdigit_s(application_name + prefixLength, numberOfRemainingChars))
{
return 0;
}
StringInfo applicationName = makeStringInfo();
appendStringInfoString(applicationName, application_name);
char *globalPIDString = &applicationName->data[strlen(CITUS_APPLICATION_NAME_PREFIX)];
uint64 globalPID = strtoul(globalPIDString, NULL, 10);
return globalPID;
}
/*
* CurrentDistributedTransactionNumber returns the transaction number of the
* current distributed transaction. The caller must make sure a distributed

View File

@ -108,7 +108,7 @@
* showing the initiator_node_id we expand it to initiator_node_host and
* initiator_node_port.
*/
#define CITUS_DIST_STAT_ACTIVITY_QUERY_COLS 23
#define CITUS_DIST_STAT_ACTIVITY_QUERY_COLS 24
#define CITUS_DIST_STAT_ADDITIONAL_COLS 3
#define CITUS_DIST_STAT_ACTIVITY_COLS \
CITUS_DIST_STAT_ACTIVITY_QUERY_COLS + CITUS_DIST_STAT_ADDITIONAL_COLS
@ -147,11 +147,12 @@ SELECT \
pg_stat_activity.backend_xid, \
pg_stat_activity.backend_xmin, \
pg_stat_activity.query, \
pg_stat_activity.backend_type \
pg_stat_activity.backend_type, \
dist_txs.global_pid \
FROM \
pg_stat_activity \
INNER JOIN \
get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp) \
get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_pid) \
ON pg_stat_activity.pid = dist_txs.process_id \
WHERE \
dist_txs.worker_query = false;"
@ -181,14 +182,15 @@ SELECT \
pg_stat_activity.backend_xid, \
pg_stat_activity.backend_xmin, \
pg_stat_activity.query, \
pg_stat_activity.backend_type \
pg_stat_activity.backend_type, \
dist_txs.global_id \
FROM \
pg_stat_activity \
LEFT JOIN \
get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp) \
get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_id) \
ON pg_stat_activity.pid = dist_txs.process_id \
WHERE \
pg_stat_activity.application_name = 'citus' \
pg_stat_activity.application_name SIMILAR TO 'citus gpid=\\d+' \
AND \
pg_stat_activity.query NOT ILIKE '%stat_activity%';"
@ -223,6 +225,7 @@ typedef struct CitusDistStat
TransactionId backend_xmin;
text *query;
text *backend_type;
uint64 global_pid;
} CitusDistStat;
@ -501,6 +504,7 @@ ParseCitusDistStat(PGresult *result, int64 rowIndex)
citusDistStat->backend_xmin = ParseXIDField(result, rowIndex, 20);
citusDistStat->query = ParseTextField(result, rowIndex, 21);
citusDistStat->backend_type = ParseTextField(result, rowIndex, 22);
citusDistStat->global_pid = ParseIntField(result, rowIndex, 23);
return citusDistStat;
}
@ -688,6 +692,7 @@ HeapTupleToCitusDistStat(HeapTuple result, TupleDesc rowDescriptor)
citusDistStat->backend_xmin = ParseXIDFieldFromHeapTuple(result, rowDescriptor, 21);
citusDistStat->query = ParseTextFieldFromHeapTuple(result, rowDescriptor, 22);
citusDistStat->backend_type = ParseTextFieldFromHeapTuple(result, rowDescriptor, 23);
citusDistStat->global_pid = ParseIntFieldFromHeapTuple(result, rowDescriptor, 24);
return citusDistStat;
}
@ -1098,6 +1103,8 @@ ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo)
nulls[25] = true;
}
values[26] = Int32GetDatum(citusDistStat->global_pid);
tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
}
}

View File

@ -50,6 +50,7 @@ typedef struct BackendData
Oid userId;
slock_t mutex;
bool cancelledDueToDeadlock;
uint64 globalPID;
CitusInitiatedBackend citusBackend;
DistributedTransactionId transactionId;
} BackendData;
@ -63,6 +64,9 @@ extern void UnlockBackendSharedMemory(void);
extern void UnSetDistributedTransactionId(void);
extern void AssignDistributedTransactionId(void);
extern void MarkCitusInitiatedCoordinatorBackend(void);
extern void AssignGlobalPID(void);
extern uint64 GetGlobalPID(void);
extern uint64 ExtractGlobalPID(void);
extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
extern void CancelTransactionDueToDeadlock(PGPROC *proc);
extern bool MyBackendGotCancelledDueToDeadlock(bool clearState);

View File

@ -28,8 +28,8 @@
/* used for libpq commands that get an error buffer. Postgres docs recommend 256. */
#define ERROR_BUFFER_SIZE 256
/* application name used for internal connections in Citus */
#define CITUS_APPLICATION_NAME "citus"
/* application name prefix used for internal connections in Citus */
#define CITUS_APPLICATION_NAME_PREFIX "citus gpid="
/* forward declare, to avoid forcing large headers on everyone */
struct pg_conn; /* target of the PGconn typedef */

View File

@ -165,6 +165,7 @@ extern CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32
objsubid);
extern int32 GetLocalGroupId(void);
extern int32 GetLocalNodeId(void);
extern void CitusTableCacheFlushInvalidatedEntries(void);
extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok);
extern List * ShardPlacementListIncludingOrphanedPlacements(uint64 shardId);

View File

@ -225,8 +225,8 @@ SELECT count(*) FROM single_replicatated WHERE key = 100;
RESET client_min_messages;
-- verify get_global_active_transactions works when a timeout happens on a connection
SELECT get_global_active_transactions();
get_global_active_transactions
SELECT * FROM get_global_active_transactions() WHERE datid != 0;
datid | process_id | initiator_node_identifier | worker_query | transaction_number | transaction_stamp | global_pid
---------------------------------------------------------------------
(0 rows)

View File

@ -94,7 +94,8 @@ step s1-verify-current-xact-is-on-worker:
get_current_transaction_id() as xact,
run_command_on_workers($$
SELECT row(initiator_node_identifier, transaction_number)
FROM get_all_active_transactions();
FROM get_all_active_transactions()
WHERE datid != 0;
$$) as remote
ORDER BY remote.nodeport ASC;

View File

@ -35,8 +35,8 @@ step s2-begin-insert:
step s3-as-admin:
-- Admin should be able to see all transactions
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
count
---------------------------------------------------------------------
@ -51,8 +51,8 @@ count
step s3-as-user-1:
-- User should only be able to see its own transactions
SET ROLE test_user_1;
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
count
---------------------------------------------------------------------
@ -67,8 +67,8 @@ count
step s3-as-readonly:
-- Other user should not see transactions
SET ROLE test_readonly;
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
count
---------------------------------------------------------------------
@ -83,8 +83,8 @@ count
step s3-as-monitor:
-- Monitor should see all transactions
SET ROLE test_monitor;
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
count
---------------------------------------------------------------------

View File

@ -0,0 +1,99 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-select s2-citus_dist_stat_activity s1-commit
step s1-begin:
BEGIN;
step s1-select:
SELECT * FROM dist_table;
a|b
---------------------------------------------------------------------
(0 rows)
step s2-citus_dist_stat_activity:
SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%';
?column?
---------------------------------------------------------------------
t
(1 row)
step s1-commit:
COMMIT;
starting permutation: s1-begin s1-select s2-citus_worker_stat_activity s1-commit
step s1-begin:
BEGIN;
step s1-select:
SELECT * FROM dist_table;
a|b
---------------------------------------------------------------------
(0 rows)
step s2-citus_worker_stat_activity:
SELECT count(*) FROM citus_worker_stat_activity() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
);
count
---------------------------------------------------------------------
2
(1 row)
step s1-commit:
COMMIT;
starting permutation: s1-begin s1-select s2-get_all_active_transactions s1-commit
step s1-begin:
BEGIN;
step s1-select:
SELECT * FROM dist_table;
a|b
---------------------------------------------------------------------
(0 rows)
step s2-get_all_active_transactions:
SELECT count(*) FROM get_all_active_transactions() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
);
count
---------------------------------------------------------------------
1
(1 row)
step s1-commit:
COMMIT;
starting permutation: s1-begin s1-select s2-get_global_active_transactions s1-commit
step s1-begin:
BEGIN;
step s1-select:
SELECT * FROM dist_table;
a|b
---------------------------------------------------------------------
(0 rows)
step s2-get_global_active_transactions:
SELECT count(*) FROM get_global_active_transactions() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
);
count
---------------------------------------------------------------------
3
(1 row)
step s1-commit:
COMMIT;

View File

@ -141,8 +141,8 @@ step s1-update-ref-table:
step s2-active-transactions:
-- Admin should be able to see all transactions
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
count
---------------------------------------------------------------------

View File

@ -36,7 +36,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ERROR: This is an internal Citus function can only be used in a distributed transaction
ROLLBACK;
@ -73,7 +73,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ERROR: must be owner of table test
ROLLBACK;
@ -85,7 +85,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test'::regclass, 10);
ERROR: must be owner of table test
ROLLBACK;
@ -99,7 +99,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's');
citus_internal_add_partition_metadata
---------------------------------------------------------------------
@ -121,7 +121,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's');
ERROR: Metadata syncing is only allowed for hash, reference and local tables:X
ROLLBACK;
@ -133,7 +133,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'non_existing_col', 0, 's');
ERROR: column "non_existing_col" of relation "test_2" does not exist
ROLLBACK;
@ -145,7 +145,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata (NULL, 'h', 'non_existing_col', 0, 's');
ERROR: relation cannot be NULL
ROLLBACK;
@ -157,7 +157,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', -1, 's');
ERROR: Metadata syncing is only allowed for valid colocation id values.
ROLLBACK;
@ -169,7 +169,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 'X');
ERROR: Metadata syncing is only allowed for hash, reference and local tables:X
ROLLBACK;
@ -181,7 +181,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's');
citus_internal_add_partition_metadata
@ -200,7 +200,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's');
citus_internal_add_partition_metadata
@ -219,7 +219,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', NULL, 0, 's');
ERROR: Distribution column cannot be NULL for relation "test_2"
@ -252,7 +252,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's');
citus_internal_add_partition_metadata
---------------------------------------------------------------------
@ -268,7 +268,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420007, 10000, 11111);
ERROR: could not find valid entry for shard xxxxx
@ -298,7 +298,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's');
ERROR: role "non_existing_user" does not exist
ROLLBACK;
@ -329,7 +329,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', 'col_1', 0, 's');
ERROR: Reference or local tables cannot have distribution columns
ROLLBACK;
@ -341,7 +341,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'A');
ERROR: Metadata syncing is only allowed for known replication models.
ROLLBACK;
@ -353,7 +353,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c');
ERROR: Local or references tables can only have 's' or 't' as the replication model.
ROLLBACK;
@ -368,7 +368,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('super_user_table'::regclass, 'h', 'col_1', 0, 's');
citus_internal_add_partition_metadata
---------------------------------------------------------------------
@ -387,7 +387,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('super_user_table'::regclass, 1420000::bigint, 't'::"char", '-2147483648'::text, '-1610612737'::text))
@ -402,7 +402,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '-2147483648'::text, '-1610612737'::text))
@ -417,7 +417,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 250, 's');
citus_internal_add_partition_metadata
---------------------------------------------------------------------
@ -445,7 +445,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232);
citus_internal_update_relation_colocation
---------------------------------------------------------------------
@ -461,7 +461,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, -1, 't'::"char", '-2147483648'::text, '-1610612737'::text))
@ -476,7 +476,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000, 'X'::"char", '-2147483648'::text, '-1610612737'::text))
@ -491,7 +491,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000, 't'::"char", NULL, '-1610612737'::text))
@ -506,7 +506,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", 'non-int'::text, '-1610612737'::text))
@ -521,7 +521,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '-1610612737'::text, '-2147483648'::text))
@ -536,7 +536,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '10'::text, '20'::text),
@ -554,7 +554,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('non_existing_type', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false))
@ -569,7 +569,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -100, 0, false))
@ -583,7 +583,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -1, -1, false))
@ -598,7 +598,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false))
@ -614,7 +614,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], 0, NULL::int, false))
@ -635,7 +635,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
CREATE TABLE publication_test_table(id int);
CREATE PUBLICATION publication_test FOR TABLE publication_test_table;
@ -653,7 +653,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
CREATE FUNCTION distribution_test_function(int) RETURNS int
AS $$ SELECT $1 $$
@ -674,7 +674,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
UPDATE pg_dist_partition SET partmethod = 'X';
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
@ -693,7 +693,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '10'::text, '20'::text))
@ -720,7 +720,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '11'::text, '20'::text),
@ -751,7 +751,7 @@ BEGIN;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
ERROR: cannot colocate tables test_2 and test_3
ROLLBACK;
@ -763,7 +763,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_3'::regclass, 1420009::bigint, 't'::"char", '21'::text, '30'::text),
@ -790,7 +790,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_ref'::regclass, 1420003::bigint, 't'::"char", '-1610612737'::text, NULL))
@ -805,7 +805,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_ref'::regclass, 1420006::bigint, 't'::"char", NULL, NULL),
@ -821,7 +821,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_ref'::regclass, 1420006::bigint, 't'::"char", NULL, NULL))
@ -842,7 +842,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('super_user_table'::regclass, 1420007::bigint, 't'::"char", '11'::text, '20'::text))
@ -864,7 +864,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (-10, 1, 0::bigint, 1::int, 1500000::bigint))
@ -879,7 +879,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420000, 1, 0::bigint, 1::int, -10))
@ -894,7 +894,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1430100, 1, 0::bigint, 1::int, 10))
@ -909,7 +909,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420000, 10, 0::bigint, 1::int, 1500000))
@ -924,7 +924,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES ( 1420000, 1, 0::bigint, 123123123::int, 1500000))
@ -952,7 +952,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420000, 1, 0::bigint, get_node_id(), 1500000),
@ -968,7 +968,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420007, 1, 0::bigint, get_node_id(), 1500000))
@ -983,7 +983,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420000, 1, 0::bigint, get_node_id(), 1500000),
@ -1024,7 +1024,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
citus_internal_update_relation_colocation
---------------------------------------------------------------------
@ -1041,7 +1041,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420000, get_node_id(), get_node_id()+1000);
ERROR: Node with group id 1014 for shard placement xxxxx does not exist
@ -1054,7 +1054,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420000, get_node_id()+10000, get_node_id());
ERROR: Active placement for shard xxxxx is not found on group:14
@ -1067,7 +1067,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(0, get_node_id(), get_node_id()+1);
ERROR: Shard id does not exists: 0
@ -1080,7 +1080,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(213123123123, get_node_id(), get_node_id()+1);
ERROR: Shard id does not exists: 213123123123
@ -1093,7 +1093,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1);
ERROR: must be owner of table super_user_table
@ -1106,7 +1106,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420007))
@ -1115,7 +1115,7 @@ ERROR: must be owner of table super_user_table
ROLLBACK;
-- the user only allowed to delete shards in a distributed transaction
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420007))
@ -1130,7 +1130,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420100))
@ -1157,7 +1157,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420000))
@ -1191,7 +1191,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
-- with an ugly trick, update the repmodel
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET repmodel = 't'
@ -1206,7 +1206,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
-- with an ugly trick, update the vartype of table from int to bigint
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}'
@ -1221,7 +1221,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
-- with an ugly trick, update the partmethod of the table to not-valid
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = ''
@ -1236,7 +1236,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
-- with an ugly trick, update the partmethod of the table to not-valid
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = 'a'
@ -1254,7 +1254,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_5'::regclass, 'h', 'int_col', 500, 's');
citus_internal_add_partition_metadata
@ -1277,7 +1277,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_7'::regclass, 'h', 'text_col', 500, 's');
citus_internal_add_partition_metadata

View File

@ -63,6 +63,7 @@ test: shared_connection_waits
test: isolation_cancellation
test: isolation_undistribute_table
test: isolation_fix_partition_shard_index_names
test: isolation_global_pid
# Rebalancer
test: isolation_blocking_move_single_shard_commands

View File

@ -54,7 +54,8 @@ step "s1-verify-current-xact-is-on-worker"
get_current_transaction_id() as xact,
run_command_on_workers($$
SELECT row(initiator_node_identifier, transaction_number)
FROM get_all_active_transactions();
FROM get_all_active_transactions()
WHERE datid != 0;
$$) as remote
ORDER BY remote.nodeport ASC;
}

View File

@ -71,32 +71,32 @@ session "s3"
step "s3-as-admin"
{
-- Admin should be able to see all transactions
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
}
step "s3-as-user-1"
{
-- User should only be able to see its own transactions
SET ROLE test_user_1;
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
}
step "s3-as-readonly"
{
-- Other user should not see transactions
SET ROLE test_readonly;
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
}
step "s3-as-monitor"
{
-- Monitor should see all transactions
SET ROLE test_monitor;
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
}
permutation "s1-grant" "s1-begin-insert" "s2-begin-insert" "s3-as-admin" "s3-as-user-1" "s3-as-readonly" "s3-as-monitor" "s1-commit" "s2-commit"

View File

@ -0,0 +1,62 @@
setup
{
CREATE TABLE dist_table (a INT, b INT);
SELECT create_distributed_table('dist_table', 'a');
SET citus.force_max_query_parallelization TO ON;
}
teardown
{
DROP TABLE dist_table;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-select"
{
SELECT * FROM dist_table;
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-citus_dist_stat_activity"
{
SELECT global_pid != 0 FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%';
}
step "s2-citus_worker_stat_activity"
{
SELECT count(*) FROM citus_worker_stat_activity() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
);
}
step "s2-get_all_active_transactions"
{
SELECT count(*) FROM get_all_active_transactions() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
);
}
step "s2-get_global_active_transactions"
{
SELECT count(*) FROM get_global_active_transactions() WHERE global_pid IN (
SELECT global_pid FROM citus_dist_stat_activity() WHERE query LIKE '%SELECT * FROM dist\_table%'
);
}
permutation "s1-begin" "s1-select" "s2-citus_dist_stat_activity" "s1-commit"
permutation "s1-begin" "s1-select" "s2-citus_worker_stat_activity" "s1-commit"
permutation "s1-begin" "s1-select" "s2-get_all_active_transactions" "s1-commit"
permutation "s1-begin" "s1-select" "s2-get_global_active_transactions" "s1-commit"

View File

@ -106,8 +106,8 @@ step "s2-sleep"
step "s2-active-transactions"
{
-- Admin should be able to see all transactions
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
SELECT count(*) FROM get_all_active_transactions() WHERE datid != 0;
SELECT count(*) FROM get_global_active_transactions() WHERE datid != 0;
}
// we disable the daemon during the regression tests in order to get consistent results

View File

@ -124,7 +124,7 @@ SELECT count(*) FROM single_replicatated WHERE key = 100;
RESET client_min_messages;
-- verify get_global_active_transactions works when a timeout happens on a connection
SELECT get_global_active_transactions();
SELECT * FROM get_global_active_transactions() WHERE datid != 0;
-- tests for connectivity checks
SET client_min_messages TO ERROR;

View File

@ -28,7 +28,7 @@ ROLLBACK;
-- but we are on the coordinator, so still not allowed
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ROLLBACK;
@ -67,14 +67,14 @@ SET search_path TO metadata_sync_helpers;
-- owner of the table test
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ROLLBACK;
-- we do not own the relation
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test'::regclass, 10);
ROLLBACK;
@ -83,7 +83,7 @@ CREATE TABLE test_2(col_1 int, col_2 int);
CREATE TABLE test_3(col_1 int, col_2 int);
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's');
SELECT count(*) FROM pg_dist_partition WHERE logicalrelid = 'metadata_sync_helpers.test_2'::regclass;
ROLLBACK;
@ -91,42 +91,42 @@ ROLLBACK;
-- fails because there is no X distribution method
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's');
ROLLBACK;
-- fails because there is the column does not exist
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'non_existing_col', 0, 's');
ROLLBACK;
--- fails because we do not allow NULL parameters
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata (NULL, 'h', 'non_existing_col', 0, 's');
ROLLBACK;
-- fails because colocationId cannot be negative
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', -1, 's');
ROLLBACK;
-- fails because there is no X replication model
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 'X');
ROLLBACK;
-- the same table cannot be added twice, that is enforced by a primary key
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's');
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's');
@ -135,7 +135,7 @@ ROLLBACK;
-- the same table cannot be added twice, that is enforced by a primary key even if distribution key changes
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's');
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_2', 0, 's');
@ -144,7 +144,7 @@ ROLLBACK;
-- hash distributed table cannot have NULL distribution key
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', NULL, 0, 's');
ROLLBACK;
@ -165,14 +165,14 @@ SET search_path TO metadata_sync_helpers;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's');
ROLLBACK;
-- should throw error even if we skip the checks, there are no such nodes
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420007, 10000, 11111);
ROLLBACK;
@ -189,7 +189,7 @@ SET search_path TO metadata_sync_helpers;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'X', 'col_1', 0, 's');
ROLLBACK;
@ -207,21 +207,21 @@ SET search_path TO metadata_sync_helpers;
CREATE TABLE test_ref(col_1 int, col_2 int);
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', 'col_1', 0, 's');
ROLLBACK;
-- non-valid replication model
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'A');
ROLLBACK;
-- not-matching replication model for reference table
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c');
ROLLBACK;
@ -231,7 +231,7 @@ SET search_path TO metadata_sync_helpers;
CREATE TABLE super_user_table(col_1 int);
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('super_user_table'::regclass, 'h', 'col_1', 0, 's');
COMMIT;
@ -244,7 +244,7 @@ SET search_path TO metadata_sync_helpers;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('super_user_table'::regclass, 1420000::bigint, 't'::"char", '-2147483648'::text, '-1610612737'::text))
@ -254,7 +254,7 @@ ROLLBACK;
-- the user is only allowed to add a shard for add a table which is in pg_dist_partition
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '-2147483648'::text, '-1610612737'::text))
@ -264,7 +264,7 @@ ROLLBACK;
-- ok, now add the table to the pg_dist_partition
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 250, 's');
SELECT citus_internal_add_partition_metadata ('test_3'::regclass, 'h', 'col_1', 251, 's');
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 't');
@ -273,14 +273,14 @@ COMMIT;
-- we can update to a non-existing colocation group (e.g., colocate_with:=none)
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232);
ROLLBACK;
-- invalid shard ids are not allowed
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, -1, 't'::"char", '-2147483648'::text, '-1610612737'::text))
@ -290,7 +290,7 @@ ROLLBACK;
-- invalid storage types are not allowed
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000, 'X'::"char", '-2147483648'::text, '-1610612737'::text))
@ -300,7 +300,7 @@ ROLLBACK;
-- NULL shard ranges are not allowed for hash distributed tables
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000, 't'::"char", NULL, '-1610612737'::text))
@ -310,7 +310,7 @@ ROLLBACK;
-- non-integer shard ranges are not allowed
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", 'non-int'::text, '-1610612737'::text))
@ -320,7 +320,7 @@ ROLLBACK;
-- shardMinValue should be smaller than shardMaxValue
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '-1610612737'::text, '-2147483648'::text))
@ -330,7 +330,7 @@ ROLLBACK;
-- we do not allow overlapping shards for the same table
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '10'::text, '20'::text),
@ -344,7 +344,7 @@ ROLLBACK;
-- check with non-existing object type
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('non_existing_type', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false))
@ -354,7 +354,7 @@ ROLLBACK;
-- check the sanity of distributionArgumentIndex and colocationId
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -100, 0, false))
@ -363,7 +363,7 @@ ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -1, -1, false))
@ -373,7 +373,7 @@ ROLLBACK;
-- check with non-existing object
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false))
@ -384,7 +384,7 @@ ROLLBACK;
-- if any parameter is NULL
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], 0, NULL::int, false))
@ -397,7 +397,7 @@ ROLLBACK;
-- which is known how to distribute
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
CREATE TABLE publication_test_table(id int);
@ -412,7 +412,7 @@ ROLLBACK;
-- Show that citus_internal_add_object_metadata checks the priviliges
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
CREATE FUNCTION distribution_test_function(int) RETURNS int
@ -430,7 +430,7 @@ ROLLBACK;
SET search_path TO metadata_sync_helpers;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
UPDATE pg_dist_partition SET partmethod = 'X';
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
@ -444,7 +444,7 @@ ROLLBACK;
SET search_path TO metadata_sync_helpers;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '10'::text, '20'::text))
@ -462,7 +462,7 @@ SET search_path TO metadata_sync_helpers;
-- now, add few shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_2'::regclass, 1420000::bigint, 't'::"char", '11'::text, '20'::text),
@ -478,14 +478,14 @@ COMMIT;
-- we cannot mark these two tables colocated because they are not colocated
BEGIN;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
-- now, add few more shards for test_3 to make it colocated with test_2
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_3'::regclass, 1420009::bigint, 't'::"char", '21'::text, '30'::text),
@ -499,7 +499,7 @@ COMMIT;
-- shardMin/MaxValues should be NULL for reference tables
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_ref'::regclass, 1420003::bigint, 't'::"char", '-1610612737'::text, NULL))
@ -509,7 +509,7 @@ ROLLBACK;
-- reference tables cannot have multiple shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_ref'::regclass, 1420006::bigint, 't'::"char", NULL, NULL),
@ -520,7 +520,7 @@ ROLLBACK;
-- finally, add a shard for reference tables
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('test_ref'::regclass, 1420006::bigint, 't'::"char", NULL, NULL))
@ -533,7 +533,7 @@ SET search_path TO metadata_sync_helpers;
-- and a shard for the superuser table
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
AS (VALUES ('super_user_table'::regclass, 1420007::bigint, 't'::"char", '11'::text, '20'::text))
@ -548,7 +548,7 @@ SET search_path TO metadata_sync_helpers;
-- shard does not exist
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (-10, 1, 0::bigint, 1::int, 1500000::bigint))
@ -558,7 +558,7 @@ ROLLBACK;
-- invalid placementid
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420000, 1, 0::bigint, 1::int, -10))
@ -568,7 +568,7 @@ ROLLBACK;
-- non-existing shard
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1430100, 1, 0::bigint, 1::int, 10))
@ -578,7 +578,7 @@ ROLLBACK;
-- invalid shard state
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420000, 10, 0::bigint, 1::int, 1500000))
@ -588,7 +588,7 @@ ROLLBACK;
-- non-existing node with non-existing node-id 123123123
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES ( 1420000, 1, 0::bigint, 123123123::int, 1500000))
@ -612,7 +612,7 @@ END; $$ language plpgsql;
-- fails because we ingest more placements for the same shards to the same worker node
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420000, 1, 0::bigint, get_node_id(), 1500000),
@ -623,7 +623,7 @@ ROLLBACK;
-- shard is not owned by us
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420007, 1, 0::bigint, get_node_id(), 1500000))
@ -633,7 +633,7 @@ ROLLBACK;
-- sucessfully add placements
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS
(VALUES (1420000, 1, 0::bigint, get_node_id(), 1500000),
@ -654,7 +654,7 @@ COMMIT;
-- we should be able to colocate both tables now
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
@ -663,7 +663,7 @@ ROLLBACK;
-- fails because we are trying to update it to non-existing node
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420000, get_node_id(), get_node_id()+1000);
COMMIT;
@ -671,7 +671,7 @@ COMMIT;
-- fails because the source node doesn't contain the shard
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420000, get_node_id()+10000, get_node_id());
COMMIT;
@ -679,7 +679,7 @@ COMMIT;
-- fails because shard does not exist
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(0, get_node_id(), get_node_id()+1);
COMMIT;
@ -687,7 +687,7 @@ COMMIT;
-- fails because none-existing shard
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(213123123123, get_node_id(), get_node_id()+1);
COMMIT;
@ -695,7 +695,7 @@ COMMIT;
-- fails because we do not own the shard
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1);
COMMIT;
@ -703,7 +703,7 @@ COMMIT;
-- the user only allowed to delete their own shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420007))
@ -712,7 +712,7 @@ ROLLBACK;
-- the user only allowed to delete shards in a distributed transaction
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420007))
@ -722,7 +722,7 @@ ROLLBACK;
-- the user cannot delete non-existing shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420100))
@ -737,7 +737,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420000))
@ -754,7 +754,7 @@ ROLLBACK;
SET search_path TO metadata_sync_helpers;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
-- with an ugly trick, update the repmodel
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET repmodel = 't'
@ -765,7 +765,7 @@ ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
-- with an ugly trick, update the vartype of table from int to bigint
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}'
@ -775,7 +775,7 @@ ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
-- with an ugly trick, update the partmethod of the table to not-valid
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = ''
@ -785,7 +785,7 @@ ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
-- with an ugly trick, update the partmethod of the table to not-valid
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = 'a'
@ -799,7 +799,7 @@ CREATE TABLE test_6(int_col int, text_col text);
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_5'::regclass, 'h', 'int_col', 500, 's');
SELECT citus_internal_add_partition_metadata ('test_6'::regclass, 'h', 'text_col', 500, 's');
@ -815,7 +815,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
CREATE TABLE test_8(int_col int, text_col text COLLATE "caseinsensitive");
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SET application_name to 'citus gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_add_partition_metadata ('test_7'::regclass, 'h', 'text_col', 500, 's');
SELECT citus_internal_add_partition_metadata ('test_8'::regclass, 'h', 'text_col', 500, 's');