diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 6516930e7..f26cf7443 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -14,8 +14,10 @@ #include "distributed/insert_select_planner.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" +#include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "executor/executor.h" #include "nodes/execnodes.h" @@ -60,6 +62,16 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator"))); + /* + * If we are dealing with partitioned table, we also need to lock its + * partitions. Here we only lock targetRelation, we acquire necessary + * locks on selected tables during execution of those select queries. + */ + if (PartitionedTable(targetRelationId)) + { + LockPartitionRelations(targetRelationId, RowExclusiveLock); + } + ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery, executorState); diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 2a210ee5f..2492356b9 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -25,6 +25,7 @@ #include "distributed/multi_resowner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_utility.h" +#include "distributed/resource_lock.h" #include "distributed/worker_protocol.h" #include "executor/execdebug.h" #include "commands/copy.h" @@ -248,6 +249,9 @@ RealTimeExecScan(CustomScanState *node) MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; + /* we are taking locks on partitions of partitioned tables */ + LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + PrepareMasterJobDirectory(workerJob); MultiRealTimeExecute(workerJob); @@ -445,6 +449,9 @@ TaskTrackerExecScan(CustomScanState *node) MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; + /* we are taking locks on partitions of partitioned tables */ + LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + PrepareMasterJobDirectory(workerJob); MultiTaskTrackerExecute(workerJob); diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 5b6def0fc..9d7e47446 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -34,6 +34,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" @@ -98,6 +99,7 @@ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connect bool failOnError, int64 *rows); static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows); +static LOCKMODE LockModeForModifyTask(Task *task); /* @@ -296,6 +298,11 @@ AcquireExecutorMultiShardLocks(List *taskList) lockMode = ExclusiveLock; } + /* + * If we are dealing with a partition we are also taking locks on parent table + * to prevent deadlocks on concurrent operations on a partition and its parent. + */ + LockParentShardResourceIfPartition(task->anchorShardId, lockMode); LockShardResource(task->anchorShardId, lockMode); /* @@ -434,6 +441,12 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) /* prevent concurrent placement changes */ AcquireMetadataLocks(taskList); + /* + * We are taking locks on partitions of partitioned tables. These locks are + * necessary for locking tables that appear in the SELECT part of the query. + */ + LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + /* modify tasks are always assigned using first-replica policy */ workerJob->taskList = FirstReplicaAssignTaskList(taskList); } @@ -519,6 +532,9 @@ RouterSelectExecScan(CustomScanState *node) Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; + /* we are taking locks on partitions of partitioned tables */ + LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + if (list_length(taskList) > 0) { Task *task = (Task *) linitial(taskList); @@ -682,6 +698,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult char *queryString = task->queryString; bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); + ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId); + Oid relationId = shardInterval->relationId; + /* * Modifications for reference tables are always done using 2PC. First * ensure that distributed transaction is started. Then force the @@ -712,6 +731,17 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult */ connectionList = GetModifyConnections(task, taskRequiresTwoPhaseCommit); + /* + * If we are dealing with a partitioned table, we also need to lock its + * partitions. + */ + if (PartitionedTable(relationId)) + { + LOCKMODE lockMode = LockModeForModifyTask(task); + + LockPartitionRelations(relationId, lockMode); + } + /* prevent replicas of the same shard from diverging */ AcquireExecutorShardLock(task, operation); @@ -947,6 +977,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn int64 totalAffectedTupleCount = 0; ListCell *taskCell = NULL; Task *firstTask = NULL; + ShardInterval *firstShardInterval = NULL; int connectionFlags = 0; List *affectedTupleCountList = NIL; HTAB *shardConnectionHash = NULL; @@ -958,13 +989,25 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn return 0; } + /* + * In multi shard modification, we expect that all tasks operates on the + * same relation, so it is enough to acquire a lock on the first task's + * anchor relation's partitions. + */ + firstTask = (Task *) linitial(taskList); + firstShardInterval = LoadShardInterval(firstTask->anchorShardId); + if (PartitionedTable(firstShardInterval->relationId)) + { + LOCKMODE lockMode = LockModeForModifyTask(firstTask); + + LockPartitionRelations(firstShardInterval->relationId, lockMode); + } + /* ensure that there are no concurrent modifications on the same shards */ AcquireExecutorMultiShardLocks(taskList); BeginOrContinueCoordinatedTransaction(); - firstTask = (Task *) linitial(taskList); - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC || firstTask->replicationModel == REPLICATION_MODEL_2PC) { @@ -1464,3 +1507,29 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows) return gotResponse && !commandFailed; } + + +/* + * LockModeForRouterModifyTask returns appropriate LOCKMODE for given router + * modify task. + */ +static LOCKMODE +LockModeForModifyTask(Task *task) +{ + LOCKMODE lockMode = NoLock; + if (task->taskType == DDL_TASK) + { + lockMode = AccessExclusiveLock; + } + else if (task->taskType == MODIFY_TASK) + { + lockMode = RowExclusiveLock; + } + else + { + /* we do not allow any other task type in these code path */ + Assert(false); + } + + return lockMode; +} diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 5de8d5bd1..0e0b27c48 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -523,6 +523,8 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) } } + multiPlan->relationIdList = localPlan->relationOids; + multiPlanData = (Node *) multiPlan; customScan->custom_private = list_make1(multiPlanData); diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 9a3ab06af..cd5c334ee 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -107,6 +107,7 @@ CopyNodeMultiPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(workerJob); COPY_NODE_FIELD(masterQuery); COPY_SCALAR_FIELD(routerExecutable); + COPY_NODE_FIELD(relationIdList); COPY_NODE_FIELD(insertSelectSubquery); COPY_NODE_FIELD(insertTargetList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index e5fb43fab..f2d988299 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -185,6 +185,7 @@ OutMultiPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(masterQuery); WRITE_BOOL_FIELD(routerExecutable); + WRITE_NODE_FIELD(relationIdList); WRITE_NODE_FIELD(insertSelectSubquery); WRITE_NODE_FIELD(insertTargetList); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 6e597cb6e..db6ca6757 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -205,6 +205,7 @@ ReadMultiPlan(READFUNC_ARGS) READ_NODE_FIELD(workerJob); READ_NODE_FIELD(masterQuery); READ_BOOL_FIELD(routerExecutable); + READ_NODE_FIELD(relationIdList); READ_NODE_FIELD(insertSelectSubquery); READ_NODE_FIELD(insertTargetList); diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index d4da891ae..9e08d95a1 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -17,11 +17,13 @@ #include "c.h" #include "miscadmin.h" +#include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" -#include "distributed/multi_router_executor.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_planner.h" +#include "distributed/multi_router_executor.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" @@ -316,3 +318,69 @@ LockRelationShardResources(List *relationShardList, LOCKMODE lockMode) } } } + + +/* + * LockParentShardResourceIfPartition checks whether the given shard belongs + * to a partition. If it does, LockParentShardResourceIfPartition acquires a + * shard resource lock on the colocated shard of the parent table. + */ +void +LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode) +{ + ShardInterval *shardInterval = LoadShardInterval(shardId); + Oid relationId = shardInterval->relationId; + + if (PartitionTable(relationId)) + { + int shardIndex = ShardIndex(shardInterval); + Oid parentRelationId = PartitionParentOid(relationId); + uint64 parentShardId = ColocatedShardIdInRelation(parentRelationId, shardIndex); + + LockShardResource(parentShardId, lockMode); + } +} + + +/* + * LockPartitionsInRelationList iterates over given list and acquires locks on + * partitions of each partitioned table. It does nothing for non-partitioned tables. + */ +void +LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode) +{ + ListCell *relationIdCell = NULL; + + foreach(relationIdCell, relationIdList) + { + Oid relationId = lfirst_oid(relationIdCell); + if (PartitionedTable(relationId)) + { + LockPartitionRelations(relationId, lockmode); + } + } +} + + +/* + * LockPartitionRelations acquires relation lock on all partitions of given + * partitioned relation. This function expects that given relation is a + * partitioned relation. + */ +void +LockPartitionRelations(Oid relationId, LOCKMODE lockMode) +{ + /* + * PartitionList function generates partition list in the same order + * as PostgreSQL. Therefore we do not need to sort it before acquiring + * locks. + */ + List *partitionList = PartitionList(relationId); + ListCell *partitionCell = NULL; + + foreach(partitionCell, partitionList) + { + Oid partitionRelationId = lfirst_oid(partitionCell); + LockRelationOid(partitionRelationId, lockMode); + } +} diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 8f753a4e5..1e16286c4 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -228,6 +228,7 @@ typedef struct MultiPlan Job *workerJob; Query *masterQuery; bool routerExecutable; + List *relationIdList; /* INSERT ... SELECT via coordinator only */ Query *insertSelectSubquery; diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 896d1fbb0..96debf4dc 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -80,5 +80,11 @@ extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode); extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode); extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode); +/* Lock partitions of partitioned table */ +extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode); +extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode); + +/* Lock parent table's colocated shard resource */ +extern void LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode); #endif /* RESOURCE_LOCK_H */ diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index 272312f2a..b0893e0ec 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -923,15 +923,327 @@ ERROR: no partition of relation "multi_column_partitioning_1660068" found for r DETAIL: Partition key of the failing row contains (c1, c2) = (20, -20). CONTEXT: while executing command on localhost:57637 -- see data is loaded to multi-column partitioned table -SELECT * FROM multi_column_partitioning; +SELECT * FROM multi_column_partitioning ORDER BY 1, 2; c1 | c2 ----+----- 1 | 1 5 | -5 - 19 | -19 11 | -11 + 19 | -19 (4 rows) -DROP TABLE IF EXISTS partitioning_test_2012, partitioning_test_2013, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning; +-- +-- Tests for locks on partitioned tables +-- +CREATE TABLE partitioning_locks(id int, ref_id int, time date) PARTITION BY RANGE (time); +-- create its partitions +CREATE TABLE partitioning_locks_2009 PARTITION OF partitioning_locks FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +CREATE TABLE partitioning_locks_2010 PARTITION OF partitioning_locks FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); +-- distribute partitioned table +SELECT create_distributed_table('partitioning_locks', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- test locks on router SELECT +BEGIN; +SELECT * FROM partitioning_locks WHERE id = 1 ORDER BY 1, 2; + id | ref_id | time +----+--------+------ +(0 rows) + +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------+----------+----------------- + partitioning_locks | relation | AccessShareLock + partitioning_locks_2009 | relation | AccessShareLock + partitioning_locks_2010 | relation | AccessShareLock +(3 rows) + +COMMIT; +-- test locks on real-time SELECT +BEGIN; +SELECT * FROM partitioning_locks ORDER BY 1, 2; + id | ref_id | time +----+--------+------ +(0 rows) + +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------+----------+----------------- + partitioning_locks | relation | AccessShareLock + partitioning_locks_2009 | relation | AccessShareLock + partitioning_locks_2010 | relation | AccessShareLock +(3 rows) + +COMMIT; +-- test locks on task-tracker SELECT +SET citus.task_executor_type TO 'task-tracker'; +BEGIN; +SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_locks AS pl2 ON pl1.id = pl2.ref_id ORDER BY 1, 2; + id | ref_id | time | id | ref_id | time +----+--------+------+----+--------+------ +(0 rows) + +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------+----------+----------------- + partitioning_locks | relation | AccessShareLock + partitioning_locks_2009 | relation | AccessShareLock + partitioning_locks_2010 | relation | AccessShareLock +(3 rows) + +COMMIT; +SET citus.task_executor_type TO 'real-time'; +-- test locks on INSERT +BEGIN; +INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01'); +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------+----------+------------------ + partitioning_locks | relation | AccessShareLock + partitioning_locks | relation | RowExclusiveLock + partitioning_locks_2009 | relation | AccessShareLock + partitioning_locks_2009 | relation | RowExclusiveLock + partitioning_locks_2010 | relation | AccessShareLock + partitioning_locks_2010 | relation | RowExclusiveLock +(6 rows) + +COMMIT; +-- test locks on UPDATE +BEGIN; +UPDATE partitioning_locks SET time = '2009-02-01' WHERE id = 1; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------+----------+------------------ + partitioning_locks | relation | AccessShareLock + partitioning_locks | relation | RowExclusiveLock + partitioning_locks_2009 | relation | AccessShareLock + partitioning_locks_2009 | relation | RowExclusiveLock + partitioning_locks_2010 | relation | AccessShareLock + partitioning_locks_2010 | relation | RowExclusiveLock +(6 rows) + +COMMIT; +-- test locks on DELETE +BEGIN; +DELETE FROM partitioning_locks WHERE id = 1; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------+----------+------------------ + partitioning_locks | relation | AccessShareLock + partitioning_locks | relation | RowExclusiveLock + partitioning_locks_2009 | relation | AccessShareLock + partitioning_locks_2009 | relation | RowExclusiveLock + partitioning_locks_2010 | relation | AccessShareLock + partitioning_locks_2010 | relation | RowExclusiveLock +(6 rows) + +COMMIT; +-- test locks on INSERT/SELECT +CREATE TABLE partitioning_locks_for_select(id int, ref_id int, time date); +SELECT create_distributed_table('partitioning_locks_for_select', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +BEGIN; +INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------------+----------+------------------ + partitioning_locks | relation | AccessShareLock + partitioning_locks | relation | RowExclusiveLock + partitioning_locks_2009 | relation | AccessShareLock + partitioning_locks_2009 | relation | RowExclusiveLock + partitioning_locks_2010 | relation | AccessShareLock + partitioning_locks_2010 | relation | RowExclusiveLock + partitioning_locks_for_select | relation | AccessShareLock +(7 rows) + +COMMIT; +-- test locks on coordinator INSERT/SELECT +BEGIN; +INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select LIMIT 5; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------------+----------+------------------ + partitioning_locks | relation | AccessShareLock + partitioning_locks | relation | RowExclusiveLock + partitioning_locks_2009 | relation | RowExclusiveLock + partitioning_locks_2010 | relation | RowExclusiveLock + partitioning_locks_for_select | relation | AccessShareLock +(5 rows) + +COMMIT; +-- test locks on master_modify_multiple_shards +BEGIN; +SELECT master_modify_multiple_shards('UPDATE partitioning_locks SET time = ''2009-03-01'''); + master_modify_multiple_shards +------------------------------- + 0 +(1 row) + +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------+----------+------------------ + partitioning_locks | relation | AccessShareLock + partitioning_locks | relation | RowExclusiveLock + partitioning_locks_2009 | relation | RowExclusiveLock + partitioning_locks_2010 | relation | RowExclusiveLock +(4 rows) + +COMMIT; +-- test locks on DDL +BEGIN; +ALTER TABLE partitioning_locks ADD COLUMN new_column int; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------+----------+--------------------- + partitioning_locks | relation | AccessExclusiveLock + partitioning_locks | relation | AccessShareLock + partitioning_locks_2009 | relation | AccessExclusiveLock + partitioning_locks_2010 | relation | AccessExclusiveLock +(4 rows) + +COMMIT; +-- test locks on TRUNCATE +BEGIN; +TRUNCATE partitioning_locks; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; + relation | locktype | mode +-------------------------+----------+--------------------- + partitioning_locks | relation | AccessExclusiveLock + partitioning_locks | relation | AccessShareLock + partitioning_locks_2009 | relation | AccessExclusiveLock + partitioning_locks_2009 | relation | AccessShareLock + partitioning_locks_2009 | relation | RowExclusiveLock + partitioning_locks_2009 | relation | ShareLock + partitioning_locks_2010 | relation | AccessExclusiveLock + partitioning_locks_2010 | relation | AccessShareLock + partitioning_locks_2010 | relation | RowExclusiveLock + partitioning_locks_2010 | relation | ShareLock +(10 rows) + +COMMIT; +-- test shard resource locks with master_modify_multiple_shards +BEGIN; +SELECT master_modify_multiple_shards('UPDATE partitioning_locks_2009 SET time = ''2009-03-01'''); + master_modify_multiple_shards +------------------------------- + 0 +(1 row) + +-- see the locks on parent table +SELECT + logicalrelid, + locktype, + mode +FROM + pg_locks AS l JOIN pg_dist_shard AS s +ON + l.objid = s.shardid +WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND + pid = pg_backend_pid() +ORDER BY + 1, 2, 3; + logicalrelid | locktype | mode +-------------------------+----------+-------------------------- + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock +(12 rows) + +COMMIT; +-- test shard resource locks with TRUNCATE +BEGIN; +TRUNCATE partitioning_locks_2009; +-- see the locks on parent table +SELECT + logicalrelid, + locktype, + mode +FROM + pg_locks AS l JOIN pg_dist_shard AS s +ON + l.objid = s.shardid +WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND + pid = pg_backend_pid() +ORDER BY + 1, 2, 3; + logicalrelid | locktype | mode +-------------------------+----------+-------------------------- + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock +(12 rows) + +COMMIT; +-- test shard resource locks with INSERT/SELECT +BEGIN; +INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01'; +-- see the locks on parent table +SELECT + logicalrelid, + locktype, + mode +FROM + pg_locks AS l JOIN pg_dist_shard AS s +ON + l.objid = s.shardid +WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND + pid = pg_backend_pid() +ORDER BY + 1, 2, 3; + logicalrelid | locktype | mode +-------------------------+----------+-------------------------- + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock + partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock +(12 rows) + +COMMIT; +DROP TABLE +IF EXISTS + partitioning_test_2012, + partitioning_test_2013, + partitioned_events_table, + partitioned_users_table, + list_partitioned_events_table, + multi_column_partitioning, + partitioning_locks, + partitioning_locks_for_select; NOTICE: table "partitioning_test_2012" does not exist, skipping NOTICE: table "partitioning_test_2013" does not exist, skipping diff --git a/src/test/regress/expected/multi_partitioning_0.out b/src/test/regress/expected/multi_partitioning_0.out index b0d11e11c..056ef12a7 100644 --- a/src/test/regress/expected/multi_partitioning_0.out +++ b/src/test/regress/expected/multi_partitioning_0.out @@ -923,12 +923,207 @@ ERROR: relation "multi_column_partitioning" does not exist LINE 1: INSERT INTO multi_column_partitioning VALUES(20, -20); ^ -- see data is loaded to multi-column partitioned table -SELECT * FROM multi_column_partitioning; +SELECT * FROM multi_column_partitioning ORDER BY 1, 2; ERROR: relation "multi_column_partitioning" does not exist -LINE 1: SELECT * FROM multi_column_partitioning; +LINE 1: SELECT * FROM multi_column_partitioning ORDER BY 1, 2; ^ -DROP TABLE IF EXISTS partitioning_test_2012, partitioning_test_2013, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning; +-- +-- Tests for locks on partitioned tables +-- +CREATE TABLE partitioning_locks(id int, ref_id int, time date) PARTITION BY RANGE (time); +ERROR: syntax error at or near "PARTITION" +LINE 1: ...partitioning_locks(id int, ref_id int, time date) PARTITION ... + ^ +-- create its partitions +CREATE TABLE partitioning_locks_2009 PARTITION OF partitioning_locks FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_locks_2009 PARTITION OF partitioni... + ^ +CREATE TABLE partitioning_locks_2010 PARTITION OF partitioning_locks FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_locks_2010 PARTITION OF partitioni... + ^ +-- distribute partitioned table +SELECT create_distributed_table('partitioning_locks', 'id'); +ERROR: relation "partitioning_locks" does not exist +LINE 1: SELECT create_distributed_table('partitioning_locks', 'id'); + ^ +-- test locks on router SELECT +BEGIN; +SELECT * FROM partitioning_locks WHERE id = 1 ORDER BY 1, 2; +ERROR: relation "partitioning_locks" does not exist +LINE 1: SELECT * FROM partitioning_locks WHERE id = 1 ORDER BY 1, 2; + ^ +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test locks on real-time SELECT +BEGIN; +SELECT * FROM partitioning_locks ORDER BY 1, 2; +ERROR: relation "partitioning_locks" does not exist +LINE 1: SELECT * FROM partitioning_locks ORDER BY 1, 2; + ^ +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test locks on task-tracker SELECT +SET citus.task_executor_type TO 'task-tracker'; +BEGIN; +SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_locks AS pl2 ON pl1.id = pl2.ref_id ORDER BY 1, 2; +ERROR: relation "partitioning_locks" does not exist +LINE 1: SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_lo... + ^ +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +SET citus.task_executor_type TO 'real-time'; +-- test locks on INSERT +BEGIN; +INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01'); +ERROR: relation "partitioning_locks" does not exist +LINE 1: INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01'); + ^ +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test locks on UPDATE +BEGIN; +UPDATE partitioning_locks SET time = '2009-02-01' WHERE id = 1; +ERROR: relation "partitioning_locks" does not exist +LINE 1: UPDATE partitioning_locks SET time = '2009-02-01' WHERE id =... + ^ +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test locks on DELETE +BEGIN; +DELETE FROM partitioning_locks WHERE id = 1; +ERROR: relation "partitioning_locks" does not exist +LINE 1: DELETE FROM partitioning_locks WHERE id = 1; + ^ +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test locks on INSERT/SELECT +CREATE TABLE partitioning_locks_for_select(id int, ref_id int, time date); +SELECT create_distributed_table('partitioning_locks_for_select', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +BEGIN; +INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select; +ERROR: relation "partitioning_locks" does not exist +LINE 1: INSERT INTO partitioning_locks SELECT * FROM partitioning_lo... + ^ +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test locks on coordinator INSERT/SELECT +BEGIN; +INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select LIMIT 5; +ERROR: relation "partitioning_locks" does not exist +LINE 1: INSERT INTO partitioning_locks SELECT * FROM partitioning_lo... + ^ +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test locks on master_modify_multiple_shards +BEGIN; +SELECT master_modify_multiple_shards('UPDATE partitioning_locks SET time = ''2009-03-01'''); +ERROR: relation "partitioning_locks" does not exist +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test locks on DDL +BEGIN; +ALTER TABLE partitioning_locks ADD COLUMN new_column int; +ERROR: relation "partitioning_locks" does not exist +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test locks on TRUNCATE +BEGIN; +TRUNCATE partitioning_locks; +ERROR: relation "partitioning_locks" does not exist +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test shard resource locks with master_modify_multiple_shards +BEGIN; +SELECT master_modify_multiple_shards('UPDATE partitioning_locks_2009 SET time = ''2009-03-01'''); +ERROR: relation "partitioning_locks_2009" does not exist +-- see the locks on parent table +SELECT + logicalrelid, + locktype, + mode +FROM + pg_locks AS l JOIN pg_dist_shard AS s +ON + l.objid = s.shardid +WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND + pid = pg_backend_pid() +ORDER BY + 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test shard resource locks with TRUNCATE +BEGIN; +TRUNCATE partitioning_locks_2009; +ERROR: relation "partitioning_locks_2009" does not exist +-- see the locks on parent table +SELECT + logicalrelid, + locktype, + mode +FROM + pg_locks AS l JOIN pg_dist_shard AS s +ON + l.objid = s.shardid +WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND + pid = pg_backend_pid() +ORDER BY + 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +-- test shard resource locks with INSERT/SELECT +BEGIN; +INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01'; +ERROR: relation "partitioning_locks_2009" does not exist +LINE 1: INSERT INTO partitioning_locks_2009 SELECT * FROM partitioni... + ^ +-- see the locks on parent table +SELECT + logicalrelid, + locktype, + mode +FROM + pg_locks AS l JOIN pg_dist_shard AS s +ON + l.objid = s.shardid +WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND + pid = pg_backend_pid() +ORDER BY + 1, 2, 3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +COMMIT; +DROP TABLE +IF EXISTS + partitioning_test_2012, + partitioning_test_2013, + partitioned_events_table, + partitioned_users_table, + list_partitioned_events_table, + multi_column_partitioning, + partitioning_locks, + partitioning_locks_for_select; NOTICE: table "partitioned_events_table" does not exist, skipping NOTICE: table "partitioned_users_table" does not exist, skipping NOTICE: table "list_partitioned_events_table" does not exist, skipping NOTICE: table "multi_column_partitioning" does not exist, skipping +NOTICE: table "partitioning_locks" does not exist, skipping diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql index 715c67ef6..2b122cf61 100644 --- a/src/test/regress/sql/multi_partitioning.sql +++ b/src/test/regress/sql/multi_partitioning.sql @@ -660,6 +660,157 @@ INSERT INTO multi_column_partitioning_10_max_20_min VALUES(19, -19); INSERT INTO multi_column_partitioning VALUES(20, -20); -- see data is loaded to multi-column partitioned table -SELECT * FROM multi_column_partitioning; +SELECT * FROM multi_column_partitioning ORDER BY 1, 2; -DROP TABLE IF EXISTS partitioning_test_2012, partitioning_test_2013, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning; +-- +-- Tests for locks on partitioned tables +-- +CREATE TABLE partitioning_locks(id int, ref_id int, time date) PARTITION BY RANGE (time); + +-- create its partitions +CREATE TABLE partitioning_locks_2009 PARTITION OF partitioning_locks FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +CREATE TABLE partitioning_locks_2010 PARTITION OF partitioning_locks FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); + +-- distribute partitioned table +SELECT create_distributed_table('partitioning_locks', 'id'); + +-- test locks on router SELECT +BEGIN; +SELECT * FROM partitioning_locks WHERE id = 1 ORDER BY 1, 2; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test locks on real-time SELECT +BEGIN; +SELECT * FROM partitioning_locks ORDER BY 1, 2; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test locks on task-tracker SELECT +SET citus.task_executor_type TO 'task-tracker'; +BEGIN; +SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_locks AS pl2 ON pl1.id = pl2.ref_id ORDER BY 1, 2; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; +SET citus.task_executor_type TO 'real-time'; + +-- test locks on INSERT +BEGIN; +INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01'); +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test locks on UPDATE +BEGIN; +UPDATE partitioning_locks SET time = '2009-02-01' WHERE id = 1; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test locks on DELETE +BEGIN; +DELETE FROM partitioning_locks WHERE id = 1; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test locks on INSERT/SELECT +CREATE TABLE partitioning_locks_for_select(id int, ref_id int, time date); +SELECT create_distributed_table('partitioning_locks_for_select', 'id'); +BEGIN; +INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test locks on coordinator INSERT/SELECT +BEGIN; +INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select LIMIT 5; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test locks on master_modify_multiple_shards +BEGIN; +SELECT master_modify_multiple_shards('UPDATE partitioning_locks SET time = ''2009-03-01'''); +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test locks on DDL +BEGIN; +ALTER TABLE partitioning_locks ADD COLUMN new_column int; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test locks on TRUNCATE +BEGIN; +TRUNCATE partitioning_locks; +SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; +COMMIT; + +-- test shard resource locks with master_modify_multiple_shards +BEGIN; +SELECT master_modify_multiple_shards('UPDATE partitioning_locks_2009 SET time = ''2009-03-01'''); + +-- see the locks on parent table +SELECT + logicalrelid, + locktype, + mode +FROM + pg_locks AS l JOIN pg_dist_shard AS s +ON + l.objid = s.shardid +WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND + pid = pg_backend_pid() +ORDER BY + 1, 2, 3; +COMMIT; + +-- test shard resource locks with TRUNCATE +BEGIN; +TRUNCATE partitioning_locks_2009; + +-- see the locks on parent table +SELECT + logicalrelid, + locktype, + mode +FROM + pg_locks AS l JOIN pg_dist_shard AS s +ON + l.objid = s.shardid +WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND + pid = pg_backend_pid() +ORDER BY + 1, 2, 3; +COMMIT; + +-- test shard resource locks with INSERT/SELECT +BEGIN; +INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01'; + +-- see the locks on parent table +SELECT + logicalrelid, + locktype, + mode +FROM + pg_locks AS l JOIN pg_dist_shard AS s +ON + l.objid = s.shardid +WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND + pid = pg_backend_pid() +ORDER BY + 1, 2, 3; +COMMIT; + +DROP TABLE +IF EXISTS + partitioning_test_2012, + partitioning_test_2013, + partitioned_events_table, + partitioned_users_table, + list_partitioned_events_table, + multi_column_partitioning, + partitioning_locks, + partitioning_locks_for_select;