mirror of https://github.com/citusdata/citus.git
* Addresses comments (adding PreprocessLockStatement fxn, adding quote literal string)
* Adds tests for distributed locks (both isolation and normal tests) * TRUNCATE now cannot be issued on a worker if the coordinator is not in the metadatapull/5899/head
parent
a250c4f350
commit
5d6ac5d0a1
|
@ -470,14 +470,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
|
||||
if (IsA(parsetree, LockStmt))
|
||||
{
|
||||
LockStmt *stmt = (LockStmt *) parsetree;
|
||||
|
||||
bool isTopLevel = context == PROCESS_UTILITY_TOPLEVEL;
|
||||
ErrorIfUnsupportedLockStmt(stmt, isTopLevel);
|
||||
|
||||
uint32 nowaitFlag = stmt->nowait ? DIST_LOCK_NOWAIT : 0;
|
||||
AcquireDistributedLockOnRelations(stmt->relations, stmt->mode,
|
||||
DIST_LOCK_VIEWS_RECUR | nowaitFlag);
|
||||
PreprocessLockStatement((LockStmt *) parsetree, context);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -1239,6 +1239,17 @@ AcquireDistributedLockOnRelations(List *relationList, LOCKMODE lockMode, uint32
|
|||
|
||||
if (distributedRelationList != NIL)
|
||||
{
|
||||
if (!IsCoordinator() && !CoordinatorAddedAsWorkerNode())
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errmsg(
|
||||
"Cannot acquire a distributed lock from a worker node since the "
|
||||
"coordinator is not in the metadata."),
|
||||
errhint(
|
||||
"Either run this command on the coordinator or add the coordinator "
|
||||
"in the metadata by using: SELECT citus_set_coordinator_host('<hostname>', <port>);")));
|
||||
}
|
||||
|
||||
bool nowait = (configs & DIST_LOCK_NOWAIT) > 0;
|
||||
AcquireDistributedLockOnRelations_Internal(distributedRelationList, lockMode,
|
||||
nowait);
|
||||
|
@ -1252,7 +1263,7 @@ AcquireDistributedLockOnRelations(List *relationList, LOCKMODE lockMode, uint32
|
|||
* - The relation id-s being locked do not exist
|
||||
* - Locking shard, but citus.enable_manual_changes_to_shards is false
|
||||
*/
|
||||
void
|
||||
static void
|
||||
ErrorIfUnsupportedLockStmt(LockStmt *stmt, bool isTopLevel)
|
||||
{
|
||||
RequireTransactionBlock(isTopLevel, "LOCK TABLE");
|
||||
|
@ -1272,3 +1283,21 @@ ErrorIfUnsupportedLockStmt(LockStmt *stmt, bool isTopLevel)
|
|||
ErrorIfIllegallyChangingKnownShard(relationId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* PreprocessLockStatement makes sure that the lock is allowed
|
||||
* before calling AcquireDistributedLockOnRelations on the locked tables/views
|
||||
* with flags DIST_LOCK_VIEWS_RECUR and DIST_LOCK_NOWAIT if nowait is true for
|
||||
* the lock statement
|
||||
*/
|
||||
void
|
||||
PreprocessLockStatement(LockStmt *stmt, ProcessUtilityContext context)
|
||||
{
|
||||
bool isTopLevel = context == PROCESS_UTILITY_TOPLEVEL;
|
||||
ErrorIfUnsupportedLockStmt(stmt, isTopLevel);
|
||||
|
||||
uint32 nowaitFlag = stmt->nowait ? DIST_LOCK_NOWAIT : 0;
|
||||
AcquireDistributedLockOnRelations(stmt->relations, stmt->mode,
|
||||
DIST_LOCK_VIEWS_RECUR | nowaitFlag);
|
||||
}
|
||||
|
|
|
@ -116,9 +116,10 @@ typedef enum CitusOperations
|
|||
#define DIST_LOCK_REFERENCING_TABLES 1 << 1
|
||||
#define DIST_LOCK_NOWAIT 1 << 2
|
||||
|
||||
#define LOCK_RELATION_IF_EXISTS "SELECT pg_catalog.lock_relation_if_exists(%s, '%s');"
|
||||
#define LOCK_RELATION_IF_EXISTS \
|
||||
"SELECT pg_catalog.lock_relation_if_exists(%s, quote_literal_cstr(%s));"
|
||||
#define LOCK_RELATION_IF_EXISTS_NOWAIT \
|
||||
"SELECT pg_catalog.lock_relation_if_exists(%s, '%s', nowait => true);"
|
||||
"SELECT pg_catalog.lock_relation_if_exists(%s, quote_literal_cstr(%s), nowait => true);"
|
||||
|
||||
/* Lock shard/relation metadata for safe modifications */
|
||||
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
||||
|
@ -163,5 +164,5 @@ extern LOCKMODE LockModeCStringToLockMode(const char *lockModeName);
|
|||
extern const char * LockModeToLockModeCString(LOCKMODE lockMode);
|
||||
extern void AcquireDistributedLockOnRelations(List *relationList, LOCKMODE lockMode,
|
||||
uint32 configs);
|
||||
extern void ErrorIfUnsupportedLockStmt(LockStmt *stmt, bool isTopLevel);
|
||||
extern void PreprocessLockStatement(LockStmt *stmt, ProcessUtilityContext context);
|
||||
#endif /* RESOURCE_LOCK_H */
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
CREATE SCHEMA distribute_lock_tests;
|
||||
SET search_path TO distribute_lock_tests;
|
||||
SET citus.next_shard_id TO 10000;
|
||||
CREATE TABLE dist_table(a int);
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
|
||||
-- Test acquiring lock outside transaction
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ERROR: LOCK TABLE can only be used in transaction blocks
|
||||
-- Test acquiring lock inside procedure
|
||||
DO $$
|
||||
BEGIN
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
END;
|
||||
$$;
|
||||
-- Test that when the user does not have the required permissions to lock
|
||||
-- the locks are not forwarded to the workers
|
||||
CREATE USER read_only_user;
|
||||
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
|
||||
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
|
||||
GRANT SELECT ON distribute_lock_tests.dist_table TO read_only_user;
|
||||
SET ROLE read_only_user;
|
||||
SET citus.log_remote_commands TO ON;
|
||||
BEGIN;
|
||||
LOCK distribute_lock_tests.dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ERROR: permission denied for schema distribute_lock_tests
|
||||
ROLLBACK;
|
||||
RESET citus.log_local_commands;
|
||||
RESET ROLE;
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO distribute_lock_tests;
|
||||
-- Test trying to lock from a worker when the coordinator is not in the metadata
|
||||
BEGIN;
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ERROR: Cannot acquire a distributed lock from a worker node since the coordinator is not in the metadata.
|
||||
HINT: Either run this command on the coordinator or add the coordinator in the metadata by using: SELECT citus_set_coordinator_host('<hostname>', <port>);
|
||||
ROLLBACK;
|
||||
-- Verify that the same restriction does not apply to worker local tables
|
||||
CREATE TABLE local_table(a int);
|
||||
-- Verify that no locks will be distributed for the local lock
|
||||
SET citus.log_remote_commands TO ON;
|
||||
BEGIN;
|
||||
LOCK local_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
RESET citus.log_remote_commands;
|
||||
-- Cleanup local table
|
||||
DROP TABLE local_table;
|
||||
SET citus.enable_manual_changes_to_shards TO OFF;
|
||||
-- Test locking a shard
|
||||
BEGIN;
|
||||
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
|
||||
ERROR: cannot modify "dist_table_10000" because it is a shard of a distributed table
|
||||
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
|
||||
ROLLBACK;
|
||||
-- Test allowing shard locks with the citus.enable_manual_changes_to_shards guc
|
||||
SET citus.enable_manual_changes_to_shards TO ON;
|
||||
BEGIN;
|
||||
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
RESET citus.enable_manual_changes_to_shards;
|
||||
\c - - - :master_port
|
||||
DROP SCHEMA distribute_lock_tests CASCADE;
|
||||
NOTICE: drop cascades to table distribute_lock_tests.dist_table
|
||||
DROP USER read_only_user;
|
|
@ -94,5 +94,6 @@ test: isolation_metadata_sync_deadlock
|
|||
test: isolation_replicated_dist_on_mx
|
||||
test: isolation_replicate_reference_tables_to_coordinator
|
||||
test: isolation_multiuser_locking
|
||||
test: isolation_acquire_distributed_locks
|
||||
|
||||
test: isolation_check_mx
|
||||
|
|
|
@ -59,6 +59,7 @@ test: locally_execute_intermediate_results
|
|||
test: multi_mx_alter_distributed_table
|
||||
test: update_colocation_mx
|
||||
test: resync_metadata_with_sequences
|
||||
test: distribute_locks
|
||||
|
||||
# should be executed sequentially because it modifies metadata
|
||||
test: local_shard_execution_dropped_column
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
#include "isolation_mx_common.include.spec"
|
||||
|
||||
setup
|
||||
{
|
||||
CREATE TABLE dist_table(a int);
|
||||
CREATE TABLE local_table(a int);
|
||||
CREATE TABLE ref_table(a int);
|
||||
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
SELECT create_reference_table('ref_table');
|
||||
|
||||
CREATE VIEW sub_view(a) AS
|
||||
SELECT 2 * a AS a
|
||||
FROM ref_table;
|
||||
|
||||
CREATE VIEW main_view AS
|
||||
SELECT t1.a a1, t2.a a2, t3.a a3
|
||||
FROM dist_table t1
|
||||
JOIN local_table t2 ON t1.a = t2.a
|
||||
JOIN sub_view t3 ON t2.a = t3.a;
|
||||
|
||||
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
|
||||
INSERT INTO local_table SELECT n FROM generate_series(1, 5) n;
|
||||
INSERT INTO ref_table SELECT n FROM generate_series(1, 5) n;
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP VIEW main_view;
|
||||
DROP VIEW sub_view;
|
||||
DROP TABLE dist_table;
|
||||
DROP TABLE local_table;
|
||||
DROP TABLE ref_table;
|
||||
|
||||
SELECT citus_internal.restore_isolation_tester_func();
|
||||
}
|
||||
|
||||
session "deadlock-checker"
|
||||
|
||||
step "deadlock-checker-call"
|
||||
{
|
||||
SELECT check_distributed_deadlocks();
|
||||
}
|
||||
|
||||
// coordinator session
|
||||
session "s1"
|
||||
|
||||
step "s1-add-coordinator-to-metadata"
|
||||
{
|
||||
SELECT citus_set_coordinator_host('localhost', 57636);
|
||||
}
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s1-acquire-aggresive-lock-on-dist-table"
|
||||
{
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "s1-acquire-aggresive-lock-on-dist-table-nowait"
|
||||
{
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE NOWAIT;
|
||||
}
|
||||
|
||||
step "s1-acquire-weak-lock-on-dist-table"
|
||||
{
|
||||
LOCK dist_table IN ACCESS SHARE MODE;
|
||||
}
|
||||
|
||||
step "s1-acquire-aggresive-lock-on-view"
|
||||
{
|
||||
LOCK main_view IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "s1-acquire-aggresive-lock-on-view-nowait"
|
||||
{
|
||||
LOCK main_view IN ACCESS EXCLUSIVE MODE NOWAIT;
|
||||
}
|
||||
|
||||
step "s1-lock-all"
|
||||
{
|
||||
LOCK dist_table, local_table, ref_table, main_view, sub_view IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "s1-read-dist-table"
|
||||
{
|
||||
SELECT * FROM dist_table;
|
||||
}
|
||||
|
||||
step "s1-read-ref-table"
|
||||
{
|
||||
SELECT * FROM ref_table;
|
||||
}
|
||||
|
||||
step "s1-rollback"
|
||||
{
|
||||
ROLLBACK;
|
||||
}
|
||||
|
||||
// worker 1 xact session
|
||||
session "s2"
|
||||
|
||||
step "s2-start-session-level-connection"
|
||||
{
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
}
|
||||
|
||||
step "s2-begin"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
}
|
||||
|
||||
step "s2-read-dist-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM dist_table');
|
||||
}
|
||||
|
||||
step "s2-read-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table');
|
||||
}
|
||||
|
||||
step "s2-acquire-aggressive-lock-dist-table" {
|
||||
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE');
|
||||
}
|
||||
|
||||
step "s2-acquire-aggressive-lock-dist-table-nowait" {
|
||||
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE NOWAIT');
|
||||
}
|
||||
|
||||
step "s2-lock-reference-table"
|
||||
{
|
||||
LOCK ref_table IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "s2-rollback"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('ROLLBACK');
|
||||
}
|
||||
|
||||
step "s2-stop-connection"
|
||||
{
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
}
|
||||
|
||||
// worker 2 xact session
|
||||
session "s3"
|
||||
|
||||
|
||||
step "s3-start-session-level-connection"
|
||||
{
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
}
|
||||
|
||||
step "s3-begin"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
}
|
||||
|
||||
step "s3-read-dist-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM dist_table');
|
||||
}
|
||||
|
||||
step "s3-acquire-aggressive-lock-dist-table" {
|
||||
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE');
|
||||
}
|
||||
|
||||
step "s3-acquire-aggressive-lock-dist-table-nowait" {
|
||||
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE NOWAIT');
|
||||
}
|
||||
|
||||
step "s3-rollback"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('ROLLBACK');
|
||||
}
|
||||
|
||||
step "s3-stop-connection"
|
||||
{
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
}
|
||||
|
||||
permutation "s1-begin" "s1-acquire-aggresive-lock-on-dist-table" "s2-start-session-level-connection" "s2-begin" "s2-read-dist-table" "s1-rollback" "s2-rollback" "s2-stop-connection"
|
||||
permutation "s1-add-coordinator-to-metadata" "s1-begin" "s1-acquire-aggresive-lock-on-dist-table" "s2-start-session-level-connection" "s2-begin" "s2-acquire-aggressive-lock-dist-table" "s1-rollback" "s1-read-dist-table" "s2-rollback" "s2-stop-connection"
|
||||
permutation "s1-add-coordinator-to-metadata" "s1-begin" "s1-acquire-aggresive-lock-on-dist-table" "s2-start-session-level-connection" "s2-begin" "s2-acquire-aggressive-lock-dist-table-nowait" "s2-rollback" "s1-rollback" "s2-stop-connection"
|
||||
permutation "s1-add-coordinator-to-metadata" "s2-start-session-level-connection" "s2-begin" "s2-acquire-aggressive-lock-dist-table-nowait" "s1-begin" "s1-acquire-aggresive-lock-on-dist-table-nowait" "s1-rollback" "s2-rollback" "s2-stop-connection"
|
||||
permutation "s1-add-coordinator-to-metadata" "s2-start-session-level-connection" "s2-begin" "s3-start-session-level-connection" "s3-begin" "s2-acquire-aggressive-lock-dist-table" "s3-acquire-aggressive-lock-dist-table" "s2-rollback" "s2-read-dist-table" "s3-rollback" "s2-stop-connection" "s3-stop-connection"
|
||||
permutation "s1-add-coordinator-to-metadata" "s3-start-session-level-connection" "s3-begin" "s3-read-dist-table" "s2-start-session-level-connection" "s2-begin" "s2-acquire-aggressive-lock-dist-table" "s3-acquire-aggressive-lock-dist-table" "deadlock-checker-call" "s2-rollback" "s3-rollback" "s2-stop-connection" "s3-stop-connection"
|
||||
permutation "s1-add-coordinator-to-metadata" "s1-begin" "s1-acquire-weak-lock-on-dist-table" "s2-start-session-level-connection" "s2-begin" "s2-read-dist-table" "s2-acquire-aggressive-lock-dist-table" "s1-rollback" "s2-rollback" "s2-stop-connection"
|
||||
permutation "s1-add-coordinator-to-metadata" "s2-start-session-level-connection" "s2-begin" "s2-lock-reference-table" "s1-begin" "s1-read-ref-table" "s2-rollback" "s1-rollback" "s2-stop-connection"
|
||||
permutation "s1-begin" "s1-acquire-aggresive-lock-on-view" "s2-start-session-level-connection" "s2-begin" "s2-read-dist-table" "s1-rollback" "s2-rollback" "s2-stop-connection"
|
||||
permutation "s1-add-coordinator-to-metadata" "s1-begin" "s1-acquire-aggresive-lock-on-view" "s2-start-session-level-connection" "s2-begin" "s2-acquire-aggressive-lock-dist-table" "s1-rollback" "s2-rollback" "s2-stop-connection"
|
||||
permutation "s1-begin" "s1-acquire-aggresive-lock-on-view" "s2-start-session-level-connection" "s2-begin" "s2-read-ref-table" "s1-rollback" "s2-rollback" "s2-stop-connection"
|
||||
permutation "s1-add-coordinator-to-metadata" "s2-start-session-level-connection" "s2-begin" "s2-acquire-aggressive-lock-dist-table" "s1-acquire-aggresive-lock-on-view-nowait" "s1-rollback" "s2-rollback" "s2-stop-connection"
|
||||
permutation "s1-begin" "s1-lock-all" "s2-start-session-level-connection" "s2-begin" "s2-read-ref-table" "s3-start-session-level-connection" "s3-begin" "s3-acquire-aggressive-lock-dist-table-nowait" "s1-rollback" "s2-rollback" "s3-rollback" "s2-stop-connection" "s3-stop-connection"
|
|
@ -0,0 +1,77 @@
|
|||
CREATE SCHEMA distribute_lock_tests;
|
||||
SET search_path TO distribute_lock_tests;
|
||||
|
||||
SET citus.next_shard_id TO 10000;
|
||||
|
||||
CREATE TABLE dist_table(a int);
|
||||
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
|
||||
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
|
||||
|
||||
-- Test acquiring lock outside transaction
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
|
||||
-- Test acquiring lock inside procedure
|
||||
DO $$
|
||||
BEGIN
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- Test that when the user does not have the required permissions to lock
|
||||
-- the locks are not forwarded to the workers
|
||||
CREATE USER read_only_user;
|
||||
GRANT SELECT ON distribute_lock_tests.dist_table TO read_only_user;
|
||||
SET ROLE read_only_user;
|
||||
SET citus.log_remote_commands TO ON;
|
||||
|
||||
BEGIN;
|
||||
LOCK distribute_lock_tests.dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
RESET citus.log_local_commands;
|
||||
RESET ROLE;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO distribute_lock_tests;
|
||||
|
||||
-- Test trying to lock from a worker when the coordinator is not in the metadata
|
||||
BEGIN;
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
-- Verify that the same restriction does not apply to worker local tables
|
||||
CREATE TABLE local_table(a int);
|
||||
|
||||
-- Verify that no locks will be distributed for the local lock
|
||||
SET citus.log_remote_commands TO ON;
|
||||
|
||||
BEGIN;
|
||||
LOCK local_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
RESET citus.log_remote_commands;
|
||||
|
||||
-- Cleanup local table
|
||||
DROP TABLE local_table;
|
||||
|
||||
SET citus.enable_manual_changes_to_shards TO OFF;
|
||||
|
||||
-- Test locking a shard
|
||||
BEGIN;
|
||||
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
-- Test allowing shard locks with the citus.enable_manual_changes_to_shards guc
|
||||
SET citus.enable_manual_changes_to_shards TO ON;
|
||||
|
||||
BEGIN;
|
||||
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
RESET citus.enable_manual_changes_to_shards;
|
||||
|
||||
\c - - - :master_port
|
||||
DROP SCHEMA distribute_lock_tests CASCADE;
|
||||
DROP USER read_only_user;
|
Loading…
Reference in New Issue