Merge pull request #2220 from citusdata/relation_access_via_placement_access

Track relation accesses using the connection management infrastructure
pull/2182/head
Önder Kalacı 2018-06-25 22:52:57 +03:00 committed by GitHub
commit d63cbf3822
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 3415 additions and 38 deletions

View File

@ -68,11 +68,13 @@ OBJS = src/backend/distributed/shared_library_init.o \
src/backend/distributed/test/partitioning_utils.o \
src/backend/distributed/test/progress_utils.o \
src/backend/distributed/test/prune_shard_list.o \
src/backend/distributed/test/relation_access_tracking.o \
src/backend/distributed/test/sequential_execution.o \
src/backend/distributed/transaction/backend_data.o \
src/backend/distributed/transaction/distributed_deadlock_detection.o \
src/backend/distributed/transaction/lock_graph.o \
src/backend/distributed/transaction/multi_shard_transaction.o \
src/backend/distributed/transaction/relation_access_tracking.o \
src/backend/distributed/transaction/remote_transaction.o \
src/backend/distributed/transaction/transaction_management.o \
src/backend/distributed/transaction/transaction_recovery.o \

View File

@ -66,6 +66,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/resource_lock.h"
@ -2167,6 +2168,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
copyDest->copyOutState = copyOutState;
copyDest->multiShardCopy = false;
/* prepare functions to call on received tuples */
{
@ -2311,6 +2313,21 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
&shardConnectionsFound);
if (!shardConnectionsFound)
{
/*
* Keep track of multi shard accesses before opening connection
* the second shard.
*/
if (!copyDest->multiShardCopy && hash_get_num_entries(shardConnectionHash) == 2)
{
Oid relationId = copyDest->distributedRelationId;
/* mark as multi shard to skip doing the same thing over and over */
copyDest->multiShardCopy = true;
/* when we see multiple shard connections, we mark COPY as parallel modify */
RecordParallelModifyAccess(relationId);
}
/* open connections and initiate COPY on shard placements */
OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
copyOutState->binary);

View File

@ -19,6 +19,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/distributed_planner.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
@ -646,6 +647,9 @@ FindPlacementListConnection(int flags, List *placementAccessList, const char *us
}
*placementEntryList = lappend(*placementEntryList, placementEntry);
/* record the relation access mapping */
AssociatePlacementAccessWithRelation(placement, accessType);
}
return chosenConnection;
@ -920,6 +924,7 @@ ResetPlacementConnectionManagement(void)
hash_delete_all(ConnectionPlacementHash);
hash_delete_all(ConnectionShardHash);
hash_delete_all(ColocatedPlacementsHash);
ResetRelationAccessHash();
/*
* NB: memory for ConnectionReference structs and subordinate data is
@ -1129,6 +1134,9 @@ InitPlacementConnectionManagement(void)
ConnectionShardHash = hash_create("citus connection cache (shardid)",
64, &info, hashFlags);
/* (relationId) = [relationAccessMode] hash */
AllocateRelationAccessHash();
}

View File

@ -17,6 +17,7 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/distributed_planner.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "executor/executor.h"

View File

@ -32,6 +32,8 @@
#include "distributed/multi_resowner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_protocol.h"
@ -104,8 +106,15 @@ MultiRealTimeExecute(Job *job)
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
TaskExecution *taskExecution = NULL;
TaskExecution *taskExecution = InitTaskExecution(task, EXEC_TASK_CONNECT_START);
/* keep track of multi shard accesses before opening the connections */
if (MultiShardConnectionType == PARALLEL_CONNECTION)
{
RecordRelationParallelSelectAccessForTask(task);
}
taskExecution = InitTaskExecution(task, EXEC_TASK_CONNECT_START);
taskExecutionList = lappend(taskExecutionList, taskExecution);
}

View File

@ -41,6 +41,7 @@
#include "distributed/multi_router_planner.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/subplan_execution.h"
#include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
@ -85,6 +86,8 @@ static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdT
operation, bool alwaysThrowErrorOnFailure, bool
expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * BuildPlacementAccessList(uint32 groupId, List *relationShardList,
ShardPlacementAccessType accessType);
static List * GetModifyConnections(Task *task, bool markCritical);
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults);
@ -841,6 +844,28 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
*/
List *
BuildPlacementSelectList(uint32 groupId, List *relationShardList)
{
return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_SELECT);
}
/*
* BuildPlacementDDLList is a warpper around BuildPlacementAccessList() for DDL access.
*/
List *
BuildPlacementDDLList(uint32 groupId, List *relationShardList)
{
return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_DDL);
}
/*
* BuildPlacementAccessList returns a list of placement accesses for the given
* relationShardList and the access type.
*/
static List *
BuildPlacementAccessList(uint32 groupId, List *relationShardList,
ShardPlacementAccessType accessType)
{
ListCell *relationShardCell = NULL;
List *placementAccessList = NIL;
@ -857,7 +882,7 @@ BuildPlacementSelectList(uint32 groupId, List *relationShardList)
continue;
}
placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT);
placementAccess = CreatePlacementAccess(placement, accessType);
placementAccessList = lappend(placementAccessList, placementAccess);
}
@ -1091,9 +1116,22 @@ GetModifyConnections(Task *task, bool markCritical)
accessType = PLACEMENT_ACCESS_DML;
}
/* create placement accesses for placements that appear in a subselect */
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
relationShardList);
if (accessType == PLACEMENT_ACCESS_DDL)
{
/*
* All relations appearing inter-shard DDL commands should be marked
* with DDL access.
*/
placementAccessList =
BuildPlacementDDLList(taskPlacement->groupId, relationShardList);
}
else
{
/* create placement accesses for placements that appear in a subselect */
placementAccessList =
BuildPlacementSelectList(taskPlacement->groupId, relationShardList);
}
Assert(list_length(placementAccessList) == list_length(relationShardList));
@ -1278,17 +1316,14 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
/*
* Ensure that there are no concurrent modifications on the same
* shards. For DDL commands, we already obtained the appropriate
* locks in ProcessUtility.
*
* We don't need to acquire lock for TRUNCATE_TASK since it already
* acquires AccessExclusiveLock on the relation, and blocks any
* concurrent operation.
* shards. In general, for DDL commands, we already obtained the
* appropriate locks in ProcessUtility. However, we still prefer to
* acquire the executor locks for DDLs specifically for TRUNCATE
* command on a partition table since AcquireExecutorMultiShardLocks()
* ensures that no concurrent modifications happens on the parent
* tables.
*/
if (firstTask->taskType == MODIFY_TASK)
{
AcquireExecutorMultiShardLocks(taskList);
}
AcquireExecutorMultiShardLocks(taskList);
BeginOrContinueCoordinatedTransaction();
@ -1298,6 +1333,32 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
CoordinatedTransactionUse2PC();
}
/*
* With a similar rationale as above, where we expect all tasks to operate on
* the same relations, we prefer to record relation accesses for the first
* task only.
*/
if (firstTask->taskType == MODIFY_TASK)
{
RecordRelationParallelModifyAccessForTask(firstTask);
/*
* We prefer to mark with SELECT access as well because for multi shard
* modification queries, the placement access list is always marked with both
* DML and SELECT accesses.
*/
RecordRelationParallelSelectAccessForTask(firstTask);
}
else if (firstTask->taskType == DDL_TASK &&
PartitionMethod(firstShardInterval->relationId) != DISTRIBUTE_BY_NONE)
{
/*
* Even single task DDLs hit here, so we'd prefer
* not to record for reference tables.
*/
RecordRelationParallelDDLAccessForTask(firstTask);
}
if (firstTask->taskType == DDL_TASK || firstTask->taskType == VACUUM_ANALYZE_TASK)
{
connectionFlags = FOR_DDL;

View File

@ -3121,10 +3121,18 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
uint64 leftShardId = leftShardInterval->shardId;
StringInfo applyCommand = makeStringInfo();
Task *task = NULL;
RelationShard *leftRelationShard = CitusMakeNode(RelationShard);
RelationShard *rightRelationShard = CitusMakeNode(RelationShard);
ShardInterval *rightShardInterval = (ShardInterval *) lfirst(rightShardCell);
uint64 rightShardId = rightShardInterval->shardId;
leftRelationShard->relationId = leftRelationId;
leftRelationShard->shardId = leftShardId;
rightRelationShard->relationId = rightRelationId;
rightRelationShard->shardId = rightShardId;
appendStringInfo(applyCommand, WORKER_APPLY_INTER_SHARD_DDL_COMMAND,
leftShardId, escapedLeftSchemaName, rightShardId,
escapedRightSchemaName, escapedCommandString);
@ -3138,6 +3146,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = leftShardId;
task->taskPlacementList = FinalizedShardPlacementList(leftShardId);
task->relationShardList = list_make2(leftRelationShard, rightRelationShard);
taskList = lappend(taskList, task);
}

View File

@ -30,6 +30,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/reference_table_utils.h"

View File

@ -55,7 +55,8 @@
#include "utils/memutils.h"
static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList);
static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType
taskType);
PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
@ -85,6 +86,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
List *taskList = NIL;
int32 affectedTupleCount = 0;
CmdType operation = CMD_UNKNOWN;
TaskType taskType = TASK_TYPE_INVALID_FIRST;
#if (PG_VERSION_NUM >= 100000)
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
queryTreeNode = rawStmt->stmt;
@ -159,6 +161,12 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
{
RaiseDeferredError(error, ERROR);
}
taskType = MODIFY_TASK;
}
else
{
taskType = DDL_TASK;
}
/* reject queries with a returning list */
@ -179,7 +187,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
CHECK_FOR_INTERRUPTS();
taskList =
ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList);
ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType);
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
@ -200,7 +208,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
* given list of shards.
*/
static List *
ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList)
ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType taskType)
{
List *taskList = NIL;
ListCell *shardIntervalCell = NULL;
@ -223,7 +231,7 @@ ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList)
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = MODIFY_TASK;
task->taskType = taskType;
task->queryString = shardQueryString->data;
task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;

View File

@ -30,8 +30,10 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/distributed_planner.h"
#include "distributed/foreign_constraint.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_router_executor.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
@ -40,6 +42,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
@ -518,6 +521,19 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
CoordinatedTransactionUse2PC();
}
/* mark parallel relation accesses before opening connections */
if (ShouldRecordRelationAccess() && useExclusiveConnection)
{
RecordParallelDDLAccess(distributedRelationId);
/* we should mark the parent as well */
if (alterTableAttachPartitionCommand != NULL)
{
Oid parentRelationId = PartitionParentOid(distributedRelationId);
RecordParallelDDLAccess(parentRelationId);
}
}
foreach(shardPlacementCell, shardPlacements)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
@ -531,8 +547,38 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
shardIndex = ShardIndex(shardInterval);
}
connection = GetPlacementConnection(connectionFlags, shardPlacement,
placementOwner);
/*
* For partitions, make sure that we mark the parent table relation access
* with DDL. This is only important for parallel relation access in transaction
* blocks, thus check useExclusiveConnection and transaction block as well.
*/
if ((ShouldRecordRelationAccess() && useExclusiveConnection) &&
alterTableAttachPartitionCommand != NULL)
{
RelationShard *parentRelationShard = CitusMakeNode(RelationShard);
RelationShard *partitionRelationShard = CitusMakeNode(RelationShard);
List *relationShardList = NIL;
List *placementAccessList = NIL;
parentRelationShard->relationId = PartitionParentOid(distributedRelationId);
parentRelationShard->shardId =
ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex);
partitionRelationShard->relationId = distributedRelationId;
partitionRelationShard->shardId = shardId;
relationShardList = list_make2(parentRelationShard, partitionRelationShard);
placementAccessList = BuildPlacementDDLList(shardPlacement->groupId,
relationShardList);
connection = GetPlacementListConnection(connectionFlags, placementAccessList,
placementOwner);
}
else
{
connection = GetPlacementConnection(connectionFlags, shardPlacement,
placementOwner);
}
if (useExclusiveConnection)
{
ClaimConnectionExclusively(connection);

View File

@ -0,0 +1,60 @@
/*-------------------------------------------------------------------------
*
* test/src/relation_acess_tracking.c
*
* Some test UDF for tracking relation accesses within transaction blocks.
*
* Copyright (c) 2014-2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "c.h"
#include "fmgr.h"
#include "distributed/relation_access_tracking.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(relation_select_access_mode);
PG_FUNCTION_INFO_V1(relation_dml_access_mode);
PG_FUNCTION_INFO_V1(relation_ddl_access_mode);
/*
* relation_select_access_mode returns the SELECT access
* type (e.g., single shard - multi shard) for the given relation.
*/
Datum
relation_select_access_mode(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
PG_RETURN_INT64(GetRelationSelectAccessMode(relationId));
}
/*
* relation_dml_access_mode returns the DML access type (e.g.,
* single shard - multi shard) for the given relation.
*/
Datum
relation_dml_access_mode(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
PG_RETURN_INT64(GetRelationDMLAccessMode(relationId));
}
/*
* relation_ddl_access_mode returns the DDL access type (e.g.,
* single shard - multi shard) for the given relation.
*/
Datum
relation_ddl_access_mode(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
PG_RETURN_INT64(GetRelationDDLAccessMode(relationId));
}

View File

@ -91,7 +91,6 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(placementCell);
ShardPlacementAccess placementModification;
List *placementAccessList = NIL;
List *placementSelectList = NIL;
MultiConnection *connection = NULL;
WorkerNode *workerNode = FindWorkerNode(shardPlacement->nodeName,
@ -109,10 +108,27 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
placementAccessList = lappend(placementAccessList, &placementModification);
/* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */
placementSelectList = BuildPlacementSelectList(shardPlacement->groupId,
task->relationShardList);
placementAccessList = list_concat(placementAccessList, placementSelectList);
if (accessType == PLACEMENT_ACCESS_DDL)
{
List *placementDDLList = BuildPlacementDDLList(shardPlacement->groupId,
task->relationShardList);
/*
* All relations appearing inter-shard DDL commands should be marked
* with DDL access.
*/
placementAccessList = list_concat(placementAccessList, placementDDLList);
}
else
{
List *placementSelectList =
BuildPlacementSelectList(shardPlacement->groupId,
task->relationShardList);
/* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */
placementAccessList =
list_concat(placementAccessList, placementSelectList);
}
/*
* Find a connection that sees preceding writes and cannot self-deadlock,

View File

@ -0,0 +1,533 @@
/*-------------------------------------------------------------------------
*
* relation_access_tracking.c
*
* Transaction access tracking for Citus. The functions in this file
* are intended to track the relation accesses within a transaction. The
* logic here is mostly useful when a reference table is referred by
* a distributed table via a foreign key. Whenever such a pair of tables
* are acccesed inside a transaction, Citus should detect and act
* accordingly.
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/xact.h"
#include "distributed/colocation_utils.h"
#include "distributed/hash_helpers.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/relation_access_tracking.h"
#include "utils/hsearch.h"
#define PARALLEL_MODE_FLAG_OFFSET 3
/*
* Hash table mapping relations to the
* (relationId) = (relationAccessType and relationAccessMode)
*
* RelationAccessHash is used to keep track of relation accesses types (e.g., select,
* dml or ddl) along with access modes (e.g., no access, sequential access or
* parallel access).
*
* We keep an integer per relation and use some of the bits to identify the access types
* and access modes.
*
* We store the access types in the first 3 bits:
* - 0th bit is set for SELECT accesses to a relation
* - 1st bit is set for DML accesses to a relation
* - 2nd bit is set for DDL accesses to a relation
*
* and, access modes in the next 3 bits:
* - 3rd bit is set for PARALLEL SELECT accesses to a relation
* - 4th bit is set for PARALLEL DML accesses to a relation
* - 5th bit is set for PARALLEL DDL accesses to a relation
*
*/
typedef struct RelationAccessHashKey
{
Oid relationId;
} RelationAccessHashKey;
typedef struct RelationAccessHashEntry
{
RelationAccessHashKey key;
int relationAccessMode;
} RelationAccessHashEntry;
static HTAB *RelationAccessHash;
static void RecordRelationAccess(Oid relationId, ShardPlacementAccessType accessType);
static void RecordPlacementAccessToCache(Oid relationId,
ShardPlacementAccessType accessType);
static RelationAccessMode GetRelationAccessMode(Oid relationId,
ShardPlacementAccessType accessType);
static void RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType
placementAccess);
static void RecordParallelRelationAccessToCache(Oid relationId,
ShardPlacementAccessType placementAccess);
/*
* Empty RelationAccessHash, without destroying the hash table itself.
*/
void
ResetRelationAccessHash()
{
hash_delete_all(RelationAccessHash);
}
/*
* Allocate RelationAccessHash.
*/
void
AllocateRelationAccessHash(void)
{
HASHCTL info;
uint32 hashFlags = 0;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(RelationAccessHashKey);
info.entrysize = sizeof(RelationAccessHashEntry);
info.hash = tag_hash;
info.hcxt = ConnectionContext;
hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
RelationAccessHash = hash_create("citus connection cache (relationid)",
8, &info, hashFlags);
}
/*
* AssociatePlacementAccessWithRelation associates the placement access to the
* distributed relation that the placement belongs to.
*/
void
AssociatePlacementAccessWithRelation(ShardPlacement *placement,
ShardPlacementAccessType accessType)
{
Oid relationId = InvalidOid;
uint64 shardId = INVALID_SHARD_ID;
if (!ShouldRecordRelationAccess())
{
return;
}
shardId = placement->shardId;
relationId = RelationIdForShard(shardId);
RecordRelationAccess(relationId, accessType);
}
/*
* RecordRelationAccess associates the access to the distributed relation. The
* function takes partitioned relations into account as well.
*
* We implemented this function to prevent accessing placement metadata during
* recursive calls of the function itself (e.g., avoid
* AssociatePlacementAccessWithRelation()).
*/
static void
RecordRelationAccess(Oid relationId, ShardPlacementAccessType accessType)
{
/*
* If a relation is partitioned, record accesses to all of its partitions as well.
* We prefer to use PartitionedTableNoLock() because at this point the necessary
* locks on the relation has already been acquired.
*/
if (PartitionedTableNoLock(relationId))
{
List *partitionList = PartitionList(relationId);
ListCell *partitionCell = NULL;
foreach(partitionCell, partitionList)
{
Oid partitionOid = lfirst_oid(partitionCell);
/*
* During create_distributed_table, the partitions may not
* have been created yet and so there are no placements yet.
* We're already going to register them when we distribute
* the partitions.
*/
if (!IsDistributedTable(partitionOid))
{
continue;
}
/* recursively call the function to cover multi-level partitioned tables */
RecordRelationAccess(partitionOid, accessType);
}
}
else if (PartitionTableNoLock(relationId))
{
Oid parentOid = PartitionParentOid(relationId);
/* record the parent */
RecordPlacementAccessToCache(parentOid, accessType);
}
/* always record the relation that is being considered */
RecordPlacementAccessToCache(relationId, accessType);
}
/*
* RecordPlacementAccessToCache is a utility function which saves the given
* relation id's access to the RelationAccessHash.
*/
static void
RecordPlacementAccessToCache(Oid relationId, ShardPlacementAccessType accessType)
{
RelationAccessHashKey hashKey;
RelationAccessHashEntry *hashEntry;
bool found = false;
hashKey.relationId = relationId;
hashEntry = hash_search(RelationAccessHash, &hashKey, HASH_ENTER, &found);
if (!found)
{
hashEntry->relationAccessMode = 0;
}
/* set the bit representing the access type */
hashEntry->relationAccessMode |= (1 << (accessType));
}
/*
* RecordRelationParallelSelectAccessForTask goes over all the relations
* in the relationShardList and records the select access per each table.
*/
void
RecordRelationParallelSelectAccessForTask(Task *task)
{
List *relationShardList = NIL;
ListCell *relationShardCell = NULL;
Oid lastRelationId = InvalidOid;
/* no point in recoding accesses in non-transaction blocks, skip the loop */
if (!ShouldRecordRelationAccess())
{
return;
}
relationShardList = task->relationShardList;
foreach(relationShardCell, relationShardList)
{
RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
Oid currentRelationId = relationShard->relationId;
/*
* An optimization, skip going to hash table if we've already
* recorded the relation.
*/
if (currentRelationId == lastRelationId)
{
continue;
}
RecordParallelSelectAccess(currentRelationId);
lastRelationId = currentRelationId;
}
}
/*
* RecordRelationParallelModifyAccessForTask gets a task and records
* the accesses. Note that the target relation is recorded with modify access
* where as the subqueries inside the modify query is recorded with select
* access.
*/
void
RecordRelationParallelModifyAccessForTask(Task *task)
{
List *relationShardList = NULL;
ListCell *relationShardCell = NULL;
Oid lastRelationId = InvalidOid;
/* no point in recoding accesses in non-transaction blocks, skip the loop */
if (!ShouldRecordRelationAccess())
{
return;
}
/* anchor shard is always associated with modify access */
RecordParallelModifyAccess(RelationIdForShard(task->anchorShardId));
if (task->modifyWithSubquery)
{
relationShardList = task->relationShardList;
foreach(relationShardCell, relationShardList)
{
RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
Oid currentRelationId = relationShard->relationId;
/*
* An optimization, skip going to hash table if we've already
* recorded the relation.
*/
if (currentRelationId == lastRelationId)
{
continue;
}
RecordParallelSelectAccess(currentRelationId);
lastRelationId = currentRelationId;
}
}
}
/*
* RecordRelationParallelDDLAccessForTask marks all the relationShards
* with parallel DDL access if exists. That case is valid for inter-shard
* DDL commands such as foreign key creation. The function also records
* the relation that anchorShardId belongs to.
*/
void
RecordRelationParallelDDLAccessForTask(Task *task)
{
List *relationShardList = task->relationShardList;
ListCell *relationShardCell = NULL;
Oid lastRelationId = InvalidOid;
foreach(relationShardCell, relationShardList)
{
RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
Oid currentRelationId = relationShard->relationId;
/*
* An optimization, skip going to hash table if we've already
* recorded the relation.
*/
if (currentRelationId == lastRelationId)
{
continue;
}
RecordParallelDDLAccess(currentRelationId);
lastRelationId = currentRelationId;
}
RecordParallelDDLAccess(RelationIdForShard(task->anchorShardId));
}
/*
* RecordParallelSelectAccess is a wrapper around RecordParallelRelationAccess()
*/
void
RecordParallelSelectAccess(Oid relationId)
{
RecordParallelRelationAccess(relationId, PLACEMENT_ACCESS_SELECT);
}
/*
* RecordParallelModifyAccess is a wrapper around RecordParallelRelationAccess()
*/
void
RecordParallelModifyAccess(Oid relationId)
{
RecordParallelRelationAccess(relationId, PLACEMENT_ACCESS_DML);
}
/*
* RecordParallelDDLAccess is a wrapper around RecordParallelRelationAccess()
*/
void
RecordParallelDDLAccess(Oid relationId)
{
RecordParallelRelationAccess(relationId, PLACEMENT_ACCESS_DDL);
}
/*
* RecordParallelRelationAccess records the relation access mode as parallel
* for the given access type (e.g., select, dml or ddl) in the RelationAccessHash.
*
* The function also takes partitions and partitioned tables into account.
*/
static void
RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType placementAccess)
{
if (!ShouldRecordRelationAccess())
{
return;
}
/*
* If a relation is partitioned, record accesses to all of its partitions as well.
* We prefer to use PartitionedTableNoLock() because at this point the necessary
* locks on the relation has already been acquired.
*/
if (PartitionedTableNoLock(relationId))
{
List *partitionList = PartitionList(relationId);
ListCell *partitionCell = NULL;
foreach(partitionCell, partitionList)
{
Oid partitionOid = lfirst_oid(partitionCell);
/* recursively record all relation accesses of its partitions */
RecordParallelRelationAccess(partitionOid, placementAccess);
}
}
else if (PartitionTableNoLock(relationId))
{
Oid parentOid = PartitionParentOid(relationId);
/* only record the parent */
RecordParallelRelationAccessToCache(parentOid, placementAccess);
}
RecordParallelRelationAccessToCache(relationId, placementAccess);
}
/*
* RecordParallelRelationAccessToCache is a utility function which saves the given
* relation id's access to the RelationAccessHash.
*/
static void
RecordParallelRelationAccessToCache(Oid relationId,
ShardPlacementAccessType placementAccess)
{
RelationAccessHashKey hashKey;
RelationAccessHashEntry *hashEntry;
bool found = false;
int parallelRelationAccessBit = 0;
hashKey.relationId = relationId;
hashEntry = hash_search(RelationAccessHash, &hashKey, HASH_ENTER, &found);
if (!found)
{
hashEntry->relationAccessMode = 0;
}
/* set the bit representing the access type */
hashEntry->relationAccessMode |= (1 << (placementAccess));
/* set the bit representing access mode */
parallelRelationAccessBit = placementAccess + PARALLEL_MODE_FLAG_OFFSET;
hashEntry->relationAccessMode |= (1 << parallelRelationAccessBit);
}
/*
* GetRelationSelectAccessMode is a wrapper around GetRelationAccessMode.
*/
RelationAccessMode
GetRelationSelectAccessMode(Oid relationId)
{
return GetRelationAccessMode(relationId, PLACEMENT_ACCESS_SELECT);
}
/*
* GetRelationDMLAccessMode is a wrapper around GetRelationAccessMode.
*/
RelationAccessMode
GetRelationDMLAccessMode(Oid relationId)
{
return GetRelationAccessMode(relationId, PLACEMENT_ACCESS_DML);
}
/*
* GetRelationDDLAccessMode is a wrapper around GetRelationAccessMode.
*/
RelationAccessMode
GetRelationDDLAccessMode(Oid relationId)
{
return GetRelationAccessMode(relationId, PLACEMENT_ACCESS_DDL);
}
/*
* GetRelationAccessMode returns the relation access mode (e.g., none, sequential
* or parallel) for the given access type (e.g., select, dml or ddl).
*/
static RelationAccessMode
GetRelationAccessMode(Oid relationId, ShardPlacementAccessType accessType)
{
RelationAccessHashKey hashKey;
RelationAccessHashEntry *hashEntry;
int relationAcessMode = 0;
bool found = false;
int parallelRelationAccessBit = accessType + PARALLEL_MODE_FLAG_OFFSET;
/* no point in getting the mode when not inside a transaction block */
if (!ShouldRecordRelationAccess())
{
return RELATION_NOT_ACCESSED;
}
hashKey.relationId = relationId;
hashEntry = hash_search(RelationAccessHash, &hashKey, HASH_FIND, &found);
if (!found)
{
/* relation not accessed at all */
return RELATION_NOT_ACCESSED;
}
relationAcessMode = hashEntry->relationAccessMode;
if (!(relationAcessMode & (1 << accessType)))
{
/* relation not accessed with the given access type */
return RELATION_NOT_ACCESSED;
}
if (relationAcessMode & (1 << parallelRelationAccessBit))
{
return RELATION_PARALLEL_ACCESSED;
}
else
{
return RELATION_SEQUENTIAL_ACCESSED;
}
}
/*
* ShouldRecordRelationAccess returns true when we should keep track
* of the relation accesses.
*
* In many cases, we'd only need IsTransactionBlock(), however, for some cases such as
* CTEs, where Citus uses the same connections accross multiple queries, we should
* still record the relation accesses even not inside an explicit transaction block.
* Thus, keeping track of the relation accesses inside coordinated transactions is
* also required.
*/
bool
ShouldRecordRelationAccess()
{
if (IsTransactionBlock() || InCoordinatedTransaction())
{
return true;
}
return false;
}

View File

@ -363,6 +363,26 @@ LoadShardInterval(uint64 shardId)
}
/*
* RelationIdOfShard returns the relationId of the given
* shardId.
*/
Oid
RelationIdForShard(uint64 shardId)
{
ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL;
shardEntry = LookupShardCacheEntry(shardId);
tableEntry = shardEntry->tableEntry;
Assert(tableEntry->isDistributedTable);
return tableEntry->relationId;
}
/*
* ReferenceTableShardId returns true if the given shardId belongs to
* a reference table.

View File

@ -57,6 +57,37 @@ PartitionedTable(Oid relationId)
}
/*
* Returns true if the given relation is a partitioned table. The function
* doesn't acquire any locks on the input relation, thus the caller is
* reponsible for holding the appropriate locks.
*/
bool
PartitionedTableNoLock(Oid relationId)
{
Relation rel = try_relation_open(relationId, NoLock);
bool partitionedTable = false;
/* don't error out for tables that are dropped */
if (rel == NULL)
{
return false;
}
#if (PG_VERSION_NUM >= 100000)
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
partitionedTable = true;
}
#endif
/* keep the lock */
heap_close(rel, NoLock);
return partitionedTable;
}
/*
* Returns true if the given relation is a partition.
*/
@ -77,6 +108,34 @@ PartitionTable(Oid relationId)
}
/*
* Returns true if the given relation is a partition. The function
* doesn't acquire any locks on the input relation, thus the caller is
* reponsible for holding the appropriate locks.
*/
bool
PartitionTableNoLock(Oid relationId)
{
Relation rel = try_relation_open(relationId, NoLock);
bool partitionTable = false;
/* don't error out for tables that are dropped */
if (rel == NULL)
{
return false;
}
#if (PG_VERSION_NUM >= 100000)
partitionTable = rel->rd_rel->relispartition;
#endif
/* keep the lock */
heap_close(rel, NoLock);
return partitionTable;
}
/*
* IsChildTable returns true if the table is inherited. Note that
* partition tables inherites by default. However, this function

View File

@ -78,6 +78,7 @@ typedef struct
extern bool IsDistributedTable(Oid relationId);
extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern Oid RelationIdForShard(uint64 shardId);
extern bool ReferenceTableShardId(uint64 shardId);
extern ShardPlacement * FindShardPlacementOnGroup(uint32 groupId, uint64 shardId);
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);

View File

@ -105,6 +105,9 @@ typedef struct CitusCopyDestReceiver
/* number of tuples sent */
int64 tuplesSent;
/* useful for tracking multi shard accesses */
bool multiShardCopy;
} CitusCopyDestReceiver;

View File

@ -12,7 +12,9 @@
extern bool PartitionedTable(Oid relationId);
extern bool PartitionedTableNoLock(Oid relationId);
extern bool PartitionTable(Oid relationId);
extern bool PartitionTableNoLock(Oid relationId);
extern bool IsChildTable(Oid relationId);
extern bool IsParentTable(Oid relationId);
extern Oid PartitionParentOid(Oid partitionOid);

View File

@ -14,6 +14,7 @@
#include "access/sdir.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/placement_connection.h"
#include "executor/execdesc.h"
#include "executor/tuptable.h"
#include "nodes/pg_list.h"
@ -47,5 +48,6 @@ extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
/* helper functions */
extern bool TaskListRequires2PC(List *taskList);
extern List * BuildPlacementSelectList(uint32 groupId, List *relationShardList);
extern List * BuildPlacementDDLList(uint32 groupId, List *relationShardList);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -0,0 +1,42 @@
/*
* relation_access_tracking.h
*
* Function declartions for transaction access tracking.
*
* Copyright (c) 2018, Citus Data, Inc.
*/
#ifndef RELATION_ACCESS_TRACKING_H_
#define RELATION_ACCESS_TRACKING_H_
#include "distributed/master_metadata_utility.h"
#include "distributed/multi_physical_planner.h" /* access Task struct */
#include "distributed/placement_connection.h"
/* forward declare, to avoid dependency on ShardPlacement definition */
struct ShardPlacement;
typedef enum RelationAccessMode
{
RELATION_NOT_ACCESSED,
RELATION_SEQUENTIAL_ACCESSED,
RELATION_PARALLEL_ACCESSED
} RelationAccessMode;
extern void AllocateRelationAccessHash(void);
extern void ResetRelationAccessHash(void);
extern void AssociatePlacementAccessWithRelation(ShardPlacement *placement,
ShardPlacementAccessType accessType);
extern void RecordParallelSelectAccess(Oid relationId);
extern void RecordRelationParallelSelectAccessForTask(Task *task);
extern void RecordRelationParallelModifyAccessForTask(Task *task);
extern void RecordParallelModifyAccess(Oid relationId);
extern void RecordParallelDDLAccess(Oid relationId);
extern void RecordRelationParallelDDLAccessForTask(Task *task);
extern RelationAccessMode GetRelationDDLAccessMode(Oid relationId);
extern RelationAccessMode GetRelationDMLAccessMode(Oid relationId);
extern RelationAccessMode GetRelationSelectAccessMode(Oid relationId);
extern bool ShouldRecordRelationAccess(void);
#endif /* RELATION_ACCESS_TRACKING_H_ */

View File

@ -1122,13 +1122,11 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass
partitioning_locks | relation | AccessShareLock
partitioning_locks_2009 | relation | AccessExclusiveLock
partitioning_locks_2009 | relation | AccessShareLock
partitioning_locks_2009 | relation | RowExclusiveLock
partitioning_locks_2009 | relation | ShareLock
partitioning_locks_2010 | relation | AccessExclusiveLock
partitioning_locks_2010 | relation | AccessShareLock
partitioning_locks_2010 | relation | RowExclusiveLock
partitioning_locks_2010 | relation | ShareLock
(10 rows)
(8 rows)
COMMIT;
-- test shard resource locks with master_modify_multiple_shards

View File

@ -294,7 +294,7 @@ ROLLBACK;
-- Test cancelling behaviour. See https://github.com/citusdata/citus/pull/1905.
-- Repeating it multiple times to increase the chance of failure before PR #1905.
SET client_min_messages TO ERROR;
alter system set deadlock_timeout TO '100ms';
alter system set deadlock_timeout TO '250ms';
SELECT pg_reload_conf();
pg_reload_conf
----------------

View File

@ -302,7 +302,7 @@ ROLLBACK;
-- Test cancelling behaviour. See https://github.com/citusdata/citus/pull/1905.
-- Repeating it multiple times to increase the chance of failure before PR #1905.
SET client_min_messages TO ERROR;
alter system set deadlock_timeout TO '100ms';
alter system set deadlock_timeout TO '250ms';
SELECT pg_reload_conf();
pg_reload_conf
----------------

View File

@ -0,0 +1,996 @@
---
--- tests around access tracking within transaction blocks
---
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int AS server_version;
server_version
----------------
10
(1 row)
CREATE SCHEMA access_tracking;
SET search_path TO 'access_tracking';
CREATE OR REPLACE FUNCTION relation_select_access_mode(relationId Oid)
RETURNS int
LANGUAGE C STABLE STRICT
AS 'citus', $$relation_select_access_mode$$;
CREATE OR REPLACE FUNCTION relation_dml_access_mode(relationId Oid)
RETURNS int
LANGUAGE C STABLE STRICT
AS 'citus', $$relation_dml_access_mode$$;
CREATE OR REPLACE FUNCTION relation_ddl_access_mode(relationId Oid)
RETURNS int
LANGUAGE C STABLE STRICT
AS 'citus', $$relation_ddl_access_mode$$;
CREATE OR REPLACE FUNCTION relation_access_mode_to_text(relationShardAccess int)
RETURNS text AS
$$
BEGIN
IF relationShardAccess = 0 THEN
RETURN 'not_accessed';
ELSIF relationShardAccess = 1 THEN
RETURN 'sequential_access';
ELSE
RETURN 'parallel_access';
END IF;
END;
$$ LANGUAGE 'plpgsql' IMMUTABLE;
CREATE VIEW relation_acesses AS
SELECT table_name,
relation_access_mode_to_text(relation_select_access_mode(table_name::regclass)) as select_access,
relation_access_mode_to_text(relation_dml_access_mode(table_name::regclass)) as dml_access,
relation_access_mode_to_text(relation_ddl_access_mode(table_name::regclass)) as ddl_access
FROM
((SELECT 'table_' || i as table_name FROM generate_series(1, 7) i) UNION (SELECT 'partitioning_test') UNION (SELECT 'partitioning_test_2009') UNION (SELECT 'partitioning_test_2010')) tables;
SET citus.shard_replication_factor TO 1;
CREATE TABLE table_1 (key int, value int);
SELECT create_distributed_table('table_1', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_2 (key int, value int);
SELECT create_distributed_table('table_2', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_3 (key int, value int);
SELECT create_distributed_table('table_3', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_4 (key int, value int);
SELECT create_distributed_table('table_4', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_5 (key int, value int);
SELECT create_distributed_table('table_5', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_6 (key int, value int);
SELECT create_reference_Table('table_6');
create_reference_table
------------------------
(1 row)
INSERT INTO table_1 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_2 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_4 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_5 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_6 SELECT i, i FROM generate_series(0,100) i;
-- create_distributed_table works fine
BEGIN;
CREATE TABLE table_7 (key int, value int);
SELECT create_distributed_table('table_7', 'key');
create_distributed_table
--------------------------
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_7') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_7 | not_accessed | not_accessed | parallel_access
(1 row)
COMMIT;
-- outisde the transaction blocks, the function always returns zero
SELECT count(*) FROM table_1;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+--------------
table_1 | not_accessed | not_accessed | not_accessed
(1 row)
-- a very simple test that first checks sequential
-- and parallel SELECTs,DMLs, and DDLs
BEGIN;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+--------------
table_1 | not_accessed | not_accessed | not_accessed
(1 row)
SELECT count(*) FROM table_1 WHERE key = 1;
count
-------
1
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
(1 row)
SELECT count(*) FROM table_1 WHERE key = 1 OR key = 2;
count
-------
2
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
(1 row)
INSERT INTO table_1 VALUES (1,1);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+-------------------+--------------
table_1 | parallel_access | sequential_access | not_accessed
(1 row)
INSERT INTO table_1 VALUES (1,1), (2,2);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+-------------------+--------------
table_1 | parallel_access | sequential_access | not_accessed
(1 row)
ALTER TABLE table_1 ADD COLUMN test_col INT;
-- now see that the other tables are not accessed at all
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+-------------------+-----------------
table_1 | parallel_access | sequential_access | parallel_access
(1 row)
ROLLBACK;
-- this test shows that even if two multiple single shard
-- commands executed, we can treat the transaction as sequential
BEGIN;
SELECT count(*) FROM table_1 WHERE key = 1;
count
-------
1
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
(1 row)
SELECT count(*) FROM table_1 WHERE key = 2;
count
-------
1
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
(1 row)
INSERT INTO table_1 VALUES (1,1);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | sequential_access | sequential_access | not_accessed
(1 row)
INSERT INTO table_1 VALUES (2,2);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | sequential_access | sequential_access | not_accessed
(1 row)
ROLLBACK;
-- a sample DDL example
BEGIN;
ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_1 | not_accessed | not_accessed | parallel_access
(1 row)
ROLLBACK;
-- a simple join touches single shard per table
BEGIN;
SELECT
count(*)
FROM
table_1, table_2, table_3, table_4, table_5
WHERE
table_1.key = table_2.key AND table_2.key = table_3.key AND
table_3.key = table_4.key AND table_4.key = table_5.key AND
table_1.key = 1;
count
-------
1
(1 row)
SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
table_2 | sequential_access | not_accessed | not_accessed
table_3 | sequential_access | not_accessed | not_accessed
table_4 | sequential_access | not_accessed | not_accessed
table_5 | sequential_access | not_accessed | not_accessed
table_6 | not_accessed | not_accessed | not_accessed
table_7 | not_accessed | not_accessed | not_accessed
(7 rows)
ROLLBACK;
-- a simple real-time join touches all shard per table
BEGIN;
SELECT
count(*)
FROM
table_1, table_2
WHERE
table_1.key = table_2.key;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
(2 rows)
ROLLBACK;
-- a simple real-time join touches all shard per table
-- in sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT
count(*)
FROM
table_1, table_2
WHERE
table_1.key = table_2.key;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
table_2 | sequential_access | not_accessed | not_accessed
(2 rows)
ROLLBACK;
-- a simple subquery pushdown that touches all shards
BEGIN;
SELECT
count(*)
FROM
(
SELECT
random()
FROM
table_1, table_2, table_3, table_4, table_5
WHERE
table_1.key = table_2.key AND table_2.key = table_3.key AND
table_3.key = table_4.key AND table_4.key = table_5.key
) as foo;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
table_3 | parallel_access | not_accessed | not_accessed
table_4 | parallel_access | not_accessed | not_accessed
table_5 | parallel_access | not_accessed | not_accessed
table_6 | not_accessed | not_accessed | not_accessed
table_7 | not_accessed | not_accessed | not_accessed
(7 rows)
ROLLBACK;
-- simple multi shard update both sequential and parallel modes
-- note that in multi shard modify mode we always add select
-- access for all the shards accessed. But, sequential mode is OK
BEGIN;
UPDATE table_1 SET value = 15;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
(1 row)
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
UPDATE table_2 SET value = 15;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
table_2 | sequential_access | sequential_access | not_accessed
(2 rows)
ROLLBACK;
-- now UPDATE/DELETE with subselect pushdown
BEGIN;
UPDATE
table_1 SET value = 15
WHERE key IN (SELECT key FROM table_2 JOIN table_3 USING (key) WHERE table_2.value = 15);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
table_3 | parallel_access | not_accessed | not_accessed
(3 rows)
ROLLBACK;
-- INSERT .. SELECT pushdown
BEGIN;
INSERT INTO table_2 SELECT * FROM table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | not_accessed | parallel_access | not_accessed
(2 rows)
ROLLBACK;
-- INSERT .. SELECT pushdown in sequential mode should be OK
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
INSERT INTO table_2 SELECT * FROM table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
table_2 | not_accessed | sequential_access | not_accessed
(2 rows)
ROLLBACK;
-- coordinator INSERT .. SELECT
BEGIN;
INSERT INTO table_2 SELECT * FROM table_1 OFFSET 0;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | not_accessed | parallel_access | not_accessed
(2 rows)
ROLLBACK;
-- recursively planned SELECT
BEGIN;
SELECT
count(*)
FROM
(
SELECT
random()
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
OFFSET 0
) as foo;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
(2 rows)
ROLLBACK;
-- recursively planned SELECT and coordinator INSERT .. SELECT
BEGIN;
INSERT INTO table_3 (key)
SELECT
*
FROM
(
SELECT
random() * 1000
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
OFFSET 0
) as foo;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
table_3 | not_accessed | parallel_access | not_accessed
(3 rows)
ROLLBACK;
-- recursively planned SELECT and coordinator INSERT .. SELECT
-- but modifies single shard, marked as sequential operation
BEGIN;
INSERT INTO table_3 (key)
SELECT
*
FROM
(
SELECT
random() * 1000
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
AND table_1.key = 1
OFFSET 0
) as foo;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
table_2 | sequential_access | not_accessed | not_accessed
table_3 | not_accessed | sequential_access | not_accessed
(3 rows)
ROLLBACK;
-- recursively planned SELECT and recursively planned multi-shard DELETE
BEGIN;
DELETE FROM table_3 where key IN
(
SELECT
*
FROM
(
SELECT
table_1.key
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
OFFSET 0
) as foo
) AND value IN (SELECT key FROM table_4);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3', 'table_4') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
table_3 | parallel_access | parallel_access | not_accessed
table_4 | parallel_access | not_accessed | not_accessed
(4 rows)
ROLLBACK;
-- copy out
BEGIN;
COPY (SELECT * FROM table_1 WHERE key IN (1,2,3) ORDER BY 1) TO stdout;
1 1
2 2
3 3
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
(1 row)
ROLLBACK;
-- copy in
BEGIN;
COPY table_1 FROM STDIN WITH CSV;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+-----------------+--------------
table_1 | not_accessed | parallel_access | not_accessed
(1 row)
ROLLBACK;
-- copy in single shard
BEGIN;
COPY table_1 FROM STDIN WITH CSV;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+-------------------+--------------
table_1 | not_accessed | sequential_access | not_accessed
(1 row)
ROLLBACK;
-- reference table accesses should always be a sequential
BEGIN;
SELECT count(*) FROM table_6;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_6');
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_6 | sequential_access | not_accessed | not_accessed
(1 row)
UPDATE table_6 SET value = 15;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6');
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_6 | sequential_access | sequential_access | not_accessed
(1 row)
ALTER TABLE table_6 ADD COLUMN x INT;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6');
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+-------------------
table_6 | sequential_access | sequential_access | sequential_access
(1 row)
ROLLBACK;
-- reference table join with a distributed table
BEGIN;
SELECT count(*) FROM table_1 JOIN table_6 USING(key);
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_6', 'table_1');
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_6 | parallel_access | not_accessed | not_accessed
(2 rows)
ROLLBACK;
-- TRUNCATE should be DDL
BEGIN;
TRUNCATE table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_1 | not_accessed | not_accessed | parallel_access
(1 row)
ROLLBACK;
-- TRUNCATE can be a sequential DDL
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
TRUNCATE table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-------------------
table_1 | not_accessed | not_accessed | sequential_access
(1 row)
ROLLBACK;
-- TRUNCATE on a reference table should be sequential
BEGIN;
TRUNCATE table_6;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-------------------
table_6 | not_accessed | not_accessed | sequential_access
(1 row)
ROLLBACK;
-- creating foreign keys should consider adding the placement accesses for the referenced table
ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
BEGIN;
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_1 | not_accessed | not_accessed | parallel_access
table_2 | not_accessed | not_accessed | parallel_access
(2 rows)
ROLLBACK;
-- creating foreign keys should consider adding the placement accesses for the referenced table
-- in sequential mode as well
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-------------------
table_1 | not_accessed | not_accessed | sequential_access
table_2 | not_accessed | not_accessed | sequential_access
(2 rows)
ROLLBACK;
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
SELECT create_distributed_table('partitioning_test', 'id');
create_distributed_table
--------------------------
(1 row)
-- Adding partition tables via CREATE TABLE should have DDL access the partitioned table as well
BEGIN;
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+---------------+--------------+-----------------
partitioning_test | not_accessed | not_accessed | parallel_access
partitioning_test_2009 | not_accessed | not_accessed | parallel_access
(2 rows)
ROLLBACK;
-- Adding partition tables via ATTACH PARTITION on local tables should have DDL access the partitioned table as well
CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test;
BEGIN;
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+---------------+--------------+-----------------
partitioning_test | not_accessed | not_accessed | parallel_access
partitioning_test_2009 | not_accessed | not_accessed | parallel_access
(2 rows)
COMMIT;
-- Adding partition tables via ATTACH PARTITION on distributed tables should have DDL access the partitioned table as well
CREATE TABLE partitioning_test_2010 AS SELECT * FROM partitioning_test;
SELECT create_distributed_table('partitioning_test_2010', 'id');
create_distributed_table
--------------------------
(1 row)
BEGIN;
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2010 FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+---------------+--------------+-----------------
partitioning_test | not_accessed | not_accessed | parallel_access
partitioning_test_2010 | not_accessed | not_accessed | parallel_access
(2 rows)
COMMIT;
-- reading from partitioned table marks all of its partitions
BEGIN;
SELECT count(*) FROM partitioning_test;
count
-------
0
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+-----------------+--------------+--------------
partitioning_test | parallel_access | not_accessed | not_accessed
partitioning_test_2009 | parallel_access | not_accessed | not_accessed
partitioning_test_2010 | parallel_access | not_accessed | not_accessed
(3 rows)
COMMIT;
-- reading from partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM partitioning_test;
count
-------
0
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+-------------------+--------------+--------------
partitioning_test | sequential_access | not_accessed | not_accessed
partitioning_test_2009 | sequential_access | not_accessed | not_accessed
partitioning_test_2010 | sequential_access | not_accessed | not_accessed
(3 rows)
COMMIT;
-- updating partitioned table marks all of its partitions
BEGIN;
UPDATE partitioning_test SET time = now();
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+-----------------+-----------------+--------------
partitioning_test | parallel_access | parallel_access | not_accessed
partitioning_test_2009 | parallel_access | parallel_access | not_accessed
partitioning_test_2010 | parallel_access | parallel_access | not_accessed
(3 rows)
COMMIT;
-- updating partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
UPDATE partitioning_test SET time = now();
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+-------------------+-------------------+--------------
partitioning_test | sequential_access | sequential_access | not_accessed
partitioning_test_2009 | sequential_access | sequential_access | not_accessed
partitioning_test_2010 | sequential_access | sequential_access | not_accessed
(3 rows)
COMMIT;
-- DDLs on partitioned table marks all of its partitions
BEGIN;
ALTER TABLE partitioning_test ADD COLUMN X INT;
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+---------------+--------------+-----------------
partitioning_test | not_accessed | not_accessed | parallel_access
partitioning_test_2009 | not_accessed | not_accessed | parallel_access
partitioning_test_2010 | not_accessed | not_accessed | parallel_access
(3 rows)
ROLLBACK;
-- DDLs on partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE partitioning_test ADD COLUMN X INT;
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+---------------+--------------+-------------------
partitioning_test | not_accessed | not_accessed | sequential_access
partitioning_test_2009 | not_accessed | not_accessed | sequential_access
partitioning_test_2010 | not_accessed | not_accessed | sequential_access
(3 rows)
ROLLBACK;
-- reading from partition table marks its parent
BEGIN;
SELECT count(*) FROM partitioning_test_2009;
count
-------
0
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+-----------------+--------------+--------------
partitioning_test | parallel_access | not_accessed | not_accessed
partitioning_test_2009 | parallel_access | not_accessed | not_accessed
partitioning_test_2010 | not_accessed | not_accessed | not_accessed
(3 rows)
COMMIT;
-- rreading from partition table marks its parent with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM partitioning_test_2009;
count
-------
0
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+-------------------+--------------+--------------
partitioning_test | sequential_access | not_accessed | not_accessed
partitioning_test_2009 | sequential_access | not_accessed | not_accessed
partitioning_test_2010 | not_accessed | not_accessed | not_accessed
(3 rows)
COMMIT;
-- updating from partition table marks its parent
BEGIN;
UPDATE partitioning_test_2009 SET time = now();
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+-----------------+-----------------+--------------
partitioning_test | parallel_access | parallel_access | not_accessed
partitioning_test_2009 | parallel_access | parallel_access | not_accessed
partitioning_test_2010 | not_accessed | not_accessed | not_accessed
(3 rows)
COMMIT;
-- updating from partition table marks its parent sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
UPDATE partitioning_test_2009 SET time = now();
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+-------------------+-------------------+--------------
partitioning_test | sequential_access | sequential_access | not_accessed
partitioning_test_2009 | sequential_access | sequential_access | not_accessed
partitioning_test_2010 | not_accessed | not_accessed | not_accessed
(3 rows)
COMMIT;
-- DDLs on partition table marks its parent
BEGIN;
CREATE INDEX i1000000 ON partitioning_test_2009 (id);
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+---------------+--------------+-----------------
partitioning_test | not_accessed | not_accessed | parallel_access
partitioning_test_2009 | not_accessed | not_accessed | parallel_access
partitioning_test_2010 | not_accessed | not_accessed | not_accessed
(3 rows)
ROLLBACK;
-- DDLs on partition table marks its parent in sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE INDEX i1000000 ON partitioning_test_2009 (id);
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------------------+---------------+--------------+-------------------
partitioning_test | not_accessed | not_accessed | sequential_access
partitioning_test_2009 | not_accessed | not_accessed | sequential_access
partitioning_test_2010 | not_accessed | not_accessed | not_accessed
(3 rows)
ROLLBACK;
-- TRUNCATE CASCADE works fine
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
BEGIN;
TRUNCATE table_1 CASCADE;
NOTICE: truncate cascades to table "table_2"
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_1 | not_accessed | not_accessed | parallel_access
table_2 | not_accessed | not_accessed | parallel_access
(2 rows)
ROLLBACK;
-- CTEs with SELECT only should work fine
BEGIN;
WITH cte AS (SELECT count(*) FROM table_1)
SELECT * FROM cte;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
(1 row)
COMMIT;
-- CTEs with SELECT only in sequential mode should work fine
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
WITH cte AS (SELECT count(*) FROM table_1)
SELECT * FROM cte;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
(1 row)
COMMIT;
-- modifying CTEs should work fine with multi-row inserts, which are by default in sequential
BEGIN;
WITH cte_1 AS (INSERT INTO table_1 VALUES (1000,1000), (1001, 1001), (1002, 1002) RETURNING *)
SELECT * FROM cte_1 ORDER BY 1;
key | value
------+-------
1000 | 1000
1001 | 1001
1002 | 1002
(3 rows)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+-------------------+--------------
table_1 | not_accessed | sequential_access | not_accessed
(1 row)
ROLLBACK;
-- modifying CTEs should work fine with parallel mode
BEGIN;
WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *)
SELECT count(*) FROM cte_1 ORDER BY 1;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
(1 row)
ROLLBACK;
-- modifying CTEs should work fine with sequential mode
BEGIN;
WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *)
SELECT count(*) FROM cte_1 ORDER BY 1;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
(1 row)
ROLLBACK;
-- create distributed table with data loading
-- should mark both parallel dml and parallel ddl
DROP TABLE table_3;
CREATE TABLE table_3 (key int, value int);
INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i;
BEGIN;
SELECT create_distributed_table('table_3', 'key');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_3') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+-----------------+-----------------
table_3 | not_accessed | parallel_access | parallel_access
(1 row)
COMMIT;
SET search_path TO 'public';
DROP SCHEMA access_tracking CASCADE;
NOTICE: drop cascades to 13 other objects
DETAIL: drop cascades to function access_tracking.relation_select_access_mode(oid)
drop cascades to function access_tracking.relation_dml_access_mode(oid)
drop cascades to function access_tracking.relation_ddl_access_mode(oid)
drop cascades to function access_tracking.relation_access_mode_to_text(integer)
drop cascades to view access_tracking.relation_acesses
drop cascades to table access_tracking.table_1
drop cascades to table access_tracking.table_2
drop cascades to table access_tracking.table_4
drop cascades to table access_tracking.table_5
drop cascades to table access_tracking.table_6
drop cascades to table access_tracking.table_7
drop cascades to table access_tracking.partitioning_test
drop cascades to table access_tracking.table_3

View File

@ -0,0 +1,930 @@
---
--- tests around access tracking within transaction blocks
---
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int AS server_version;
server_version
----------------
9
(1 row)
CREATE SCHEMA access_tracking;
SET search_path TO 'access_tracking';
CREATE OR REPLACE FUNCTION relation_select_access_mode(relationId Oid)
RETURNS int
LANGUAGE C STABLE STRICT
AS 'citus', $$relation_select_access_mode$$;
CREATE OR REPLACE FUNCTION relation_dml_access_mode(relationId Oid)
RETURNS int
LANGUAGE C STABLE STRICT
AS 'citus', $$relation_dml_access_mode$$;
CREATE OR REPLACE FUNCTION relation_ddl_access_mode(relationId Oid)
RETURNS int
LANGUAGE C STABLE STRICT
AS 'citus', $$relation_ddl_access_mode$$;
CREATE OR REPLACE FUNCTION relation_access_mode_to_text(relationShardAccess int)
RETURNS text AS
$$
BEGIN
IF relationShardAccess = 0 THEN
RETURN 'not_accessed';
ELSIF relationShardAccess = 1 THEN
RETURN 'sequential_access';
ELSE
RETURN 'parallel_access';
END IF;
END;
$$ LANGUAGE 'plpgsql' IMMUTABLE;
CREATE VIEW relation_acesses AS
SELECT table_name,
relation_access_mode_to_text(relation_select_access_mode(table_name::regclass)) as select_access,
relation_access_mode_to_text(relation_dml_access_mode(table_name::regclass)) as dml_access,
relation_access_mode_to_text(relation_ddl_access_mode(table_name::regclass)) as ddl_access
FROM
((SELECT 'table_' || i as table_name FROM generate_series(1, 7) i) UNION (SELECT 'partitioning_test') UNION (SELECT 'partitioning_test_2009') UNION (SELECT 'partitioning_test_2010')) tables;
SET citus.shard_replication_factor TO 1;
CREATE TABLE table_1 (key int, value int);
SELECT create_distributed_table('table_1', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_2 (key int, value int);
SELECT create_distributed_table('table_2', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_3 (key int, value int);
SELECT create_distributed_table('table_3', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_4 (key int, value int);
SELECT create_distributed_table('table_4', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_5 (key int, value int);
SELECT create_distributed_table('table_5', 'key');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_6 (key int, value int);
SELECT create_reference_Table('table_6');
create_reference_table
------------------------
(1 row)
INSERT INTO table_1 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_2 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_4 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_5 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_6 SELECT i, i FROM generate_series(0,100) i;
-- create_distributed_table works fine
BEGIN;
CREATE TABLE table_7 (key int, value int);
SELECT create_distributed_table('table_7', 'key');
create_distributed_table
--------------------------
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_7') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_7 | not_accessed | not_accessed | parallel_access
(1 row)
COMMIT;
-- outisde the transaction blocks, the function always returns zero
SELECT count(*) FROM table_1;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+--------------
table_1 | not_accessed | not_accessed | not_accessed
(1 row)
-- a very simple test that first checks sequential
-- and parallel SELECTs,DMLs, and DDLs
BEGIN;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+--------------
table_1 | not_accessed | not_accessed | not_accessed
(1 row)
SELECT count(*) FROM table_1 WHERE key = 1;
count
-------
1
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
(1 row)
SELECT count(*) FROM table_1 WHERE key = 1 OR key = 2;
count
-------
2
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
(1 row)
INSERT INTO table_1 VALUES (1,1);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+-------------------+--------------
table_1 | parallel_access | sequential_access | not_accessed
(1 row)
INSERT INTO table_1 VALUES (1,1), (2,2);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+-------------------+--------------
table_1 | parallel_access | sequential_access | not_accessed
(1 row)
ALTER TABLE table_1 ADD COLUMN test_col INT;
-- now see that the other tables are not accessed at all
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+-------------------+-----------------
table_1 | parallel_access | sequential_access | parallel_access
(1 row)
ROLLBACK;
-- this test shows that even if two multiple single shard
-- commands executed, we can treat the transaction as sequential
BEGIN;
SELECT count(*) FROM table_1 WHERE key = 1;
count
-------
1
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
(1 row)
SELECT count(*) FROM table_1 WHERE key = 2;
count
-------
1
(1 row)
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
(1 row)
INSERT INTO table_1 VALUES (1,1);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | sequential_access | sequential_access | not_accessed
(1 row)
INSERT INTO table_1 VALUES (2,2);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | sequential_access | sequential_access | not_accessed
(1 row)
ROLLBACK;
-- a sample DDL example
BEGIN;
ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_1 | not_accessed | not_accessed | parallel_access
(1 row)
ROLLBACK;
-- a simple join touches single shard per table
BEGIN;
SELECT
count(*)
FROM
table_1, table_2, table_3, table_4, table_5
WHERE
table_1.key = table_2.key AND table_2.key = table_3.key AND
table_3.key = table_4.key AND table_4.key = table_5.key AND
table_1.key = 1;
count
-------
1
(1 row)
SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
table_2 | sequential_access | not_accessed | not_accessed
table_3 | sequential_access | not_accessed | not_accessed
table_4 | sequential_access | not_accessed | not_accessed
table_5 | sequential_access | not_accessed | not_accessed
table_6 | not_accessed | not_accessed | not_accessed
table_7 | not_accessed | not_accessed | not_accessed
(7 rows)
ROLLBACK;
-- a simple real-time join touches all shard per table
BEGIN;
SELECT
count(*)
FROM
table_1, table_2
WHERE
table_1.key = table_2.key;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
(2 rows)
ROLLBACK;
-- a simple real-time join touches all shard per table
-- in sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT
count(*)
FROM
table_1, table_2
WHERE
table_1.key = table_2.key;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
table_2 | sequential_access | not_accessed | not_accessed
(2 rows)
ROLLBACK;
-- a simple subquery pushdown that touches all shards
BEGIN;
SELECT
count(*)
FROM
(
SELECT
random()
FROM
table_1, table_2, table_3, table_4, table_5
WHERE
table_1.key = table_2.key AND table_2.key = table_3.key AND
table_3.key = table_4.key AND table_4.key = table_5.key
) as foo;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
table_3 | parallel_access | not_accessed | not_accessed
table_4 | parallel_access | not_accessed | not_accessed
table_5 | parallel_access | not_accessed | not_accessed
table_6 | not_accessed | not_accessed | not_accessed
table_7 | not_accessed | not_accessed | not_accessed
(7 rows)
ROLLBACK;
-- simple multi shard update both sequential and parallel modes
-- note that in multi shard modify mode we always add select
-- access for all the shards accessed. But, sequential mode is OK
BEGIN;
UPDATE table_1 SET value = 15;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
(1 row)
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
UPDATE table_2 SET value = 15;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
table_2 | sequential_access | sequential_access | not_accessed
(2 rows)
ROLLBACK;
-- now UPDATE/DELETE with subselect pushdown
BEGIN;
UPDATE
table_1 SET value = 15
WHERE key IN (SELECT key FROM table_2 JOIN table_3 USING (key) WHERE table_2.value = 15);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
table_3 | parallel_access | not_accessed | not_accessed
(3 rows)
ROLLBACK;
-- INSERT .. SELECT pushdown
BEGIN;
INSERT INTO table_2 SELECT * FROM table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | not_accessed | parallel_access | not_accessed
(2 rows)
ROLLBACK;
-- INSERT .. SELECT pushdown in sequential mode should be OK
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
INSERT INTO table_2 SELECT * FROM table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
table_2 | not_accessed | sequential_access | not_accessed
(2 rows)
ROLLBACK;
-- coordinator INSERT .. SELECT
BEGIN;
INSERT INTO table_2 SELECT * FROM table_1 OFFSET 0;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | not_accessed | parallel_access | not_accessed
(2 rows)
ROLLBACK;
-- recursively planned SELECT
BEGIN;
SELECT
count(*)
FROM
(
SELECT
random()
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
OFFSET 0
) as foo;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
(2 rows)
ROLLBACK;
-- recursively planned SELECT and coordinator INSERT .. SELECT
BEGIN;
INSERT INTO table_3 (key)
SELECT
*
FROM
(
SELECT
random() * 1000
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
OFFSET 0
) as foo;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
table_3 | not_accessed | parallel_access | not_accessed
(3 rows)
ROLLBACK;
-- recursively planned SELECT and coordinator INSERT .. SELECT
-- but modifies single shard, marked as sequential operation
BEGIN;
INSERT INTO table_3 (key)
SELECT
*
FROM
(
SELECT
random() * 1000
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
AND table_1.key = 1
OFFSET 0
) as foo;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
table_2 | sequential_access | not_accessed | not_accessed
table_3 | not_accessed | sequential_access | not_accessed
(3 rows)
ROLLBACK;
-- recursively planned SELECT and recursively planned multi-shard DELETE
BEGIN;
DELETE FROM table_3 where key IN
(
SELECT
*
FROM
(
SELECT
table_1.key
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
OFFSET 0
) as foo
) AND value IN (SELECT key FROM table_4);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3', 'table_4') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_2 | parallel_access | not_accessed | not_accessed
table_3 | parallel_access | parallel_access | not_accessed
table_4 | parallel_access | not_accessed | not_accessed
(4 rows)
ROLLBACK;
-- copy out
BEGIN;
COPY (SELECT * FROM table_1 WHERE key IN (1,2,3) ORDER BY 1) TO stdout;
1 1
2 2
3 3
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
(1 row)
ROLLBACK;
-- copy in
BEGIN;
COPY table_1 FROM STDIN WITH CSV;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+-----------------+--------------
table_1 | not_accessed | parallel_access | not_accessed
(1 row)
ROLLBACK;
-- copy in single shard
BEGIN;
COPY table_1 FROM STDIN WITH CSV;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+-------------------+--------------
table_1 | not_accessed | sequential_access | not_accessed
(1 row)
ROLLBACK;
-- reference table accesses should always be a sequential
BEGIN;
SELECT count(*) FROM table_6;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_6');
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_6 | sequential_access | not_accessed | not_accessed
(1 row)
UPDATE table_6 SET value = 15;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6');
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+--------------
table_6 | sequential_access | sequential_access | not_accessed
(1 row)
ALTER TABLE table_6 ADD COLUMN x INT;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6');
table_name | select_access | dml_access | ddl_access
------------+-------------------+-------------------+-------------------
table_6 | sequential_access | sequential_access | sequential_access
(1 row)
ROLLBACK;
-- reference table join with a distributed table
BEGIN;
SELECT count(*) FROM table_1 JOIN table_6 USING(key);
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_6', 'table_1');
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
table_6 | parallel_access | not_accessed | not_accessed
(2 rows)
ROLLBACK;
-- TRUNCATE should be DDL
BEGIN;
TRUNCATE table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_1 | not_accessed | not_accessed | parallel_access
(1 row)
ROLLBACK;
-- TRUNCATE can be a sequential DDL
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
TRUNCATE table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-------------------
table_1 | not_accessed | not_accessed | sequential_access
(1 row)
ROLLBACK;
-- TRUNCATE on a reference table should be sequential
BEGIN;
TRUNCATE table_6;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-------------------
table_6 | not_accessed | not_accessed | sequential_access
(1 row)
ROLLBACK;
-- creating foreign keys should consider adding the placement accesses for the referenced table
ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
BEGIN;
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_1 | not_accessed | not_accessed | parallel_access
table_2 | not_accessed | not_accessed | parallel_access
(2 rows)
ROLLBACK;
-- creating foreign keys should consider adding the placement accesses for the referenced table
-- in sequential mode as well
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-------------------
table_1 | not_accessed | not_accessed | sequential_access
table_2 | not_accessed | not_accessed | sequential_access
(2 rows)
ROLLBACK;
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ...
^
SELECT create_distributed_table('partitioning_test', 'id');
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test', 'id');
^
-- Adding partition tables via CREATE TABLE should have DDL access the partitioned table as well
BEGIN;
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin...
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
-- Adding partition tables via ATTACH PARTITION on local tables should have DDL access the partitioned table as well
CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test;
ERROR: relation "partitioning_test" does not exist
LINE 1: ...ATE TABLE partitioning_test_2009 AS SELECT * FROM partitioni...
^
BEGIN;
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_...
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- Adding partition tables via ATTACH PARTITION on distributed tables should have DDL access the partitioned table as well
CREATE TABLE partitioning_test_2010 AS SELECT * FROM partitioning_test;
ERROR: relation "partitioning_test" does not exist
LINE 1: ...ATE TABLE partitioning_test_2010 AS SELECT * FROM partitioni...
^
SELECT create_distributed_table('partitioning_test_2010', 'id');
ERROR: relation "partitioning_test_2010" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test_2010', 'i...
^
BEGIN;
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2010 FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_...
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- reading from partitioned table marks all of its partitions
BEGIN;
SELECT count(*) FROM partitioning_test;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT count(*) FROM partitioning_test;
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- reading from partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM partitioning_test;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT count(*) FROM partitioning_test;
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- updating partitioned table marks all of its partitions
BEGIN;
UPDATE partitioning_test SET time = now();
ERROR: relation "partitioning_test" does not exist
LINE 1: UPDATE partitioning_test SET time = now();
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- updating partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
UPDATE partitioning_test SET time = now();
ERROR: relation "partitioning_test" does not exist
LINE 1: UPDATE partitioning_test SET time = now();
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- DDLs on partitioned table marks all of its partitions
BEGIN;
ALTER TABLE partitioning_test ADD COLUMN X INT;
ERROR: relation "partitioning_test" does not exist
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
-- DDLs on partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE partitioning_test ADD COLUMN X INT;
ERROR: relation "partitioning_test" does not exist
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
-- reading from partition table marks its parent
BEGIN;
SELECT count(*) FROM partitioning_test_2009;
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: SELECT count(*) FROM partitioning_test_2009;
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- rreading from partition table marks its parent with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM partitioning_test_2009;
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: SELECT count(*) FROM partitioning_test_2009;
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- updating from partition table marks its parent
BEGIN;
UPDATE partitioning_test_2009 SET time = now();
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: UPDATE partitioning_test_2009 SET time = now();
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- updating from partition table marks its parent sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
UPDATE partitioning_test_2009 SET time = now();
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: UPDATE partitioning_test_2009 SET time = now();
^
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- DDLs on partition table marks its parent
BEGIN;
CREATE INDEX i1000000 ON partitioning_test_2009 (id);
ERROR: relation "partitioning_test_2009" does not exist
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
-- DDLs on partition table marks its parent in sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE INDEX i1000000 ON partitioning_test_2009 (id);
ERROR: relation "partitioning_test_2009" does not exist
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
-- TRUNCATE CASCADE works fine
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
BEGIN;
TRUNCATE table_1 CASCADE;
NOTICE: truncate cascades to table "table_2"
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+--------------+-----------------
table_1 | not_accessed | not_accessed | parallel_access
table_2 | not_accessed | not_accessed | parallel_access
(2 rows)
ROLLBACK;
-- CTEs with SELECT only should work fine
BEGIN;
WITH cte AS (SELECT count(*) FROM table_1)
SELECT * FROM cte;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+--------------+--------------
table_1 | parallel_access | not_accessed | not_accessed
(1 row)
COMMIT;
-- CTEs with SELECT only in sequential mode should work fine
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
WITH cte AS (SELECT count(*) FROM table_1)
SELECT * FROM cte;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-------------------+--------------+--------------
table_1 | sequential_access | not_accessed | not_accessed
(1 row)
COMMIT;
-- modifying CTEs should work fine with multi-row inserts, which are by default in sequential
BEGIN;
WITH cte_1 AS (INSERT INTO table_1 VALUES (1000,1000), (1001, 1001), (1002, 1002) RETURNING *)
SELECT * FROM cte_1 ORDER BY 1;
key | value
------+-------
1000 | 1000
1001 | 1001
1002 | 1002
(3 rows)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+-------------------+--------------
table_1 | not_accessed | sequential_access | not_accessed
(1 row)
ROLLBACK;
-- modifying CTEs should work fine with parallel mode
BEGIN;
WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *)
SELECT count(*) FROM cte_1 ORDER BY 1;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
(1 row)
ROLLBACK;
-- modifying CTEs should work fine with sequential mode
BEGIN;
WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *)
SELECT count(*) FROM cte_1 ORDER BY 1;
count
-------
101
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+-----------------+-----------------+--------------
table_1 | parallel_access | parallel_access | not_accessed
(1 row)
ROLLBACK;
-- create distributed table with data loading
-- should mark both parallel dml and parallel ddl
DROP TABLE table_3;
CREATE TABLE table_3 (key int, value int);
INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i;
BEGIN;
SELECT create_distributed_table('table_3', 'key');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
SELECT * FROM relation_acesses WHERE table_name IN ('table_3') ORDER BY 1;
table_name | select_access | dml_access | ddl_access
------------+---------------+-----------------+-----------------
table_3 | not_accessed | parallel_access | parallel_access
(1 row)
COMMIT;
SET search_path TO 'public';
DROP SCHEMA access_tracking CASCADE;
NOTICE: drop cascades to 12 other objects
DETAIL: drop cascades to function access_tracking.relation_select_access_mode(oid)
drop cascades to function access_tracking.relation_dml_access_mode(oid)
drop cascades to function access_tracking.relation_ddl_access_mode(oid)
drop cascades to function access_tracking.relation_access_mode_to_text(integer)
drop cascades to view access_tracking.relation_acesses
drop cascades to table access_tracking.table_1
drop cascades to table access_tracking.table_2
drop cascades to table access_tracking.table_4
drop cascades to table access_tracking.table_5
drop cascades to table access_tracking.table_6
drop cascades to table access_tracking.table_7
drop cascades to table access_tracking.table_3

View File

@ -6,8 +6,8 @@
--
CREATE SCHEMA test_seq_ddl;
SET search_path TO 'test_seq_ddl';
SET citus.next_shard_id TO 1600;
SET citus.next_placement_id TO 1600;
SET citus.next_shard_id TO 16000;
SET citus.next_placement_id TO 16000;
-- this function simply checks the equality of the number of transactions in the
-- pg_dist_transaction and number of primary worker nodes
-- The function is useful to ensure that a single connection is opened per worker
@ -591,7 +591,7 @@ BEGIN;
(1 row)
\COPY test_seq_copy FROM STDIN DELIMITER AS ',';
ERROR: cannot establish a new connection for placement 1673, since DDL has been executed on a connection that is in use
ERROR: cannot establish a new connection for placement 16073, since DDL has been executed on a connection that is in use
CONTEXT: COPY test_seq_copy, line 2: "2,2"
ROLLBACK;
SELECT distributed_2PCs_are_equal_to_worker_count();

View File

@ -58,7 +58,7 @@ test: multi_subquery_complex_reference_clause multi_subquery_window_functions mu
test: multi_subquery_in_where_reference_clause
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql
test: multi_reference_table multi_select_for_update
test: multi_reference_table multi_select_for_update relation_access_tracking
test: multi_average_expression multi_working_columns multi_having_pushdown
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg

View File

@ -179,7 +179,7 @@ ROLLBACK;
-- Test cancelling behaviour. See https://github.com/citusdata/citus/pull/1905.
-- Repeating it multiple times to increase the chance of failure before PR #1905.
SET client_min_messages TO ERROR;
alter system set deadlock_timeout TO '100ms';
alter system set deadlock_timeout TO '250ms';
SELECT pg_reload_conf();
BEGIN;

View File

@ -0,0 +1,553 @@
---
--- tests around access tracking within transaction blocks
---
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int AS server_version;
CREATE SCHEMA access_tracking;
SET search_path TO 'access_tracking';
CREATE OR REPLACE FUNCTION relation_select_access_mode(relationId Oid)
RETURNS int
LANGUAGE C STABLE STRICT
AS 'citus', $$relation_select_access_mode$$;
CREATE OR REPLACE FUNCTION relation_dml_access_mode(relationId Oid)
RETURNS int
LANGUAGE C STABLE STRICT
AS 'citus', $$relation_dml_access_mode$$;
CREATE OR REPLACE FUNCTION relation_ddl_access_mode(relationId Oid)
RETURNS int
LANGUAGE C STABLE STRICT
AS 'citus', $$relation_ddl_access_mode$$;
CREATE OR REPLACE FUNCTION relation_access_mode_to_text(relationShardAccess int)
RETURNS text AS
$$
BEGIN
IF relationShardAccess = 0 THEN
RETURN 'not_accessed';
ELSIF relationShardAccess = 1 THEN
RETURN 'sequential_access';
ELSE
RETURN 'parallel_access';
END IF;
END;
$$ LANGUAGE 'plpgsql' IMMUTABLE;
CREATE VIEW relation_acesses AS
SELECT table_name,
relation_access_mode_to_text(relation_select_access_mode(table_name::regclass)) as select_access,
relation_access_mode_to_text(relation_dml_access_mode(table_name::regclass)) as dml_access,
relation_access_mode_to_text(relation_ddl_access_mode(table_name::regclass)) as ddl_access
FROM
((SELECT 'table_' || i as table_name FROM generate_series(1, 7) i) UNION (SELECT 'partitioning_test') UNION (SELECT 'partitioning_test_2009') UNION (SELECT 'partitioning_test_2010')) tables;
SET citus.shard_replication_factor TO 1;
CREATE TABLE table_1 (key int, value int);
SELECT create_distributed_table('table_1', 'key');
CREATE TABLE table_2 (key int, value int);
SELECT create_distributed_table('table_2', 'key');
CREATE TABLE table_3 (key int, value int);
SELECT create_distributed_table('table_3', 'key');
CREATE TABLE table_4 (key int, value int);
SELECT create_distributed_table('table_4', 'key');
CREATE TABLE table_5 (key int, value int);
SELECT create_distributed_table('table_5', 'key');
CREATE TABLE table_6 (key int, value int);
SELECT create_reference_Table('table_6');
INSERT INTO table_1 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_2 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_4 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_5 SELECT i, i FROM generate_series(0,100) i;
INSERT INTO table_6 SELECT i, i FROM generate_series(0,100) i;
-- create_distributed_table works fine
BEGIN;
CREATE TABLE table_7 (key int, value int);
SELECT create_distributed_table('table_7', 'key');
SELECT * FROM relation_acesses WHERE table_name IN ('table_7') ORDER BY 1;
COMMIT;
-- outisde the transaction blocks, the function always returns zero
SELECT count(*) FROM table_1;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
-- a very simple test that first checks sequential
-- and parallel SELECTs,DMLs, and DDLs
BEGIN;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
SELECT count(*) FROM table_1 WHERE key = 1;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
SELECT count(*) FROM table_1 WHERE key = 1 OR key = 2;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
INSERT INTO table_1 VALUES (1,1);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
INSERT INTO table_1 VALUES (1,1), (2,2);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
ALTER TABLE table_1 ADD COLUMN test_col INT;
-- now see that the other tables are not accessed at all
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
ROLLBACK;
-- this test shows that even if two multiple single shard
-- commands executed, we can treat the transaction as sequential
BEGIN;
SELECT count(*) FROM table_1 WHERE key = 1;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
SELECT count(*) FROM table_1 WHERE key = 2;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
INSERT INTO table_1 VALUES (1,1);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
INSERT INTO table_1 VALUES (2,2);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
ROLLBACK;
-- a sample DDL example
BEGIN;
ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
ROLLBACK;
-- a simple join touches single shard per table
BEGIN;
SELECT
count(*)
FROM
table_1, table_2, table_3, table_4, table_5
WHERE
table_1.key = table_2.key AND table_2.key = table_3.key AND
table_3.key = table_4.key AND table_4.key = table_5.key AND
table_1.key = 1;
SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1;
ROLLBACK;
-- a simple real-time join touches all shard per table
BEGIN;
SELECT
count(*)
FROM
table_1, table_2
WHERE
table_1.key = table_2.key;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
-- a simple real-time join touches all shard per table
-- in sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT
count(*)
FROM
table_1, table_2
WHERE
table_1.key = table_2.key;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
-- a simple subquery pushdown that touches all shards
BEGIN;
SELECT
count(*)
FROM
(
SELECT
random()
FROM
table_1, table_2, table_3, table_4, table_5
WHERE
table_1.key = table_2.key AND table_2.key = table_3.key AND
table_3.key = table_4.key AND table_4.key = table_5.key
) as foo;
SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1;
ROLLBACK;
-- simple multi shard update both sequential and parallel modes
-- note that in multi shard modify mode we always add select
-- access for all the shards accessed. But, sequential mode is OK
BEGIN;
UPDATE table_1 SET value = 15;
SELECT * FROM relation_acesses WHERE table_name = 'table_1';
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
UPDATE table_2 SET value = 15;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
-- now UPDATE/DELETE with subselect pushdown
BEGIN;
UPDATE
table_1 SET value = 15
WHERE key IN (SELECT key FROM table_2 JOIN table_3 USING (key) WHERE table_2.value = 15);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1;
ROLLBACK;
-- INSERT .. SELECT pushdown
BEGIN;
INSERT INTO table_2 SELECT * FROM table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
-- INSERT .. SELECT pushdown in sequential mode should be OK
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
INSERT INTO table_2 SELECT * FROM table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
-- coordinator INSERT .. SELECT
BEGIN;
INSERT INTO table_2 SELECT * FROM table_1 OFFSET 0;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
-- recursively planned SELECT
BEGIN;
SELECT
count(*)
FROM
(
SELECT
random()
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
OFFSET 0
) as foo;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
-- recursively planned SELECT and coordinator INSERT .. SELECT
BEGIN;
INSERT INTO table_3 (key)
SELECT
*
FROM
(
SELECT
random() * 1000
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
OFFSET 0
) as foo;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1;
ROLLBACK;
-- recursively planned SELECT and coordinator INSERT .. SELECT
-- but modifies single shard, marked as sequential operation
BEGIN;
INSERT INTO table_3 (key)
SELECT
*
FROM
(
SELECT
random() * 1000
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
AND table_1.key = 1
OFFSET 0
) as foo;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1;
ROLLBACK;
-- recursively planned SELECT and recursively planned multi-shard DELETE
BEGIN;
DELETE FROM table_3 where key IN
(
SELECT
*
FROM
(
SELECT
table_1.key
FROM
table_1, table_2
WHERE
table_1.key = table_2.key
OFFSET 0
) as foo
) AND value IN (SELECT key FROM table_4);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3', 'table_4') ORDER BY 1;
ROLLBACK;
-- copy out
BEGIN;
COPY (SELECT * FROM table_1 WHERE key IN (1,2,3) ORDER BY 1) TO stdout;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
ROLLBACK;
-- copy in
BEGIN;
COPY table_1 FROM STDIN WITH CSV;
1,1
2,2
3,3
\.
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
ROLLBACK;
-- copy in single shard
BEGIN;
COPY table_1 FROM STDIN WITH CSV;
1,1
1,2
1,3
\.
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
ROLLBACK;
-- reference table accesses should always be a sequential
BEGIN;
SELECT count(*) FROM table_6;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6');
UPDATE table_6 SET value = 15;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6');
ALTER TABLE table_6 ADD COLUMN x INT;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6');
ROLLBACK;
-- reference table join with a distributed table
BEGIN;
SELECT count(*) FROM table_1 JOIN table_6 USING(key);
SELECT * FROM relation_acesses WHERE table_name IN ('table_6', 'table_1');
ROLLBACK;
-- TRUNCATE should be DDL
BEGIN;
TRUNCATE table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
ROLLBACK;
-- TRUNCATE can be a sequential DDL
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
TRUNCATE table_1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
ROLLBACK;
-- TRUNCATE on a reference table should be sequential
BEGIN;
TRUNCATE table_6;
SELECT * FROM relation_acesses WHERE table_name IN ('table_6') ORDER BY 1;
ROLLBACK;
-- creating foreign keys should consider adding the placement accesses for the referenced table
ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
BEGIN;
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
-- creating foreign keys should consider adding the placement accesses for the referenced table
-- in sequential mode as well
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
SELECT create_distributed_table('partitioning_test', 'id');
-- Adding partition tables via CREATE TABLE should have DDL access the partitioned table as well
BEGIN;
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
ROLLBACK;
-- Adding partition tables via ATTACH PARTITION on local tables should have DDL access the partitioned table as well
CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test;
BEGIN;
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
COMMIT;
-- Adding partition tables via ATTACH PARTITION on distributed tables should have DDL access the partitioned table as well
CREATE TABLE partitioning_test_2010 AS SELECT * FROM partitioning_test;
SELECT create_distributed_table('partitioning_test_2010', 'id');
BEGIN;
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2010 FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2010') ORDER BY 1;
COMMIT;
-- reading from partitioned table marks all of its partitions
BEGIN;
SELECT count(*) FROM partitioning_test;
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
COMMIT;
-- reading from partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM partitioning_test;
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
COMMIT;
-- updating partitioned table marks all of its partitions
BEGIN;
UPDATE partitioning_test SET time = now();
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
COMMIT;
-- updating partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
UPDATE partitioning_test SET time = now();
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
COMMIT;
-- DDLs on partitioned table marks all of its partitions
BEGIN;
ALTER TABLE partitioning_test ADD COLUMN X INT;
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ROLLBACK;
-- DDLs on partitioned table sequentially marks all of its partitions with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE partitioning_test ADD COLUMN X INT;
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ROLLBACK;
-- reading from partition table marks its parent
BEGIN;
SELECT count(*) FROM partitioning_test_2009;
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
COMMIT;
-- rreading from partition table marks its parent with sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT count(*) FROM partitioning_test_2009;
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
COMMIT;
-- updating from partition table marks its parent
BEGIN;
UPDATE partitioning_test_2009 SET time = now();
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
COMMIT;
-- updating from partition table marks its parent sequential accesses
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
UPDATE partitioning_test_2009 SET time = now();
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
COMMIT;
-- DDLs on partition table marks its parent
BEGIN;
CREATE INDEX i1000000 ON partitioning_test_2009 (id);
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ROLLBACK;
-- DDLs on partition table marks its parent in sequential mode
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE INDEX i1000000 ON partitioning_test_2009 (id);
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') ORDER BY 1;
ROLLBACK;
-- TRUNCATE CASCADE works fine
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
BEGIN;
TRUNCATE table_1 CASCADE;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
ROLLBACK;
-- CTEs with SELECT only should work fine
BEGIN;
WITH cte AS (SELECT count(*) FROM table_1)
SELECT * FROM cte;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
COMMIT;
-- CTEs with SELECT only in sequential mode should work fine
BEGIN;
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
WITH cte AS (SELECT count(*) FROM table_1)
SELECT * FROM cte;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
COMMIT;
-- modifying CTEs should work fine with multi-row inserts, which are by default in sequential
BEGIN;
WITH cte_1 AS (INSERT INTO table_1 VALUES (1000,1000), (1001, 1001), (1002, 1002) RETURNING *)
SELECT * FROM cte_1 ORDER BY 1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
ROLLBACK;
-- modifying CTEs should work fine with parallel mode
BEGIN;
WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *)
SELECT count(*) FROM cte_1 ORDER BY 1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
ROLLBACK;
-- modifying CTEs should work fine with sequential mode
BEGIN;
WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *)
SELECT count(*) FROM cte_1 ORDER BY 1;
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
ROLLBACK;
-- create distributed table with data loading
-- should mark both parallel dml and parallel ddl
DROP TABLE table_3;
CREATE TABLE table_3 (key int, value int);
INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i;
BEGIN;
SELECT create_distributed_table('table_3', 'key');
SELECT * FROM relation_acesses WHERE table_name IN ('table_3') ORDER BY 1;
COMMIT;
SET search_path TO 'public';
DROP SCHEMA access_tracking CASCADE;

View File

@ -6,8 +6,8 @@
--
CREATE SCHEMA test_seq_ddl;
SET search_path TO 'test_seq_ddl';
SET citus.next_shard_id TO 1600;
SET citus.next_placement_id TO 1600;
SET citus.next_shard_id TO 16000;
SET citus.next_placement_id TO 16000;
-- this function simply checks the equality of the number of transactions in the
-- pg_dist_transaction and number of primary worker nodes