mirror of https://github.com/citusdata/citus.git
Merge pull request #1544 from citusdata/acquire_locks_for_partitioned_table_ops
Acquire proper locks for partitioned table operationspull/1551/head
commit
b7e55e0c81
|
@ -14,8 +14,10 @@
|
||||||
#include "distributed/insert_select_planner.h"
|
#include "distributed/insert_select_planner.h"
|
||||||
#include "distributed/multi_copy.h"
|
#include "distributed/multi_copy.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_planner.h"
|
#include "distributed/multi_planner.h"
|
||||||
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
#include "nodes/execnodes.h"
|
#include "nodes/execnodes.h"
|
||||||
|
@ -60,6 +62,16 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
|
||||||
|
|
||||||
ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator")));
|
ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator")));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we are dealing with partitioned table, we also need to lock its
|
||||||
|
* partitions. Here we only lock targetRelation, we acquire necessary
|
||||||
|
* locks on selected tables during execution of those select queries.
|
||||||
|
*/
|
||||||
|
if (PartitionedTable(targetRelationId))
|
||||||
|
{
|
||||||
|
LockPartitionRelations(targetRelationId, RowExclusiveLock);
|
||||||
|
}
|
||||||
|
|
||||||
ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery,
|
ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery,
|
||||||
executorState);
|
executorState);
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "distributed/multi_resowner.h"
|
#include "distributed/multi_resowner.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
#include "distributed/multi_utility.h"
|
#include "distributed/multi_utility.h"
|
||||||
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "executor/execdebug.h"
|
#include "executor/execdebug.h"
|
||||||
#include "commands/copy.h"
|
#include "commands/copy.h"
|
||||||
|
@ -248,6 +249,9 @@ RealTimeExecScan(CustomScanState *node)
|
||||||
MultiPlan *multiPlan = scanState->multiPlan;
|
MultiPlan *multiPlan = scanState->multiPlan;
|
||||||
Job *workerJob = multiPlan->workerJob;
|
Job *workerJob = multiPlan->workerJob;
|
||||||
|
|
||||||
|
/* we are taking locks on partitions of partitioned tables */
|
||||||
|
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock);
|
||||||
|
|
||||||
PrepareMasterJobDirectory(workerJob);
|
PrepareMasterJobDirectory(workerJob);
|
||||||
MultiRealTimeExecute(workerJob);
|
MultiRealTimeExecute(workerJob);
|
||||||
|
|
||||||
|
@ -445,6 +449,9 @@ TaskTrackerExecScan(CustomScanState *node)
|
||||||
MultiPlan *multiPlan = scanState->multiPlan;
|
MultiPlan *multiPlan = scanState->multiPlan;
|
||||||
Job *workerJob = multiPlan->workerJob;
|
Job *workerJob = multiPlan->workerJob;
|
||||||
|
|
||||||
|
/* we are taking locks on partitions of partitioned tables */
|
||||||
|
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock);
|
||||||
|
|
||||||
PrepareMasterJobDirectory(workerJob);
|
PrepareMasterJobDirectory(workerJob);
|
||||||
MultiTaskTrackerExecute(workerJob);
|
MultiTaskTrackerExecute(workerJob);
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,7 @@
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_planner.h"
|
#include "distributed/multi_planner.h"
|
||||||
#include "distributed/multi_router_executor.h"
|
#include "distributed/multi_router_executor.h"
|
||||||
|
@ -98,6 +99,7 @@ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connect
|
||||||
bool failOnError, int64 *rows);
|
bool failOnError, int64 *rows);
|
||||||
static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
|
static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
|
||||||
int64 *rows);
|
int64 *rows);
|
||||||
|
static LOCKMODE LockModeForModifyTask(Task *task);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -296,6 +298,11 @@ AcquireExecutorMultiShardLocks(List *taskList)
|
||||||
lockMode = ExclusiveLock;
|
lockMode = ExclusiveLock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we are dealing with a partition we are also taking locks on parent table
|
||||||
|
* to prevent deadlocks on concurrent operations on a partition and its parent.
|
||||||
|
*/
|
||||||
|
LockParentShardResourceIfPartition(task->anchorShardId, lockMode);
|
||||||
LockShardResource(task->anchorShardId, lockMode);
|
LockShardResource(task->anchorShardId, lockMode);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -434,6 +441,12 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
|
||||||
/* prevent concurrent placement changes */
|
/* prevent concurrent placement changes */
|
||||||
AcquireMetadataLocks(taskList);
|
AcquireMetadataLocks(taskList);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We are taking locks on partitions of partitioned tables. These locks are
|
||||||
|
* necessary for locking tables that appear in the SELECT part of the query.
|
||||||
|
*/
|
||||||
|
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock);
|
||||||
|
|
||||||
/* modify tasks are always assigned using first-replica policy */
|
/* modify tasks are always assigned using first-replica policy */
|
||||||
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
|
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
|
||||||
}
|
}
|
||||||
|
@ -519,6 +532,9 @@ RouterSelectExecScan(CustomScanState *node)
|
||||||
Job *workerJob = multiPlan->workerJob;
|
Job *workerJob = multiPlan->workerJob;
|
||||||
List *taskList = workerJob->taskList;
|
List *taskList = workerJob->taskList;
|
||||||
|
|
||||||
|
/* we are taking locks on partitions of partitioned tables */
|
||||||
|
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock);
|
||||||
|
|
||||||
if (list_length(taskList) > 0)
|
if (list_length(taskList) > 0)
|
||||||
{
|
{
|
||||||
Task *task = (Task *) linitial(taskList);
|
Task *task = (Task *) linitial(taskList);
|
||||||
|
@ -682,6 +698,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
|
||||||
char *queryString = task->queryString;
|
char *queryString = task->queryString;
|
||||||
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
|
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
|
||||||
|
|
||||||
|
ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId);
|
||||||
|
Oid relationId = shardInterval->relationId;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Modifications for reference tables are always done using 2PC. First
|
* Modifications for reference tables are always done using 2PC. First
|
||||||
* ensure that distributed transaction is started. Then force the
|
* ensure that distributed transaction is started. Then force the
|
||||||
|
@ -712,6 +731,17 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
|
||||||
*/
|
*/
|
||||||
connectionList = GetModifyConnections(task, taskRequiresTwoPhaseCommit);
|
connectionList = GetModifyConnections(task, taskRequiresTwoPhaseCommit);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we are dealing with a partitioned table, we also need to lock its
|
||||||
|
* partitions.
|
||||||
|
*/
|
||||||
|
if (PartitionedTable(relationId))
|
||||||
|
{
|
||||||
|
LOCKMODE lockMode = LockModeForModifyTask(task);
|
||||||
|
|
||||||
|
LockPartitionRelations(relationId, lockMode);
|
||||||
|
}
|
||||||
|
|
||||||
/* prevent replicas of the same shard from diverging */
|
/* prevent replicas of the same shard from diverging */
|
||||||
AcquireExecutorShardLock(task, operation);
|
AcquireExecutorShardLock(task, operation);
|
||||||
|
|
||||||
|
@ -947,6 +977,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
int64 totalAffectedTupleCount = 0;
|
int64 totalAffectedTupleCount = 0;
|
||||||
ListCell *taskCell = NULL;
|
ListCell *taskCell = NULL;
|
||||||
Task *firstTask = NULL;
|
Task *firstTask = NULL;
|
||||||
|
ShardInterval *firstShardInterval = NULL;
|
||||||
int connectionFlags = 0;
|
int connectionFlags = 0;
|
||||||
List *affectedTupleCountList = NIL;
|
List *affectedTupleCountList = NIL;
|
||||||
HTAB *shardConnectionHash = NULL;
|
HTAB *shardConnectionHash = NULL;
|
||||||
|
@ -958,13 +989,25 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* In multi shard modification, we expect that all tasks operates on the
|
||||||
|
* same relation, so it is enough to acquire a lock on the first task's
|
||||||
|
* anchor relation's partitions.
|
||||||
|
*/
|
||||||
|
firstTask = (Task *) linitial(taskList);
|
||||||
|
firstShardInterval = LoadShardInterval(firstTask->anchorShardId);
|
||||||
|
if (PartitionedTable(firstShardInterval->relationId))
|
||||||
|
{
|
||||||
|
LOCKMODE lockMode = LockModeForModifyTask(firstTask);
|
||||||
|
|
||||||
|
LockPartitionRelations(firstShardInterval->relationId, lockMode);
|
||||||
|
}
|
||||||
|
|
||||||
/* ensure that there are no concurrent modifications on the same shards */
|
/* ensure that there are no concurrent modifications on the same shards */
|
||||||
AcquireExecutorMultiShardLocks(taskList);
|
AcquireExecutorMultiShardLocks(taskList);
|
||||||
|
|
||||||
BeginOrContinueCoordinatedTransaction();
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
firstTask = (Task *) linitial(taskList);
|
|
||||||
|
|
||||||
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
|
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
|
||||||
firstTask->replicationModel == REPLICATION_MODEL_2PC)
|
firstTask->replicationModel == REPLICATION_MODEL_2PC)
|
||||||
{
|
{
|
||||||
|
@ -1464,3 +1507,29 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
|
||||||
|
|
||||||
return gotResponse && !commandFailed;
|
return gotResponse && !commandFailed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LockModeForRouterModifyTask returns appropriate LOCKMODE for given router
|
||||||
|
* modify task.
|
||||||
|
*/
|
||||||
|
static LOCKMODE
|
||||||
|
LockModeForModifyTask(Task *task)
|
||||||
|
{
|
||||||
|
LOCKMODE lockMode = NoLock;
|
||||||
|
if (task->taskType == DDL_TASK)
|
||||||
|
{
|
||||||
|
lockMode = AccessExclusiveLock;
|
||||||
|
}
|
||||||
|
else if (task->taskType == MODIFY_TASK)
|
||||||
|
{
|
||||||
|
lockMode = RowExclusiveLock;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* we do not allow any other task type in these code path */
|
||||||
|
Assert(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return lockMode;
|
||||||
|
}
|
||||||
|
|
|
@ -523,6 +523,8 @@ FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
multiPlan->relationIdList = localPlan->relationOids;
|
||||||
|
|
||||||
multiPlanData = (Node *) multiPlan;
|
multiPlanData = (Node *) multiPlan;
|
||||||
|
|
||||||
customScan->custom_private = list_make1(multiPlanData);
|
customScan->custom_private = list_make1(multiPlanData);
|
||||||
|
|
|
@ -107,6 +107,7 @@ CopyNodeMultiPlan(COPYFUNC_ARGS)
|
||||||
COPY_NODE_FIELD(workerJob);
|
COPY_NODE_FIELD(workerJob);
|
||||||
COPY_NODE_FIELD(masterQuery);
|
COPY_NODE_FIELD(masterQuery);
|
||||||
COPY_SCALAR_FIELD(routerExecutable);
|
COPY_SCALAR_FIELD(routerExecutable);
|
||||||
|
COPY_NODE_FIELD(relationIdList);
|
||||||
|
|
||||||
COPY_NODE_FIELD(insertSelectSubquery);
|
COPY_NODE_FIELD(insertSelectSubquery);
|
||||||
COPY_NODE_FIELD(insertTargetList);
|
COPY_NODE_FIELD(insertTargetList);
|
||||||
|
|
|
@ -185,6 +185,7 @@ OutMultiPlan(OUTFUNC_ARGS)
|
||||||
WRITE_NODE_FIELD(workerJob);
|
WRITE_NODE_FIELD(workerJob);
|
||||||
WRITE_NODE_FIELD(masterQuery);
|
WRITE_NODE_FIELD(masterQuery);
|
||||||
WRITE_BOOL_FIELD(routerExecutable);
|
WRITE_BOOL_FIELD(routerExecutable);
|
||||||
|
WRITE_NODE_FIELD(relationIdList);
|
||||||
|
|
||||||
WRITE_NODE_FIELD(insertSelectSubquery);
|
WRITE_NODE_FIELD(insertSelectSubquery);
|
||||||
WRITE_NODE_FIELD(insertTargetList);
|
WRITE_NODE_FIELD(insertTargetList);
|
||||||
|
|
|
@ -205,6 +205,7 @@ ReadMultiPlan(READFUNC_ARGS)
|
||||||
READ_NODE_FIELD(workerJob);
|
READ_NODE_FIELD(workerJob);
|
||||||
READ_NODE_FIELD(masterQuery);
|
READ_NODE_FIELD(masterQuery);
|
||||||
READ_BOOL_FIELD(routerExecutable);
|
READ_BOOL_FIELD(routerExecutable);
|
||||||
|
READ_NODE_FIELD(relationIdList);
|
||||||
|
|
||||||
READ_NODE_FIELD(insertSelectSubquery);
|
READ_NODE_FIELD(insertSelectSubquery);
|
||||||
READ_NODE_FIELD(insertTargetList);
|
READ_NODE_FIELD(insertTargetList);
|
||||||
|
|
|
@ -17,11 +17,13 @@
|
||||||
#include "c.h"
|
#include "c.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_router_executor.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_planner.h"
|
#include "distributed/multi_planner.h"
|
||||||
|
#include "distributed/multi_router_executor.h"
|
||||||
#include "distributed/relay_utility.h"
|
#include "distributed/relay_utility.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
|
@ -316,3 +318,69 @@ LockRelationShardResources(List *relationShardList, LOCKMODE lockMode)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LockParentShardResourceIfPartition checks whether the given shard belongs
|
||||||
|
* to a partition. If it does, LockParentShardResourceIfPartition acquires a
|
||||||
|
* shard resource lock on the colocated shard of the parent table.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode)
|
||||||
|
{
|
||||||
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
|
Oid relationId = shardInterval->relationId;
|
||||||
|
|
||||||
|
if (PartitionTable(relationId))
|
||||||
|
{
|
||||||
|
int shardIndex = ShardIndex(shardInterval);
|
||||||
|
Oid parentRelationId = PartitionParentOid(relationId);
|
||||||
|
uint64 parentShardId = ColocatedShardIdInRelation(parentRelationId, shardIndex);
|
||||||
|
|
||||||
|
LockShardResource(parentShardId, lockMode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LockPartitionsInRelationList iterates over given list and acquires locks on
|
||||||
|
* partitions of each partitioned table. It does nothing for non-partitioned tables.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode)
|
||||||
|
{
|
||||||
|
ListCell *relationIdCell = NULL;
|
||||||
|
|
||||||
|
foreach(relationIdCell, relationIdList)
|
||||||
|
{
|
||||||
|
Oid relationId = lfirst_oid(relationIdCell);
|
||||||
|
if (PartitionedTable(relationId))
|
||||||
|
{
|
||||||
|
LockPartitionRelations(relationId, lockmode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LockPartitionRelations acquires relation lock on all partitions of given
|
||||||
|
* partitioned relation. This function expects that given relation is a
|
||||||
|
* partitioned relation.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
LockPartitionRelations(Oid relationId, LOCKMODE lockMode)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* PartitionList function generates partition list in the same order
|
||||||
|
* as PostgreSQL. Therefore we do not need to sort it before acquiring
|
||||||
|
* locks.
|
||||||
|
*/
|
||||||
|
List *partitionList = PartitionList(relationId);
|
||||||
|
ListCell *partitionCell = NULL;
|
||||||
|
|
||||||
|
foreach(partitionCell, partitionList)
|
||||||
|
{
|
||||||
|
Oid partitionRelationId = lfirst_oid(partitionCell);
|
||||||
|
LockRelationOid(partitionRelationId, lockMode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -228,6 +228,7 @@ typedef struct MultiPlan
|
||||||
Job *workerJob;
|
Job *workerJob;
|
||||||
Query *masterQuery;
|
Query *masterQuery;
|
||||||
bool routerExecutable;
|
bool routerExecutable;
|
||||||
|
List *relationIdList;
|
||||||
|
|
||||||
/* INSERT ... SELECT via coordinator only */
|
/* INSERT ... SELECT via coordinator only */
|
||||||
Query *insertSelectSubquery;
|
Query *insertSelectSubquery;
|
||||||
|
|
|
@ -80,5 +80,11 @@ extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
|
||||||
extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);
|
extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);
|
||||||
extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode);
|
extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode);
|
||||||
|
|
||||||
|
/* Lock partitions of partitioned table */
|
||||||
|
extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode);
|
||||||
|
extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);
|
||||||
|
|
||||||
|
/* Lock parent table's colocated shard resource */
|
||||||
|
extern void LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode);
|
||||||
|
|
||||||
#endif /* RESOURCE_LOCK_H */
|
#endif /* RESOURCE_LOCK_H */
|
||||||
|
|
|
@ -923,15 +923,327 @@ ERROR: no partition of relation "multi_column_partitioning_1660068" found for r
|
||||||
DETAIL: Partition key of the failing row contains (c1, c2) = (20, -20).
|
DETAIL: Partition key of the failing row contains (c1, c2) = (20, -20).
|
||||||
CONTEXT: while executing command on localhost:57637
|
CONTEXT: while executing command on localhost:57637
|
||||||
-- see data is loaded to multi-column partitioned table
|
-- see data is loaded to multi-column partitioned table
|
||||||
SELECT * FROM multi_column_partitioning;
|
SELECT * FROM multi_column_partitioning ORDER BY 1, 2;
|
||||||
c1 | c2
|
c1 | c2
|
||||||
----+-----
|
----+-----
|
||||||
1 | 1
|
1 | 1
|
||||||
5 | -5
|
5 | -5
|
||||||
19 | -19
|
|
||||||
11 | -11
|
11 | -11
|
||||||
|
19 | -19
|
||||||
(4 rows)
|
(4 rows)
|
||||||
|
|
||||||
DROP TABLE IF EXISTS partitioning_test_2012, partitioning_test_2013, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning;
|
--
|
||||||
|
-- Tests for locks on partitioned tables
|
||||||
|
--
|
||||||
|
CREATE TABLE partitioning_locks(id int, ref_id int, time date) PARTITION BY RANGE (time);
|
||||||
|
-- create its partitions
|
||||||
|
CREATE TABLE partitioning_locks_2009 PARTITION OF partitioning_locks FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||||
|
CREATE TABLE partitioning_locks_2010 PARTITION OF partitioning_locks FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||||
|
-- distribute partitioned table
|
||||||
|
SELECT create_distributed_table('partitioning_locks', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test locks on router SELECT
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM partitioning_locks WHERE id = 1 ORDER BY 1, 2;
|
||||||
|
id | ref_id | time
|
||||||
|
----+--------+------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------+----------+-----------------
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on real-time SELECT
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM partitioning_locks ORDER BY 1, 2;
|
||||||
|
id | ref_id | time
|
||||||
|
----+--------+------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------+----------+-----------------
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on task-tracker SELECT
|
||||||
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_locks AS pl2 ON pl1.id = pl2.ref_id ORDER BY 1, 2;
|
||||||
|
id | ref_id | time | id | ref_id | time
|
||||||
|
----+--------+------+----+--------+------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------+----------+-----------------
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
SET citus.task_executor_type TO 'real-time';
|
||||||
|
-- test locks on INSERT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01');
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------+----------+------------------
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2010 | relation | RowExclusiveLock
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on UPDATE
|
||||||
|
BEGIN;
|
||||||
|
UPDATE partitioning_locks SET time = '2009-02-01' WHERE id = 1;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------+----------+------------------
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2010 | relation | RowExclusiveLock
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on DELETE
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM partitioning_locks WHERE id = 1;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------+----------+------------------
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2010 | relation | RowExclusiveLock
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on INSERT/SELECT
|
||||||
|
CREATE TABLE partitioning_locks_for_select(id int, ref_id int, time date);
|
||||||
|
SELECT create_distributed_table('partitioning_locks_for_select', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------------+----------+------------------
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2010 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_for_select | relation | AccessShareLock
|
||||||
|
(7 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on coordinator INSERT/SELECT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select LIMIT 5;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------------+----------+------------------
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_for_select | relation | AccessShareLock
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on master_modify_multiple_shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('UPDATE partitioning_locks SET time = ''2009-03-01''');
|
||||||
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------+----------+------------------
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | RowExclusiveLock
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on DDL
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE partitioning_locks ADD COLUMN new_column int;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------+----------+---------------------
|
||||||
|
partitioning_locks | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | AccessExclusiveLock
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on TRUNCATE
|
||||||
|
BEGIN;
|
||||||
|
TRUNCATE partitioning_locks;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
relation | locktype | mode
|
||||||
|
-------------------------+----------+---------------------
|
||||||
|
partitioning_locks | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2009 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2009 | relation | ShareLock
|
||||||
|
partitioning_locks_2010 | relation | AccessExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | AccessShareLock
|
||||||
|
partitioning_locks_2010 | relation | RowExclusiveLock
|
||||||
|
partitioning_locks_2010 | relation | ShareLock
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test shard resource locks with master_modify_multiple_shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('UPDATE partitioning_locks_2009 SET time = ''2009-03-01''');
|
||||||
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- see the locks on parent table
|
||||||
|
SELECT
|
||||||
|
logicalrelid,
|
||||||
|
locktype,
|
||||||
|
mode
|
||||||
|
FROM
|
||||||
|
pg_locks AS l JOIN pg_dist_shard AS s
|
||||||
|
ON
|
||||||
|
l.objid = s.shardid
|
||||||
|
WHERE
|
||||||
|
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
|
||||||
|
pid = pg_backend_pid()
|
||||||
|
ORDER BY
|
||||||
|
1, 2, 3;
|
||||||
|
logicalrelid | locktype | mode
|
||||||
|
-------------------------+----------+--------------------------
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test shard resource locks with TRUNCATE
|
||||||
|
BEGIN;
|
||||||
|
TRUNCATE partitioning_locks_2009;
|
||||||
|
-- see the locks on parent table
|
||||||
|
SELECT
|
||||||
|
logicalrelid,
|
||||||
|
locktype,
|
||||||
|
mode
|
||||||
|
FROM
|
||||||
|
pg_locks AS l JOIN pg_dist_shard AS s
|
||||||
|
ON
|
||||||
|
l.objid = s.shardid
|
||||||
|
WHERE
|
||||||
|
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
|
||||||
|
pid = pg_backend_pid()
|
||||||
|
ORDER BY
|
||||||
|
1, 2, 3;
|
||||||
|
logicalrelid | locktype | mode
|
||||||
|
-------------------------+----------+--------------------------
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- test shard resource locks with INSERT/SELECT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01';
|
||||||
|
-- see the locks on parent table
|
||||||
|
SELECT
|
||||||
|
logicalrelid,
|
||||||
|
locktype,
|
||||||
|
mode
|
||||||
|
FROM
|
||||||
|
pg_locks AS l JOIN pg_dist_shard AS s
|
||||||
|
ON
|
||||||
|
l.objid = s.shardid
|
||||||
|
WHERE
|
||||||
|
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
|
||||||
|
pid = pg_backend_pid()
|
||||||
|
ORDER BY
|
||||||
|
1, 2, 3;
|
||||||
|
logicalrelid | locktype | mode
|
||||||
|
-------------------------+----------+--------------------------
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
DROP TABLE
|
||||||
|
IF EXISTS
|
||||||
|
partitioning_test_2012,
|
||||||
|
partitioning_test_2013,
|
||||||
|
partitioned_events_table,
|
||||||
|
partitioned_users_table,
|
||||||
|
list_partitioned_events_table,
|
||||||
|
multi_column_partitioning,
|
||||||
|
partitioning_locks,
|
||||||
|
partitioning_locks_for_select;
|
||||||
NOTICE: table "partitioning_test_2012" does not exist, skipping
|
NOTICE: table "partitioning_test_2012" does not exist, skipping
|
||||||
NOTICE: table "partitioning_test_2013" does not exist, skipping
|
NOTICE: table "partitioning_test_2013" does not exist, skipping
|
||||||
|
|
|
@ -923,12 +923,207 @@ ERROR: relation "multi_column_partitioning" does not exist
|
||||||
LINE 1: INSERT INTO multi_column_partitioning VALUES(20, -20);
|
LINE 1: INSERT INTO multi_column_partitioning VALUES(20, -20);
|
||||||
^
|
^
|
||||||
-- see data is loaded to multi-column partitioned table
|
-- see data is loaded to multi-column partitioned table
|
||||||
SELECT * FROM multi_column_partitioning;
|
SELECT * FROM multi_column_partitioning ORDER BY 1, 2;
|
||||||
ERROR: relation "multi_column_partitioning" does not exist
|
ERROR: relation "multi_column_partitioning" does not exist
|
||||||
LINE 1: SELECT * FROM multi_column_partitioning;
|
LINE 1: SELECT * FROM multi_column_partitioning ORDER BY 1, 2;
|
||||||
^
|
^
|
||||||
DROP TABLE IF EXISTS partitioning_test_2012, partitioning_test_2013, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning;
|
--
|
||||||
|
-- Tests for locks on partitioned tables
|
||||||
|
--
|
||||||
|
CREATE TABLE partitioning_locks(id int, ref_id int, time date) PARTITION BY RANGE (time);
|
||||||
|
ERROR: syntax error at or near "PARTITION"
|
||||||
|
LINE 1: ...partitioning_locks(id int, ref_id int, time date) PARTITION ...
|
||||||
|
^
|
||||||
|
-- create its partitions
|
||||||
|
CREATE TABLE partitioning_locks_2009 PARTITION OF partitioning_locks FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||||
|
ERROR: syntax error at or near "PARTITION"
|
||||||
|
LINE 1: CREATE TABLE partitioning_locks_2009 PARTITION OF partitioni...
|
||||||
|
^
|
||||||
|
CREATE TABLE partitioning_locks_2010 PARTITION OF partitioning_locks FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||||
|
ERROR: syntax error at or near "PARTITION"
|
||||||
|
LINE 1: CREATE TABLE partitioning_locks_2010 PARTITION OF partitioni...
|
||||||
|
^
|
||||||
|
-- distribute partitioned table
|
||||||
|
SELECT create_distributed_table('partitioning_locks', 'id');
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
LINE 1: SELECT create_distributed_table('partitioning_locks', 'id');
|
||||||
|
^
|
||||||
|
-- test locks on router SELECT
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM partitioning_locks WHERE id = 1 ORDER BY 1, 2;
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
LINE 1: SELECT * FROM partitioning_locks WHERE id = 1 ORDER BY 1, 2;
|
||||||
|
^
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on real-time SELECT
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM partitioning_locks ORDER BY 1, 2;
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
LINE 1: SELECT * FROM partitioning_locks ORDER BY 1, 2;
|
||||||
|
^
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on task-tracker SELECT
|
||||||
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_locks AS pl2 ON pl1.id = pl2.ref_id ORDER BY 1, 2;
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
LINE 1: SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_lo...
|
||||||
|
^
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
SET citus.task_executor_type TO 'real-time';
|
||||||
|
-- test locks on INSERT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01');
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
LINE 1: INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01');
|
||||||
|
^
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on UPDATE
|
||||||
|
BEGIN;
|
||||||
|
UPDATE partitioning_locks SET time = '2009-02-01' WHERE id = 1;
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
LINE 1: UPDATE partitioning_locks SET time = '2009-02-01' WHERE id =...
|
||||||
|
^
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on DELETE
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM partitioning_locks WHERE id = 1;
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
LINE 1: DELETE FROM partitioning_locks WHERE id = 1;
|
||||||
|
^
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on INSERT/SELECT
|
||||||
|
CREATE TABLE partitioning_locks_for_select(id int, ref_id int, time date);
|
||||||
|
SELECT create_distributed_table('partitioning_locks_for_select', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select;
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
LINE 1: INSERT INTO partitioning_locks SELECT * FROM partitioning_lo...
|
||||||
|
^
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on coordinator INSERT/SELECT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select LIMIT 5;
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
LINE 1: INSERT INTO partitioning_locks SELECT * FROM partitioning_lo...
|
||||||
|
^
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on master_modify_multiple_shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('UPDATE partitioning_locks SET time = ''2009-03-01''');
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on DDL
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE partitioning_locks ADD COLUMN new_column int;
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test locks on TRUNCATE
|
||||||
|
BEGIN;
|
||||||
|
TRUNCATE partitioning_locks;
|
||||||
|
ERROR: relation "partitioning_locks" does not exist
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test shard resource locks with master_modify_multiple_shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('UPDATE partitioning_locks_2009 SET time = ''2009-03-01''');
|
||||||
|
ERROR: relation "partitioning_locks_2009" does not exist
|
||||||
|
-- see the locks on parent table
|
||||||
|
SELECT
|
||||||
|
logicalrelid,
|
||||||
|
locktype,
|
||||||
|
mode
|
||||||
|
FROM
|
||||||
|
pg_locks AS l JOIN pg_dist_shard AS s
|
||||||
|
ON
|
||||||
|
l.objid = s.shardid
|
||||||
|
WHERE
|
||||||
|
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
|
||||||
|
pid = pg_backend_pid()
|
||||||
|
ORDER BY
|
||||||
|
1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test shard resource locks with TRUNCATE
|
||||||
|
BEGIN;
|
||||||
|
TRUNCATE partitioning_locks_2009;
|
||||||
|
ERROR: relation "partitioning_locks_2009" does not exist
|
||||||
|
-- see the locks on parent table
|
||||||
|
SELECT
|
||||||
|
logicalrelid,
|
||||||
|
locktype,
|
||||||
|
mode
|
||||||
|
FROM
|
||||||
|
pg_locks AS l JOIN pg_dist_shard AS s
|
||||||
|
ON
|
||||||
|
l.objid = s.shardid
|
||||||
|
WHERE
|
||||||
|
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
|
||||||
|
pid = pg_backend_pid()
|
||||||
|
ORDER BY
|
||||||
|
1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
-- test shard resource locks with INSERT/SELECT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01';
|
||||||
|
ERROR: relation "partitioning_locks_2009" does not exist
|
||||||
|
LINE 1: INSERT INTO partitioning_locks_2009 SELECT * FROM partitioni...
|
||||||
|
^
|
||||||
|
-- see the locks on parent table
|
||||||
|
SELECT
|
||||||
|
logicalrelid,
|
||||||
|
locktype,
|
||||||
|
mode
|
||||||
|
FROM
|
||||||
|
pg_locks AS l JOIN pg_dist_shard AS s
|
||||||
|
ON
|
||||||
|
l.objid = s.shardid
|
||||||
|
WHERE
|
||||||
|
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
|
||||||
|
pid = pg_backend_pid()
|
||||||
|
ORDER BY
|
||||||
|
1, 2, 3;
|
||||||
|
ERROR: current transaction is aborted, commands ignored until end of transaction block
|
||||||
|
COMMIT;
|
||||||
|
DROP TABLE
|
||||||
|
IF EXISTS
|
||||||
|
partitioning_test_2012,
|
||||||
|
partitioning_test_2013,
|
||||||
|
partitioned_events_table,
|
||||||
|
partitioned_users_table,
|
||||||
|
list_partitioned_events_table,
|
||||||
|
multi_column_partitioning,
|
||||||
|
partitioning_locks,
|
||||||
|
partitioning_locks_for_select;
|
||||||
NOTICE: table "partitioned_events_table" does not exist, skipping
|
NOTICE: table "partitioned_events_table" does not exist, skipping
|
||||||
NOTICE: table "partitioned_users_table" does not exist, skipping
|
NOTICE: table "partitioned_users_table" does not exist, skipping
|
||||||
NOTICE: table "list_partitioned_events_table" does not exist, skipping
|
NOTICE: table "list_partitioned_events_table" does not exist, skipping
|
||||||
NOTICE: table "multi_column_partitioning" does not exist, skipping
|
NOTICE: table "multi_column_partitioning" does not exist, skipping
|
||||||
|
NOTICE: table "partitioning_locks" does not exist, skipping
|
||||||
|
|
|
@ -660,6 +660,157 @@ INSERT INTO multi_column_partitioning_10_max_20_min VALUES(19, -19);
|
||||||
INSERT INTO multi_column_partitioning VALUES(20, -20);
|
INSERT INTO multi_column_partitioning VALUES(20, -20);
|
||||||
|
|
||||||
-- see data is loaded to multi-column partitioned table
|
-- see data is loaded to multi-column partitioned table
|
||||||
SELECT * FROM multi_column_partitioning;
|
SELECT * FROM multi_column_partitioning ORDER BY 1, 2;
|
||||||
|
|
||||||
DROP TABLE IF EXISTS partitioning_test_2012, partitioning_test_2013, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning;
|
--
|
||||||
|
-- Tests for locks on partitioned tables
|
||||||
|
--
|
||||||
|
CREATE TABLE partitioning_locks(id int, ref_id int, time date) PARTITION BY RANGE (time);
|
||||||
|
|
||||||
|
-- create its partitions
|
||||||
|
CREATE TABLE partitioning_locks_2009 PARTITION OF partitioning_locks FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||||
|
CREATE TABLE partitioning_locks_2010 PARTITION OF partitioning_locks FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||||
|
|
||||||
|
-- distribute partitioned table
|
||||||
|
SELECT create_distributed_table('partitioning_locks', 'id');
|
||||||
|
|
||||||
|
-- test locks on router SELECT
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM partitioning_locks WHERE id = 1 ORDER BY 1, 2;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test locks on real-time SELECT
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM partitioning_locks ORDER BY 1, 2;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test locks on task-tracker SELECT
|
||||||
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM partitioning_locks AS pl1 JOIN partitioning_locks AS pl2 ON pl1.id = pl2.ref_id ORDER BY 1, 2;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
SET citus.task_executor_type TO 'real-time';
|
||||||
|
|
||||||
|
-- test locks on INSERT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks VALUES(1, 1, '2009-01-01');
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test locks on UPDATE
|
||||||
|
BEGIN;
|
||||||
|
UPDATE partitioning_locks SET time = '2009-02-01' WHERE id = 1;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test locks on DELETE
|
||||||
|
BEGIN;
|
||||||
|
DELETE FROM partitioning_locks WHERE id = 1;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test locks on INSERT/SELECT
|
||||||
|
CREATE TABLE partitioning_locks_for_select(id int, ref_id int, time date);
|
||||||
|
SELECT create_distributed_table('partitioning_locks_for_select', 'id');
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test locks on coordinator INSERT/SELECT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks SELECT * FROM partitioning_locks_for_select LIMIT 5;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test locks on master_modify_multiple_shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('UPDATE partitioning_locks SET time = ''2009-03-01''');
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test locks on DDL
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE partitioning_locks ADD COLUMN new_column int;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test locks on TRUNCATE
|
||||||
|
BEGIN;
|
||||||
|
TRUNCATE partitioning_locks;
|
||||||
|
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test shard resource locks with master_modify_multiple_shards
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards('UPDATE partitioning_locks_2009 SET time = ''2009-03-01''');
|
||||||
|
|
||||||
|
-- see the locks on parent table
|
||||||
|
SELECT
|
||||||
|
logicalrelid,
|
||||||
|
locktype,
|
||||||
|
mode
|
||||||
|
FROM
|
||||||
|
pg_locks AS l JOIN pg_dist_shard AS s
|
||||||
|
ON
|
||||||
|
l.objid = s.shardid
|
||||||
|
WHERE
|
||||||
|
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
|
||||||
|
pid = pg_backend_pid()
|
||||||
|
ORDER BY
|
||||||
|
1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test shard resource locks with TRUNCATE
|
||||||
|
BEGIN;
|
||||||
|
TRUNCATE partitioning_locks_2009;
|
||||||
|
|
||||||
|
-- see the locks on parent table
|
||||||
|
SELECT
|
||||||
|
logicalrelid,
|
||||||
|
locktype,
|
||||||
|
mode
|
||||||
|
FROM
|
||||||
|
pg_locks AS l JOIN pg_dist_shard AS s
|
||||||
|
ON
|
||||||
|
l.objid = s.shardid
|
||||||
|
WHERE
|
||||||
|
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
|
||||||
|
pid = pg_backend_pid()
|
||||||
|
ORDER BY
|
||||||
|
1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- test shard resource locks with INSERT/SELECT
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01';
|
||||||
|
|
||||||
|
-- see the locks on parent table
|
||||||
|
SELECT
|
||||||
|
logicalrelid,
|
||||||
|
locktype,
|
||||||
|
mode
|
||||||
|
FROM
|
||||||
|
pg_locks AS l JOIN pg_dist_shard AS s
|
||||||
|
ON
|
||||||
|
l.objid = s.shardid
|
||||||
|
WHERE
|
||||||
|
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
|
||||||
|
pid = pg_backend_pid()
|
||||||
|
ORDER BY
|
||||||
|
1, 2, 3;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
DROP TABLE
|
||||||
|
IF EXISTS
|
||||||
|
partitioning_test_2012,
|
||||||
|
partitioning_test_2013,
|
||||||
|
partitioned_events_table,
|
||||||
|
partitioned_users_table,
|
||||||
|
list_partitioned_events_table,
|
||||||
|
multi_column_partitioning,
|
||||||
|
partitioning_locks,
|
||||||
|
partitioning_locks_for_select;
|
||||||
|
|
Loading…
Reference in New Issue