diff --git a/Makefile b/Makefile index 82e454fb8..7ac0416c1 100644 --- a/Makefile +++ b/Makefile @@ -68,11 +68,13 @@ OBJS = src/backend/distributed/shared_library_init.o \ src/backend/distributed/test/partitioning_utils.o \ src/backend/distributed/test/progress_utils.o \ src/backend/distributed/test/prune_shard_list.o \ + src/backend/distributed/test/relation_access_tracking.o \ src/backend/distributed/test/sequential_execution.o \ src/backend/distributed/transaction/backend_data.o \ src/backend/distributed/transaction/distributed_deadlock_detection.o \ src/backend/distributed/transaction/lock_graph.o \ src/backend/distributed/transaction/multi_shard_transaction.o \ + src/backend/distributed/transaction/relation_access_tracking.o \ src/backend/distributed/transaction/remote_transaction.o \ src/backend/distributed/transaction/transaction_management.o \ src/backend/distributed/transaction/transaction_recovery.o \ diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index f25acb931..aa283e013 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -66,6 +66,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_shard_transaction.h" #include "distributed/placement_connection.h" +#include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" #include "distributed/resource_lock.h" @@ -2167,6 +2168,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyOutState->fe_msgbuf = makeStringInfo(); copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState); copyDest->copyOutState = copyOutState; + copyDest->multiShardCopy = false; /* prepare functions to call on received tuples */ { @@ -2311,6 +2313,21 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) &shardConnectionsFound); if (!shardConnectionsFound) { + /* + * 
Keep track of multi shard accesses before opening a connection to + * the second shard. + */ + if (!copyDest->multiShardCopy && hash_get_num_entries(shardConnectionHash) == 2) + { + Oid relationId = copyDest->distributedRelationId; + + /* mark as multi shard to skip doing the same thing over and over */ + copyDest->multiShardCopy = true; + + /* when we see multiple shard connections, we mark COPY as parallel modify */ + RecordParallelModifyAccess(relationId); + } + /* open connections and initiate COPY on shard placements */ OpenCopyConnections(copyStatement, shardConnections, stopOnFailure, copyOutState->binary); diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 10780fda7..f3b49a4a1 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -19,6 +19,7 @@ #include "distributed/metadata_cache.h" #include "distributed/distributed_planner.h" #include "distributed/placement_connection.h" +#include "distributed/relation_access_tracking.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -646,6 +647,9 @@ FindPlacementListConnection(int flags, List *placementAccessList, const char *us } *placementEntryList = lappend(*placementEntryList, placementEntry); + + /* record the relation access mapping */ + AssociatePlacementAccessWithRelation(placement, accessType); } return chosenConnection; @@ -920,6 +924,7 @@ ResetPlacementConnectionManagement(void) hash_delete_all(ConnectionPlacementHash); hash_delete_all(ConnectionShardHash); hash_delete_all(ColocatedPlacementsHash); + ResetRelationAccessHash(); /* * NB: memory for ConnectionReference structs and subordinate data is @@ -1129,6 +1134,9 @@ InitPlacementConnectionManagement(void) ConnectionShardHash = hash_create("citus connection cache (shardid)", 64, &info, hashFlags); + + /* (relationId) = [relationAccessMode] hash */ + AllocateRelationAccessHash(); } diff
--git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 6aab2c74b..e756cecdc 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -17,6 +17,7 @@ #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/distributed_planner.h" +#include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "executor/executor.h" diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 26509b018..da3cc41d1 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -32,6 +32,8 @@ #include "distributed/multi_resowner.h" #include "distributed/multi_router_executor.h" #include "distributed/multi_server_executor.h" +#include "distributed/placement_connection.h" +#include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/subplan_execution.h" #include "distributed/worker_protocol.h" @@ -147,6 +149,13 @@ MultiRealTimeExecute(Job *job) /* update the connection counter for throttling */ UpdateConnectionCounter(workerNodeState, connectAction); + /* keep track of multi shard select accesses */ + if (MultiShardConnectionType == PARALLEL_CONNECTION && + connectAction == CONNECT_ACTION_OPENED) + { + RecordRelationMultiShardSelectAccessForTask(task); + } + /* * If this task failed, we need to iterate over task executions, and * manually clean out their client-side resources. 
Hence, we record diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index c1f978cfd..4c7f692bd 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -41,6 +41,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" #include "distributed/placement_connection.h" +#include "distributed/relation_access_tracking.h" #include "distributed/subplan_execution.h" #include "distributed/relay_utility.h" #include "distributed/remote_commands.h" @@ -1335,6 +1336,20 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn &shardConnectionsFound); connectionList = shardConnections->connectionList; + if (task->taskType == MODIFY_TASK) + { + RecordRelationMultiShardModifyAccessForTask(task); + } + else if (task->taskType == DDL_TASK && + PartitionMethod(RelationIdForShard(shardId)) != DISTRIBUTE_BY_NONE) + { + /* + * Even single task DDLs hit here, so we'd prefer + * not to record for reference tables. 
+ */ + RecordRelationMultiShardDDLAccessForTask(task); + } + if (placementIndex >= list_length(connectionList)) { /* no more active placements for this task */ diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index d3969bbc8..7fc272fef 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -33,6 +33,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/reference_table_utils.h" +#include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/transaction_management.h" @@ -335,6 +336,11 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool CreateShardsOnWorkers(targetRelationId, insertedShardPlacements, useExclusiveConnections, colocatedShard); + + if (useExclusiveConnections) + { + RecordParallelDDLAccess(targetRelationId); + } } diff --git a/src/backend/distributed/test/relation_access_tracking.c b/src/backend/distributed/test/relation_access_tracking.c new file mode 100644 index 000000000..e3204acf3 --- /dev/null +++ b/src/backend/distributed/test/relation_access_tracking.c @@ -0,0 +1,60 @@ +/*------------------------------------------------------------------------- + * + * test/src/relation_access_tracking.c + * + * Some test UDFs for tracking relation accesses within transaction blocks. + * + * Copyright (c) 2014-2018, Citus Data, Inc.
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "c.h" +#include "fmgr.h" + +#include "distributed/relation_access_tracking.h" + + +/* declarations for dynamic loading */ +PG_FUNCTION_INFO_V1(relation_select_access_mode); +PG_FUNCTION_INFO_V1(relation_dml_access_mode); +PG_FUNCTION_INFO_V1(relation_ddl_access_mode); + +/* + * relation_select_access_mode returns the SELECT access mode (not accessed, + * sequential or parallel) of the given relation as an int4. + */ +Datum +relation_select_access_mode(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + + PG_RETURN_INT32(GetRelationSelectAccessMode(relationId)); +} + + +/* + * relation_dml_access_mode returns the DML access mode (not accessed, + * sequential or parallel) of the given relation as an int4. + */ +Datum +relation_dml_access_mode(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + + PG_RETURN_INT32(GetRelationDMLAccessMode(relationId)); +} + + +/* + * relation_ddl_access_mode returns the DDL access mode (not accessed, + * sequential or parallel) of the given relation as an int4. + */ +Datum +relation_ddl_access_mode(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + + PG_RETURN_INT32(GetRelationDDLAccessMode(relationId)); +} diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c new file mode 100644 index 000000000..cff8b4081 --- /dev/null +++ b/src/backend/distributed/transaction/relation_access_tracking.c @@ -0,0 +1,369 @@ +/*------------------------------------------------------------------------- + * + * relation_access_tracking.c + * + * Transaction access tracking for Citus. The functions in this file + * are intended to track the relation accesses within a transaction. The + * logic here is mostly useful when a reference table is referenced by + * a distributed table via a foreign key.
Whenever such a pair of tables + * are accessed inside a transaction, Citus should detect and act + * accordingly. + * + * Copyright (c) 2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "miscadmin.h" + +#include "access/xact.h" +#include "distributed/hash_helpers.h" +#include "distributed/multi_join_order.h" +#include "distributed/metadata_cache.h" +#include "distributed/relation_access_tracking.h" +#include "utils/hsearch.h" + + +#define PARALLEL_MODE_FLAG_OFFSET 3 + +/* + * Hash table mapping relations to the + * (relationId) = (relationAccessType and relationAccessMode) + * + * RelationAccessHash is used to keep track of relation access types (e.g., select, + * dml or ddl) along with access modes (e.g., no access, sequential access or + * parallel access). + * + * We keep an integer per relation and use some of the bits to identify the access types + * and access modes. + * + * We store the access types in the first 3 bits: + * - 0th bit is set for SELECT accesses to a relation + * - 1st bit is set for DML accesses to a relation + * - 2nd bit is set for DDL accesses to a relation + * + * and, access modes in the next 3 bits: + * - 3rd bit is set for PARALLEL SELECT accesses to a relation + * - 4th bit is set for PARALLEL DML accesses to a relation + * - 5th bit is set for PARALLEL DDL accesses to a relation + * + */ +typedef struct RelationAccessHashKey +{ + Oid relationId; +} RelationAccessHashKey; + +typedef struct RelationAccessHashEntry +{ + RelationAccessHashKey key; + + int relationAccessMode; +} RelationAccessHashEntry; + +static HTAB *RelationAccessHash; + +static RelationAccessMode GetRelationAccessMode(Oid relationId, + ShardPlacementAccessType accessType); +static void RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType + placementAccess); + + +/* + * Empty RelationAccessHash, without destroying the hash table itself.
+ */ +void +ResetRelationAccessHash(void) +{ + hash_delete_all(RelationAccessHash); +} + + +/* + * AllocateRelationAccessHash creates RelationAccessHash in ConnectionContext. + */ +void +AllocateRelationAccessHash(void) +{ + HASHCTL info; + uint32 hashFlags = 0; + + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(RelationAccessHashKey); + info.entrysize = sizeof(RelationAccessHashEntry); + info.hash = tag_hash; + info.hcxt = ConnectionContext; + hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + RelationAccessHash = hash_create("citus connection cache (relationid)", + 8, &info, hashFlags); +} + + +/* + * AssociatePlacementAccessWithRelation associates the placement access to the + * distributed relation that the placement belongs to. + */ +void +AssociatePlacementAccessWithRelation(ShardPlacement *placement, + ShardPlacementAccessType accessType) +{ + uint64 shardId = placement->shardId; + Oid relationId = RelationIdForShard(shardId); + RelationAccessHashKey hashKey; + RelationAccessHashEntry *hashEntry; + bool found = false; + + hashKey.relationId = relationId; + + hashEntry = hash_search(RelationAccessHash, &hashKey, HASH_ENTER, &found); + if (!found) + { + hashEntry->relationAccessMode = 0; + } + + /* set the bit representing the access type */ + hashEntry->relationAccessMode |= (1 << (accessType)); +} + + +/* + * RecordRelationMultiShardSelectAccessForTask goes over all the relations + * in the relationShardList and records the select access per each table.
+ */ +void +RecordRelationMultiShardSelectAccessForTask(Task *task) +{ + List *relationShardList = NIL; + ListCell *relationShardCell = NULL; + Oid lastRelationId = InvalidOid; + + /* no point in recording accesses in non-transaction blocks, skip the loop */ + if (!IsTransactionBlock()) + { + return; + } + + relationShardList = task->relationShardList; + + foreach(relationShardCell, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); + Oid currentRelationId = relationShard->relationId; + + /* + * An optimization, skip going to hash table if we've already + * recorded the relation. + */ + if (currentRelationId == lastRelationId) + { + continue; + } + + RecordParallelSelectAccess(currentRelationId); + + lastRelationId = currentRelationId; + } +} + + +/* + * RecordRelationMultiShardModifyAccessForTask gets a task and records + * the accesses. Note that the target relation is recorded with modify access + * whereas the subqueries inside the modify query are recorded with select + * access. + */ +void +RecordRelationMultiShardModifyAccessForTask(Task *task) +{ + List *relationShardList = NIL; + ListCell *relationShardCell = NULL; + Oid lastRelationId = InvalidOid; + + /* no point in recording accesses in non-transaction blocks, skip the loop */ + if (!IsTransactionBlock()) + { + return; + } + + /* anchor shard is always associated with modify access */ + RecordParallelModifyAccess(RelationIdForShard(task->anchorShardId)); + + if (task->modifyWithSubquery) + { + relationShardList = task->relationShardList; + foreach(relationShardCell, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); + Oid currentRelationId = relationShard->relationId; + + /* + * An optimization, skip going to hash table if we've already + * recorded the relation.
+ */ + if (currentRelationId == lastRelationId) + { + continue; + } + + RecordParallelSelectAccess(currentRelationId); + + lastRelationId = currentRelationId; + } + } +} + + +/* + * RecordRelationMultiShardDDLAccessForTask is a wrapper around + * RecordParallelDDLAccess + */ +void +RecordRelationMultiShardDDLAccessForTask(Task *task) +{ + RecordParallelDDLAccess(RelationIdForShard(task->anchorShardId)); +} + + +/* + * RecordParallelSelectAccess is a wrapper around RecordParallelRelationAccess() + */ +void +RecordParallelSelectAccess(Oid relationId) +{ + RecordParallelRelationAccess(relationId, PLACEMENT_ACCESS_SELECT); +} + + +/* + * RecordParallelModifyAccess is a wrapper around RecordParallelRelationAccess() + */ +void +RecordParallelModifyAccess(Oid relationId) +{ + RecordParallelRelationAccess(relationId, PLACEMENT_ACCESS_DML); +} + + +/* + * RecordParallelDDLAccess is a wrapper around RecordParallelRelationAccess() + */ +void +RecordParallelDDLAccess(Oid relationId) +{ + RecordParallelRelationAccess(relationId, PLACEMENT_ACCESS_DDL); +} + + +/* + * RecordParallelRelationAccess records the relation access mode as parallel + * for the given access type (e.g., select, dml or ddl) in the RelationAccessHash.
+ * + * The function is a no-op outside of transaction blocks. + */ +static void +RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType placementAccess) +{ + RelationAccessHashKey hashKey; + RelationAccessHashEntry *hashEntry; + bool found = false; + int multiShardAccessBit = 0; + + /* no point in recording accesses in non-transaction blocks */ + if (!IsTransactionBlock()) + { + return; + } + + hashKey.relationId = relationId; + + hashEntry = hash_search(RelationAccessHash, &hashKey, HASH_ENTER, &found); + if (!found) + { + hashEntry->relationAccessMode = 0; + } + + /* set the bit representing the access type */ + hashEntry->relationAccessMode |= (1 << (placementAccess)); + + /* set the bit representing access mode */ + multiShardAccessBit = placementAccess + PARALLEL_MODE_FLAG_OFFSET; + hashEntry->relationAccessMode |= (1 << multiShardAccessBit); +} + + +/* + * GetRelationSelectAccessMode is a wrapper around GetRelationAccessMode. + */ +RelationAccessMode +GetRelationSelectAccessMode(Oid relationId) +{ + return GetRelationAccessMode(relationId, PLACEMENT_ACCESS_SELECT); +} + + +/* + * GetRelationDMLAccessMode is a wrapper around GetRelationAccessMode. + */ +RelationAccessMode +GetRelationDMLAccessMode(Oid relationId) +{ + return GetRelationAccessMode(relationId, PLACEMENT_ACCESS_DML); +} + + +/* + * GetRelationDDLAccessMode is a wrapper around GetRelationAccessMode. + */ +RelationAccessMode +GetRelationDDLAccessMode(Oid relationId) +{ + return GetRelationAccessMode(relationId, PLACEMENT_ACCESS_DDL); +} + + +/* + * GetRelationAccessMode returns the relation access mode (e.g., none, sequential + * or parallel) for the given access type (e.g., select, dml or ddl).
+ */ +static RelationAccessMode +GetRelationAccessMode(Oid relationId, ShardPlacementAccessType accessType) +{ + RelationAccessHashKey hashKey; + RelationAccessHashEntry *hashEntry; + int relationAcessMode = 0; + bool found = false; + int multiShardAccessBit = accessType + PARALLEL_MODE_FLAG_OFFSET; + + /* no point in getting the mode when not inside a transaction block */ + if (!IsTransactionBlock()) + { + return RELATION_NOT_ACCESSED; + } + + hashKey.relationId = relationId; + + hashEntry = hash_search(RelationAccessHash, &hashKey, HASH_FIND, &found); + if (!found) + { + /* relation not accessed at all */ + return RELATION_NOT_ACCESSED; + } + + + relationAcessMode = hashEntry->relationAccessMode; + if (!(relationAcessMode & (1 << accessType))) + { + /* relation not accessed with the given access type */ + return RELATION_NOT_ACCESSED; + } + + if (relationAcessMode & (1 << multiShardAccessBit)) + { + return RELATION_PARALLEL_ACCESSED; + } + else + { + return RELATION_SEQUENTIAL_ACCESSED; + } +} diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 499358d42..999efa28f 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -363,6 +363,26 @@ LoadShardInterval(uint64 shardId) } +/* + * RelationIdForShard returns the relationId of the given + * shardId. + */ +Oid +RelationIdForShard(uint64 shardId) +{ + ShardCacheEntry *shardEntry = NULL; + DistTableCacheEntry *tableEntry = NULL; + + shardEntry = LookupShardCacheEntry(shardId); + + tableEntry = shardEntry->tableEntry; + + Assert(tableEntry->isDistributedTable); + + return tableEntry->relationId; +} + + /* * ReferenceTableShardId returns true if the given shardId belongs to * a reference table.
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index c9e84a0a9..c30aece12 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -78,6 +78,7 @@ typedef struct extern bool IsDistributedTable(Oid relationId); extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); +extern Oid RelationIdForShard(uint64 shardId); extern bool ReferenceTableShardId(uint64 shardId); extern ShardPlacement * FindShardPlacementOnGroup(uint32 groupId, uint64 shardId); extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 5269a61f1..deb74fa92 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -105,6 +105,9 @@ typedef struct CitusCopyDestReceiver /* number of tuples sent */ int64 tuplesSent; + + /* useful for tracking multi shard accesses */ + bool multiShardCopy; } CitusCopyDestReceiver; diff --git a/src/include/distributed/relation_access_tracking.h b/src/include/distributed/relation_access_tracking.h new file mode 100644 index 000000000..8a4cf3667 --- /dev/null +++ b/src/include/distributed/relation_access_tracking.h @@ -0,0 +1,41 @@ +/* + * relation_access_tracking.h + * + * Function declarations for transaction access tracking. + * + * Copyright (c) 2018, Citus Data, Inc.
+ */ + +#ifndef RELATION_ACCESS_TRACKING_H_ +#define RELATION_ACCESS_TRACKING_H_ + +#include "distributed/master_metadata_utility.h" +#include "distributed/multi_physical_planner.h" /* access Task struct */ +#include "distributed/placement_connection.h" + +/* forward declare, to avoid dependency on ShardPlacement definition */ +struct ShardPlacement; + +typedef enum RelationAccessMode +{ + RELATION_NOT_ACCESSED, + RELATION_SEQUENTIAL_ACCESSED, + RELATION_PARALLEL_ACCESSED +} RelationAccessMode; + +extern void AllocateRelationAccessHash(void); +extern void ResetRelationAccessHash(void); +extern void AssociatePlacementAccessWithRelation(ShardPlacement *placement, + ShardPlacementAccessType accessType); +extern void RecordParallelSelectAccess(Oid relationId); +extern void RecordRelationMultiShardSelectAccessForTask(Task *task); +extern void RecordRelationMultiShardModifyAccessForTask(Task *task); +extern void RecordParallelModifyAccess(Oid relationId); +extern void RecordParallelDDLAccess(Oid relationId); +extern void RecordRelationMultiShardDDLAccessForTask(Task *task); +extern RelationAccessMode GetRelationDDLAccessMode(Oid relationId); +extern RelationAccessMode GetRelationDMLAccessMode(Oid relationId); +extern RelationAccessMode GetRelationSelectAccessMode(Oid relationId); + + +#endif /* RELATION_ACCESS_TRACKING_H_ */ diff --git a/src/test/regress/expected/relation_access_tracking.out b/src/test/regress/expected/relation_access_tracking.out new file mode 100644 index 000000000..fb315cfc2 --- /dev/null +++ b/src/test/regress/expected/relation_access_tracking.out @@ -0,0 +1,775 @@ +--- +--- tests around access tracking within transaction blocks +--- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 10 +(1 row) + +CREATE SCHEMA access_tracking; +SET search_path TO 'access_tracking'; +CREATE OR REPLACE FUNCTION relation_select_access_mode(relationId Oid) + RETURNS int + LANGUAGE C STABLE 
STRICT + AS 'citus', $$relation_select_access_mode$$; +CREATE OR REPLACE FUNCTION relation_dml_access_mode(relationId Oid) + RETURNS int + LANGUAGE C STABLE STRICT + AS 'citus', $$relation_dml_access_mode$$; +CREATE OR REPLACE FUNCTION relation_ddl_access_mode(relationId Oid) + RETURNS int + LANGUAGE C STABLE STRICT + AS 'citus', $$relation_ddl_access_mode$$; +CREATE OR REPLACE FUNCTION relation_access_mode_to_text(relationShardAccess int) +RETURNS text AS +$$ +BEGIN + IF relationShardAccess = 0 THEN + RETURN 'not_accessed'; + ELSIF relationShardAccess = 1 THEN + RETURN 'sequential_access'; + ELSE + RETURN 'parallel_access'; + END IF; +END; +$$ LANGUAGE 'plpgsql' IMMUTABLE; +CREATE VIEW relation_acesses AS + SELECT table_name, + relation_access_mode_to_text(relation_select_access_mode(table_name::regclass)) as select_access, + relation_access_mode_to_text(relation_dml_access_mode(table_name::regclass)) as dml_access, + relation_access_mode_to_text(relation_ddl_access_mode(table_name::regclass)) as ddl_access + FROM + ((SELECT 'table_' || i as table_name FROM generate_series(1, 7) i) UNION (SELECT 'partitioning_test') UNION (SELECT 'partitioning_test_2009') UNION (SELECT 'partitioning_test_2010')) tables; +SET citus.shard_replication_factor TO 1; +CREATE TABLE table_1 (key int, value int); +SELECT create_distributed_table('table_1', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value int); +SELECT create_distributed_table('table_2', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_3 (key int, value int); +SELECT create_distributed_table('table_3', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_4 (key int, value int); +SELECT create_distributed_table('table_4', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_5 (key int, value int); +SELECT 
create_distributed_table('table_5', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_6 (key int, value int); +SELECT create_reference_Table('table_6'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO table_1 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_2 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_4 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_5 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_6 SELECT i, i FROM generate_series(0,100) i; +-- create_distributed_table works fine +BEGIN; + CREATE TABLE table_7 (key int, value int); + SELECT create_distributed_table('table_7', 'key'); + create_distributed_table +-------------------------- + +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_7') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+----------------- + table_7 | not_accessed | not_accessed | parallel_access +(1 row) + +COMMIT; +-- outisde the transaction blocks, the function always returns zero +SELECT count(*) FROM table_1; + count +------- + 101 +(1 row) + +SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+-------------- + table_1 | not_accessed | not_accessed | not_accessed +(1 row) + +-- a very simple test that first checks sequential +-- and parallel SELECTs,DMLs, and DDLs +BEGIN; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+-------------- + table_1 | not_accessed | not_accessed | not_accessed +(1 row) + + SELECT count(*) FROM table_1 WHERE key = 1; + count +------- + 1 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + 
table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed +(1 row) + + SELECT count(*) FROM table_1 WHERE key = 1 OR key = 2; + count +------- + 2 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed +(1 row) + + INSERT INTO table_1 VALUES (1,1); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-------------------+-------------- + table_1 | parallel_access | sequential_access | not_accessed +(1 row) + + INSERT INTO table_1 VALUES (1,1), (2,2); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-------------------+-------------- + table_1 | parallel_access | sequential_access | not_accessed +(1 row) + + ALTER TABLE table_1 ADD COLUMN test_col INT; + -- now see that the other tables are not accessed at all + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-------------------+----------------- + table_1 | parallel_access | sequential_access | parallel_access +(1 row) + +ROLLBACK; +-- this test shows that even if two multiple single shard +-- commands executed, we can treat the transaction as sequential +BEGIN; + SELECT count(*) FROM table_1 WHERE key = 1; + count +------- + 1 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed +(1 row) + + SELECT count(*) FROM table_1 
WHERE key = 2; + count +------- + 1 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed +(1 row) + + INSERT INTO table_1 VALUES (1,1); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | sequential_access | not_accessed +(1 row) + + INSERT INTO table_1 VALUES (2,2); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | sequential_access | not_accessed +(1 row) + +ROLLBACK; +-- a sample DDL example +BEGIN; + ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+----------------- + table_1 | not_accessed | not_accessed | parallel_access +(1 row) + +ROLLBACK; +-- a simple join touches single shard per table +BEGIN; + SELECT + count(*) + FROM + table_1, table_2, table_3, table_4, table_5 + WHERE + table_1.key = table_2.key AND table_2.key = table_3.key AND + table_3.key = table_4.key AND table_4.key = table_5.key AND + table_1.key = 1; + count +------- + 1 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed + table_2 | sequential_access | not_accessed | not_accessed + table_3 | sequential_access | not_accessed | not_accessed + table_4 | sequential_access | not_accessed | 
not_accessed + table_5 | sequential_access | not_accessed | not_accessed + table_6 | not_accessed | not_accessed | not_accessed + table_7 | not_accessed | not_accessed | not_accessed +(7 rows) + +ROLLBACK; +-- a simple real-time join touches all shard per table +BEGIN; + SELECT + count(*) + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed +(2 rows) + +ROLLBACK; +-- a simple real-time join touches all shard per table +-- in sequential mode +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT + count(*) + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed + table_2 | sequential_access | not_accessed | not_accessed +(2 rows) + +ROLLBACK; +-- a simple subquery pushdown that touches all shards +BEGIN; + SELECT + count(*) + FROM + ( + SELECT + random() + FROM + table_1, table_2, table_3, table_4, table_5 + WHERE + table_1.key = table_2.key AND table_2.key = table_3.key AND + table_3.key = table_4.key AND table_4.key = table_5.key + ) as foo; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed + 
table_3 | parallel_access | not_accessed | not_accessed + table_4 | parallel_access | not_accessed | not_accessed + table_5 | parallel_access | not_accessed | not_accessed + table_6 | not_accessed | not_accessed | not_accessed + table_7 | not_accessed | not_accessed | not_accessed +(7 rows) + +ROLLBACK; +-- simple multi shard update both sequential and parallel modes +-- note that in multi shard modify mode we always add select +-- access for all the shards accessed. But, sequential mode is OK +BEGIN; + UPDATE table_1 SET value = 15; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-----------------+-------------- + table_1 | sequential_access | parallel_access | not_accessed +(1 row) + + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + UPDATE table_2 SET value = 15; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | parallel_access | not_accessed + table_2 | sequential_access | sequential_access | not_accessed +(2 rows) + +ROLLBACK; +-- now UPDATE/DELETE with subselect pushdown +BEGIN; + UPDATE + table_1 SET value = 15 + WHERE key IN (SELECT key FROM table_2 JOIN table_3 USING (key) WHERE table_2.value = 15); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-----------------+-------------- + table_1 | parallel_access | parallel_access | not_accessed + table_2 | parallel_access | not_accessed | not_accessed + table_3 | parallel_access | not_accessed | not_accessed +(3 rows) + +ROLLBACK; +-- INSERT .. 
SELECT pushdown +BEGIN; + INSERT INTO table_2 SELECT * FROM table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-----------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | not_accessed | parallel_access | not_accessed +(2 rows) + +ROLLBACK; +-- INSERT .. SELECT pushdown in sequential mode should be OK +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + INSERT INTO table_2 SELECT * FROM table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed + table_2 | not_accessed | sequential_access | not_accessed +(2 rows) + +ROLLBACK; +-- coordinator INSERT .. SELECT +BEGIN; + INSERT INTO table_2 SELECT * FROM table_1 OFFSET 0; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-----------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | not_accessed | parallel_access | not_accessed +(2 rows) + +ROLLBACK; + +-- recursively planned SELECT +BEGIN; + SELECT + count(*) + FROM + ( + SELECT + random() + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + OFFSET 0 + ) as foo; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed +(2 rows) + +ROLLBACK; +-- recursively planned SELECT and coordinator INSERT .. 
SELECT +BEGIN; + INSERT INTO table_3 (key) + SELECT + * + FROM + ( + SELECT + random() * 1000 + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + OFFSET 0 + ) as foo; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-----------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed + table_3 | not_accessed | parallel_access | not_accessed +(3 rows) + +ROLLBACK; +-- recursively planned SELECT and coordinator INSERT .. SELECT +-- but modifies single shard, marked as sequential operation +BEGIN; + INSERT INTO table_3 (key) + SELECT + * + FROM + ( + SELECT + random() * 1000 + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + AND table_1.key = 1 + OFFSET 0 + ) as foo; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed + table_2 | sequential_access | not_accessed | not_accessed + table_3 | not_accessed | sequential_access | not_accessed +(3 rows) + +ROLLBACK; +-- recursively planned SELECT and recursively planned multi-shard DELETE +BEGIN; + DELETE FROM table_3 where key IN + ( + SELECT + * + FROM + ( + SELECT + table_1.key + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + OFFSET 0 + ) as foo + ) AND value IN (SELECT key FROM table_4); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3', 'table_4') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-----------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed + table_3 | 
sequential_access | parallel_access | not_accessed + table_4 | parallel_access | not_accessed | not_accessed +(4 rows) + +ROLLBACK; +-- copy out +BEGIN; + COPY (SELECT * FROM table_1 WHERE key IN (1,2,3) ORDER BY 1) TO stdout; +1 1 +2 2 +3 3 + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed +(1 row) + +ROLLBACK; +-- copy in +BEGIN; + COPY table_1 FROM STDIN WITH CSV; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+-----------------+-------------- + table_1 | not_accessed | parallel_access | not_accessed +(1 row) + +ROLLBACK; +-- copy in single shard +BEGIN; + COPY table_1 FROM STDIN WITH CSV; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+-------------------+-------------- + table_1 | not_accessed | sequential_access | not_accessed +(1 row) + +ROLLBACK; +-- reference table accesses should always be a sequential +BEGIN; + SELECT count(*) FROM table_6; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_6'); + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_6 | sequential_access | not_accessed | not_accessed +(1 row) + + UPDATE table_6 SET value = 15; + SELECT * FROM relation_acesses WHERE table_name IN ('table_6'); + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_6 | sequential_access | sequential_access | not_accessed +(1 row) + + ALTER TABLE table_6 ADD COLUMN x INT; + SELECT * FROM relation_acesses WHERE table_name IN ('table_6'); + 
table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+------------------- + table_6 | sequential_access | sequential_access | sequential_access +(1 row) + +ROLLBACK; +-- reference table join with a distributed table +BEGIN; + SELECT count(*) FROM table_1 JOIN table_6 USING(key); + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_6', 'table_1'); + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_6 | parallel_access | not_accessed | not_accessed +(2 rows) + +ROLLBACK; +-- FIXME: TRUNCATE should be DDL +BEGIN; + TRUNCATE table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+-----------------+-------------- + table_1 | not_accessed | parallel_access | not_accessed +(1 row) + +ROLLBACK; +-- FIXME: creating foreign keys should consider adding the placement accesses for the referenced table +ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key); +BEGIN; + ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+----------------- + table_1 | not_accessed | not_accessed | not_accessed + table_2 | not_accessed | not_accessed | parallel_access +(2 rows) + +ROLLBACK; +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); +SELECT create_distributed_table('partitioning_test', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- FIXME: Adding partition tables should have DDL access the partitioned table as well +BEGIN; + CREATE TABLE partitioning_test_2009 PARTITION OF 
partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); + SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------------------+---------------+--------------+----------------- + partitioning_test | not_accessed | not_accessed | not_accessed + partitioning_test_2009 | not_accessed | not_accessed | parallel_access +(2 rows) + +ROLLBACK; +-- FIXME: Adding partition tables should have DDL access the partitioned table as well +CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test; +BEGIN; + ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); + SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------------------+---------------+--------------+----------------- + partitioning_test | not_accessed | not_accessed | not_accessed + partitioning_test_2009 | not_accessed | not_accessed | parallel_access +(2 rows) + +ROLLBACK; +-- TRUNCATE CASCADE works fine +ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); +BEGIN; + TRUNCATE table_1 CASCADE; +NOTICE: truncate cascades to table "table_2" + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+-----------------+-------------- + table_1 | not_accessed | parallel_access | not_accessed + table_2 | not_accessed | parallel_access | not_accessed +(2 rows) + +ROLLBACK; +-- CTEs with SELECT only should work fine +BEGIN; + + WITH cte AS (SELECT count(*) FROM table_1) + SELECT * FROM cte; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access 
+------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed +(1 row) + +COMMIT; +-- CTEs with SELECT only in sequential mode should work fine +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + WITH cte AS (SELECT count(*) FROM table_1) + SELECT * FROM cte; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed +(1 row) + +COMMIT; +-- modifying CTEs should work fine with multi-row inserts, which are by default in sequential +BEGIN; + + WITH cte_1 AS (INSERT INTO table_1 VALUES (1000,1000), (1001, 1001), (1002, 1002) RETURNING *) + SELECT * FROM cte_1 ORDER BY 1; + key | value +------+------- + 1000 | 1000 + 1001 | 1001 + 1002 | 1002 +(3 rows) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+-------------------+-------------- + table_1 | not_accessed | sequential_access | not_accessed +(1 row) + +ROLLBACK; +-- modifying CTEs should work fine with parallel mode +BEGIN; + + WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *) + SELECT count(*) FROM cte_1 ORDER BY 1; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-----------------+-------------- + table_1 | sequential_access | parallel_access | not_accessed +(1 row) + +ROLLBACK; +-- modifying CTEs should work fine with sequential mode +BEGIN; + + WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *) + SELECT count(*) FROM cte_1 ORDER BY 1; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER 
BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-----------------+-------------- + table_1 | sequential_access | parallel_access | not_accessed +(1 row) + +ROLLBACK; +-- create distributed table with data loading +-- should mark both parallel dml and parallel ddl +DROP TABLE table_3; +CREATE TABLE table_3 (key int, value int); +INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i; +BEGIN; + SELECT create_distributed_table('table_3', 'key'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_3') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+-----------------+----------------- + table_3 | not_accessed | parallel_access | parallel_access +(1 row) + +COMMIT; +SET search_path TO 'public'; +DROP SCHEMA access_tracking CASCADE; +NOTICE: drop cascades to 14 other objects +DETAIL: drop cascades to function access_tracking.relation_select_access_mode(oid) +drop cascades to function access_tracking.relation_dml_access_mode(oid) +drop cascades to function access_tracking.relation_ddl_access_mode(oid) +drop cascades to function access_tracking.relation_access_mode_to_text(integer) +drop cascades to view access_tracking.relation_acesses +drop cascades to table access_tracking.table_1 +drop cascades to table access_tracking.table_2 +drop cascades to table access_tracking.table_4 +drop cascades to table access_tracking.table_5 +drop cascades to table access_tracking.table_6 +drop cascades to table access_tracking.table_7 +drop cascades to table access_tracking.partitioning_test +drop cascades to table access_tracking.partitioning_test_2009 +drop cascades to table access_tracking.table_3 diff --git a/src/test/regress/expected/relation_access_tracking_0.out b/src/test/regress/expected/relation_access_tracking_0.out new file mode 100644 index 
000000000..3846455b5 --- /dev/null +++ b/src/test/regress/expected/relation_access_tracking_0.out @@ -0,0 +1,824 @@ +--- +--- tests around access tracking within transaction blocks +--- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 9 +(1 row) + +CREATE SCHEMA access_tracking; +SET search_path TO 'access_tracking'; +CREATE OR REPLACE FUNCTION relation_select_access_mode(relationId Oid) + RETURNS int + LANGUAGE C STABLE STRICT + AS 'citus', $$relation_select_access_mode$$; +CREATE OR REPLACE FUNCTION relation_dml_access_mode(relationId Oid) + RETURNS int + LANGUAGE C STABLE STRICT + AS 'citus', $$relation_dml_access_mode$$; +CREATE OR REPLACE FUNCTION relation_ddl_access_mode(relationId Oid) + RETURNS int + LANGUAGE C STABLE STRICT + AS 'citus', $$relation_ddl_access_mode$$; +CREATE OR REPLACE FUNCTION relation_access_mode_to_text(relationShardAccess int) +RETURNS text AS +$$ +BEGIN + IF relationShardAccess = 0 THEN + RETURN 'not_accessed'; + ELSIF relationShardAccess = 1 THEN + RETURN 'sequential_access'; + ELSE + RETURN 'parallel_access'; + END IF; +END; +$$ LANGUAGE 'plpgsql' IMMUTABLE; +CREATE VIEW relation_acesses AS + SELECT table_name, + relation_access_mode_to_text(relation_select_access_mode(table_name::regclass)) as select_access, + relation_access_mode_to_text(relation_dml_access_mode(table_name::regclass)) as dml_access, + relation_access_mode_to_text(relation_ddl_access_mode(table_name::regclass)) as ddl_access + FROM + ((SELECT 'table_' || i as table_name FROM generate_series(1, 7) i) UNION (SELECT 'partitioning_test') UNION (SELECT 'partitioning_test_2009') UNION (SELECT 'partitioning_test_2010')) tables; +SET citus.shard_replication_factor TO 1; +CREATE TABLE table_1 (key int, value int); +SELECT create_distributed_table('table_1', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value int); +SELECT 
create_distributed_table('table_2', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_3 (key int, value int); +SELECT create_distributed_table('table_3', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_4 (key int, value int); +SELECT create_distributed_table('table_4', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_5 (key int, value int); +SELECT create_distributed_table('table_5', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_6 (key int, value int); +SELECT create_reference_Table('table_6'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO table_1 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_2 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_4 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_5 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_6 SELECT i, i FROM generate_series(0,100) i; +-- create_distributed_table works fine +BEGIN; + CREATE TABLE table_7 (key int, value int); + SELECT create_distributed_table('table_7', 'key'); + create_distributed_table +-------------------------- + +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_7') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+----------------- + table_7 | not_accessed | not_accessed | parallel_access +(1 row) + +COMMIT; +-- outisde the transaction blocks, the function always returns zero +SELECT count(*) FROM table_1; + count +------- + 101 +(1 row) + +SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+-------------- + table_1 | not_accessed | not_accessed | not_accessed +(1 
row) + +-- a very simple test that first checks sequential +-- and parallel SELECTs,DMLs, and DDLs +BEGIN; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+-------------- + table_1 | not_accessed | not_accessed | not_accessed +(1 row) + + SELECT count(*) FROM table_1 WHERE key = 1; + count +------- + 1 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed +(1 row) + + SELECT count(*) FROM table_1 WHERE key = 1 OR key = 2; + count +------- + 2 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed +(1 row) + + INSERT INTO table_1 VALUES (1,1); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-------------------+-------------- + table_1 | parallel_access | sequential_access | not_accessed +(1 row) + + INSERT INTO table_1 VALUES (1,1), (2,2); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-------------------+-------------- + table_1 | parallel_access | sequential_access | not_accessed +(1 row) + + ALTER TABLE table_1 ADD COLUMN test_col INT; + -- now see that the other tables are not accessed at all + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-------------------+----------------- + table_1 | parallel_access | sequential_access | parallel_access +(1 row) + +ROLLBACK; 
+-- this test shows that even if two multiple single shard +-- commands executed, we can treat the transaction as sequential +BEGIN; + SELECT count(*) FROM table_1 WHERE key = 1; + count +------- + 1 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed +(1 row) + + SELECT count(*) FROM table_1 WHERE key = 2; + count +------- + 1 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed +(1 row) + + INSERT INTO table_1 VALUES (1,1); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | sequential_access | not_accessed +(1 row) + + INSERT INTO table_1 VALUES (2,2); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | sequential_access | not_accessed +(1 row) + +ROLLBACK; +-- a sample DDL example +BEGIN; + ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+----------------- + table_1 | not_accessed | not_accessed | parallel_access +(1 row) + +ROLLBACK; +-- a simple join touches single shard per table +BEGIN; + SELECT + count(*) + FROM + table_1, table_2, table_3, table_4, table_5 + WHERE + table_1.key = table_2.key AND table_2.key = table_3.key AND + table_3.key = table_4.key AND 
table_4.key = table_5.key AND + table_1.key = 1; + count +------- + 1 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed + table_2 | sequential_access | not_accessed | not_accessed + table_3 | sequential_access | not_accessed | not_accessed + table_4 | sequential_access | not_accessed | not_accessed + table_5 | sequential_access | not_accessed | not_accessed + table_6 | not_accessed | not_accessed | not_accessed + table_7 | not_accessed | not_accessed | not_accessed +(7 rows) + +ROLLBACK; +-- a simple real-time join touches all shard per table +BEGIN; + SELECT + count(*) + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed +(2 rows) + +ROLLBACK; +-- a simple real-time join touches all shard per table +-- in sequential mode +BEGIN; + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT + count(*) + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed + table_2 | sequential_access | not_accessed | not_accessed +(2 rows) + +ROLLBACK; +-- a simple subquery pushdown that touches all shards +BEGIN; + SELECT + count(*) + FROM + ( + SELECT + random() + FROM + table_1, table_2, table_3, 
table_4, table_5 + WHERE + table_1.key = table_2.key AND table_2.key = table_3.key AND + table_3.key = table_4.key AND table_4.key = table_5.key + ) as foo; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed + table_3 | parallel_access | not_accessed | not_accessed + table_4 | parallel_access | not_accessed | not_accessed + table_5 | parallel_access | not_accessed | not_accessed + table_6 | not_accessed | not_accessed | not_accessed + table_7 | not_accessed | not_accessed | not_accessed +(7 rows) + +ROLLBACK; +-- simple multi shard update both sequential and parallel modes +-- note that in multi shard modify mode we always add select +-- access for all the shards accessed. But, sequential mode is OK +BEGIN; + UPDATE table_1 SET value = 15; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-----------------+-------------- + table_1 | sequential_access | parallel_access | not_accessed +(1 row) + + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + UPDATE table_2 SET value = 15; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | parallel_access | not_accessed + table_2 | sequential_access | sequential_access | not_accessed +(2 rows) + +ROLLBACK; +-- now UPDATE/DELETE with subselect pushdown +BEGIN; + UPDATE + table_1 SET value = 15 + WHERE key IN (SELECT key FROM table_2 JOIN table_3 USING (key) WHERE table_2.value = 15); + SELECT * FROM relation_acesses WHERE table_name IN 
('table_1', 'table_2', 'table_3') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-----------------+-------------- + table_1 | parallel_access | parallel_access | not_accessed + table_2 | parallel_access | not_accessed | not_accessed + table_3 | parallel_access | not_accessed | not_accessed +(3 rows) + +ROLLBACK; +-- INSERT .. SELECT pushdown +BEGIN; + INSERT INTO table_2 SELECT * FROM table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-----------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | not_accessed | parallel_access | not_accessed +(2 rows) + +ROLLBACK; +-- INSERT .. SELECT pushdown in sequential mode should be OK +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + INSERT INTO table_2 SELECT * FROM table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed + table_2 | not_accessed | sequential_access | not_accessed +(2 rows) + +ROLLBACK; +-- coordinator INSERT .. 
SELECT +BEGIN; + INSERT INTO table_2 SELECT * FROM table_1 OFFSET 0; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-----------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | not_accessed | parallel_access | not_accessed +(2 rows) + +ROLLBACK; + +-- recursively planned SELECT +BEGIN; + SELECT + count(*) + FROM + ( + SELECT + random() + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + OFFSET 0 + ) as foo; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed +(2 rows) + +ROLLBACK; +-- recursively planned SELECT and coordinator INSERT .. SELECT +BEGIN; + INSERT INTO table_3 (key) + SELECT + * + FROM + ( + SELECT + random() * 1000 + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + OFFSET 0 + ) as foo; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+-----------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed + table_3 | not_accessed | parallel_access | not_accessed +(3 rows) + +ROLLBACK; +-- recursively planned SELECT and coordinator INSERT .. 
SELECT +-- but modifies single shard, marked as sequential operation +BEGIN; + INSERT INTO table_3 (key) + SELECT + * + FROM + ( + SELECT + random() * 1000 + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + AND table_1.key = 1 + OFFSET 0 + ) as foo; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed + table_2 | sequential_access | not_accessed | not_accessed + table_3 | not_accessed | sequential_access | not_accessed +(3 rows) + +ROLLBACK; +-- recursively planned SELECT and recursively planned multi-shard DELETE +BEGIN; + DELETE FROM table_3 where key IN + ( + SELECT + * + FROM + ( + SELECT + table_1.key + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + OFFSET 0 + ) as foo + ) AND value IN (SELECT key FROM table_4); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3', 'table_4') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-----------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_2 | parallel_access | not_accessed | not_accessed + table_3 | sequential_access | parallel_access | not_accessed + table_4 | parallel_access | not_accessed | not_accessed +(4 rows) + +ROLLBACK; +-- copy out +BEGIN; + COPY (SELECT * FROM table_1 WHERE key IN (1,2,3) ORDER BY 1) TO stdout; +1 1 +2 2 +3 3 + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed +(1 row) + +ROLLBACK; +-- copy in +BEGIN; + COPY table_1 FROM STDIN WITH CSV; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + 
table_name | select_access | dml_access | ddl_access +------------+---------------+-----------------+-------------- + table_1 | not_accessed | parallel_access | not_accessed +(1 row) + +ROLLBACK; +-- copy in single shard +BEGIN; + COPY table_1 FROM STDIN WITH CSV; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+-------------------+-------------- + table_1 | not_accessed | sequential_access | not_accessed +(1 row) + +ROLLBACK; +-- reference table accesses should always be a sequential +BEGIN; + SELECT count(*) FROM table_6; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_6'); + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_6 | sequential_access | not_accessed | not_accessed +(1 row) + + UPDATE table_6 SET value = 15; + SELECT * FROM relation_acesses WHERE table_name IN ('table_6'); + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+-------------- + table_6 | sequential_access | sequential_access | not_accessed +(1 row) + + ALTER TABLE table_6 ADD COLUMN x INT; + SELECT * FROM relation_acesses WHERE table_name IN ('table_6'); + table_name | select_access | dml_access | ddl_access +------------+-------------------+-------------------+------------------- + table_6 | sequential_access | sequential_access | sequential_access +(1 row) + +ROLLBACK; +-- reference table join with a distributed table +BEGIN; + SELECT count(*) FROM table_1 JOIN table_6 USING(key); + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_6', 'table_1'); + table_name | select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed + table_6 | parallel_access | 
not_accessed | not_accessed +(2 rows) + +ROLLBACK; +-- TRUNCATE should be DDL +BEGIN; + TRUNCATE table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+----------------- + table_1 | not_accessed | not_accessed | parallel_access +(1 row) + +ROLLBACK; +-- TRUNCATE can be a sequential DDL +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + TRUNCATE table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+------------------- + table_1 | not_accessed | not_accessed | sequential_access +(1 row) + +ROLLBACK; +-- TRUNCATE on a reference table should be sequential +BEGIN; + TRUNCATE table_6; + SELECT * FROM relation_acesses WHERE table_name IN ('table_6') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+------------------- + table_6 | not_accessed | not_accessed | sequential_access +(1 row) + +ROLLBACK; +-- creating foreign keys should consider adding the placement accesses for the referenced table +ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key); +BEGIN; + ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+----------------- + table_1 | not_accessed | not_accessed | parallel_access + table_2 | not_accessed | not_accessed | parallel_access +(2 rows) + +ROLLBACK; +-- creating foreign keys should consider adding the placement accesses for the referenced table +-- in sequential mode as well +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY 
(key) REFERENCES table_1(key); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+------------------- + table_1 | not_accessed | not_accessed | sequential_access + table_2 | not_accessed | not_accessed | sequential_access +(2 rows) + +ROLLBACK; +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ... + ^ +SELECT create_distributed_table('partitioning_test', 'id'); +ERROR: relation "partitioning_test" does not exist +LINE 1: SELECT create_distributed_table('partitioning_test', 'id'); + ^ +-- Adding partition tables via CREATE TABLE should have DDL access the partitioned table as well +BEGIN; + CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin... + ^ + SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +-- Adding partition tables via ATTACH PARTITION on local tables should have DDL access the partitioned table as well +CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test; +ERROR: relation "partitioning_test" does not exist +LINE 1: ...ATE TABLE partitioning_test_2009 AS SELECT * FROM partitioni... + ^ +BEGIN; + ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_... 
+ ^ + SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +-- Adding partition tables via ATTACH PARTITION on distributed tables should have DDL access the partitioned table as well +CREATE TABLE partitioning_test_2010 AS SELECT * FROM partitioning_test; +ERROR: relation "partitioning_test" does not exist +LINE 1: ...ATE TABLE partitioning_test_2010 AS SELECT * FROM partitioni... + ^ +SELECT create_distributed_table('partitioning_test_2010', 'id'); +ERROR: relation "partitioning_test_2010" does not exist +LINE 1: SELECT create_distributed_table('partitioning_test_2010', 'i... + ^ +BEGIN; + ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2010 FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_... + ^ + SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2010') ORDER BY 1; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +-- TRUNCATE CASCADE works fine +ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); +BEGIN; + TRUNCATE table_1 CASCADE; +NOTICE: truncate cascades to table "table_2" + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+----------------- + table_1 | not_accessed | not_accessed | parallel_access + table_2 | not_accessed | not_accessed | parallel_access +(2 rows) + +ROLLBACK; +-- CTEs with SELECT only should work fine +BEGIN; + + WITH cte AS (SELECT count(*) FROM table_1) + SELECT * FROM cte; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | 
select_access | dml_access | ddl_access +------------+-----------------+--------------+-------------- + table_1 | parallel_access | not_accessed | not_accessed +(1 row) + +COMMIT; +-- CTEs with SELECT only in sequential mode should work fine +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + WITH cte AS (SELECT count(*) FROM table_1) + SELECT * FROM cte; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+--------------+-------------- + table_1 | sequential_access | not_accessed | not_accessed +(1 row) + +COMMIT; +-- modifying CTEs should work fine with multi-row inserts, which are by default in sequential +BEGIN; + + WITH cte_1 AS (INSERT INTO table_1 VALUES (1000,1000), (1001, 1001), (1002, 1002) RETURNING *) + SELECT * FROM cte_1 ORDER BY 1; + key | value +------+------- + 1000 | 1000 + 1001 | 1001 + 1002 | 1002 +(3 rows) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+-------------------+-------------- + table_1 | not_accessed | sequential_access | not_accessed +(1 row) + +ROLLBACK; +-- modifying CTEs should work fine with parallel mode +BEGIN; + + WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *) + SELECT count(*) FROM cte_1 ORDER BY 1; + count +------- + 101 +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-----------------+-------------- + table_1 | sequential_access | parallel_access | not_accessed +(1 row) + +ROLLBACK; +-- modifying CTEs should work fine with sequential mode +BEGIN; + + WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *) + SELECT count(*) FROM cte_1 ORDER BY 1; + count +------- + 101 +(1 row) + + SELECT * FROM 
relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+-------------------+-----------------+-------------- + table_1 | sequential_access | parallel_access | not_accessed +(1 row) + +ROLLBACK; +-- create distributed table with data loading +-- should mark both parallel dml and parallel ddl +DROP TABLE table_3; +CREATE TABLE table_3 (key int, value int); +INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i; +BEGIN; + SELECT create_distributed_table('table_3', 'key'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + + SELECT * FROM relation_acesses WHERE table_name IN ('table_3') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+-----------------+----------------- + table_3 | not_accessed | parallel_access | parallel_access +(1 row) + +COMMIT; +SET search_path TO 'public'; +DROP SCHEMA access_tracking CASCADE; +NOTICE: drop cascades to 12 other objects +DETAIL: drop cascades to function access_tracking.relation_select_access_mode(oid) +drop cascades to function access_tracking.relation_dml_access_mode(oid) +drop cascades to function access_tracking.relation_ddl_access_mode(oid) +drop cascades to function access_tracking.relation_access_mode_to_text(integer) +drop cascades to view access_tracking.relation_acesses +drop cascades to table access_tracking.table_1 +drop cascades to table access_tracking.table_2 +drop cascades to table access_tracking.table_4 +drop cascades to table access_tracking.table_5 +drop cascades to table access_tracking.table_6 +drop cascades to table access_tracking.table_7 +drop cascades to table access_tracking.table_3 diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out index 77f4ee0d3..9170313d4 100644 --- a/src/test/regress/expected/sequential_modifications.out +++ 
b/src/test/regress/expected/sequential_modifications.out @@ -6,8 +6,8 @@ -- CREATE SCHEMA test_seq_ddl; SET search_path TO 'test_seq_ddl'; -SET citus.next_shard_id TO 1600; -SET citus.next_placement_id TO 1600; +SET citus.next_shard_id TO 16000; +SET citus.next_placement_id TO 16000; -- this function simply checks the equality of the number of transactions in the -- pg_dist_transaction and number of primary worker nodes -- The function is useful to ensure that a single connection is opened per worker @@ -591,7 +591,7 @@ BEGIN; (1 row) \COPY test_seq_copy FROM STDIN DELIMITER AS ','; -ERROR: cannot establish a new connection for placement 1673, since DDL has been executed on a connection that is in use +ERROR: cannot establish a new connection for placement 16073, since DDL has been executed on a connection that is in use CONTEXT: COPY test_seq_copy, line 2: "2,2" ROLLBACK; SELECT distributed_2PCs_are_equal_to_worker_count(); diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index af0de31f8..16bbe4c67 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -58,7 +58,7 @@ test: multi_subquery_complex_reference_clause multi_subquery_window_functions mu test: multi_subquery_in_where_reference_clause test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql -test: multi_reference_table multi_select_for_update +test: multi_reference_table multi_select_for_update relation_access_tracking test: multi_average_expression multi_working_columns multi_having_pushdown test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg diff --git a/src/test/regress/sql/relation_access_tracking.sql b/src/test/regress/sql/relation_access_tracking.sql 
new file mode 100644 index 000000000..f36870b7f --- /dev/null +++ b/src/test/regress/sql/relation_access_tracking.sql @@ -0,0 +1,442 @@ + +--- +--- tests around access tracking within transaction blocks +--- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + +CREATE SCHEMA access_tracking; +SET search_path TO 'access_tracking'; + +CREATE OR REPLACE FUNCTION relation_select_access_mode(relationId Oid) + RETURNS int + LANGUAGE C STABLE STRICT + AS 'citus', $$relation_select_access_mode$$; + +CREATE OR REPLACE FUNCTION relation_dml_access_mode(relationId Oid) + RETURNS int + LANGUAGE C STABLE STRICT + AS 'citus', $$relation_dml_access_mode$$; + +CREATE OR REPLACE FUNCTION relation_ddl_access_mode(relationId Oid) + RETURNS int + LANGUAGE C STABLE STRICT + AS 'citus', $$relation_ddl_access_mode$$; + + +CREATE OR REPLACE FUNCTION relation_access_mode_to_text(relationShardAccess int) +RETURNS text AS +$$ +BEGIN + IF relationShardAccess = 0 THEN + RETURN 'not_accessed'; + ELSIF relationShardAccess = 1 THEN + RETURN 'sequential_access'; + ELSE + RETURN 'parallel_access'; + END IF; +END; +$$ LANGUAGE 'plpgsql' IMMUTABLE; + + + +CREATE VIEW relation_acesses AS + SELECT table_name, + relation_access_mode_to_text(relation_select_access_mode(table_name::regclass)) as select_access, + relation_access_mode_to_text(relation_dml_access_mode(table_name::regclass)) as dml_access, + relation_access_mode_to_text(relation_ddl_access_mode(table_name::regclass)) as ddl_access + FROM + ((SELECT 'table_' || i as table_name FROM generate_series(1, 7) i) UNION (SELECT 'partitioning_test') UNION (SELECT 'partitioning_test_2009') UNION (SELECT 'partitioning_test_2010')) tables; + +SET citus.shard_replication_factor TO 1; +CREATE TABLE table_1 (key int, value int); +SELECT create_distributed_table('table_1', 'key'); + +CREATE TABLE table_2 (key int, value int); +SELECT create_distributed_table('table_2', 'key'); + +CREATE TABLE table_3 (key int, value 
int); +SELECT create_distributed_table('table_3', 'key'); + +CREATE TABLE table_4 (key int, value int); +SELECT create_distributed_table('table_4', 'key'); + +CREATE TABLE table_5 (key int, value int); +SELECT create_distributed_table('table_5', 'key'); + +CREATE TABLE table_6 (key int, value int); +SELECT create_reference_Table('table_6'); + +INSERT INTO table_1 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_2 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_4 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_5 SELECT i, i FROM generate_series(0,100) i; +INSERT INTO table_6 SELECT i, i FROM generate_series(0,100) i; + +-- create_distributed_table works fine +BEGIN; + CREATE TABLE table_7 (key int, value int); + SELECT create_distributed_table('table_7', 'key'); + SELECT * FROM relation_acesses WHERE table_name IN ('table_7') ORDER BY 1; +COMMIT; + +-- outisde the transaction blocks, the function always returns zero +SELECT count(*) FROM table_1; +SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + +-- a very simple test that first checks sequential +-- and parallel SELECTs,DMLs, and DDLs +BEGIN; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + SELECT count(*) FROM table_1 WHERE key = 1; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + SELECT count(*) FROM table_1 WHERE key = 1 OR key = 2; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + INSERT INTO table_1 VALUES (1,1); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + INSERT INTO table_1 VALUES (1,1), (2,2); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + ALTER TABLE table_1 ADD COLUMN test_col INT; + + -- now see that the other tables are not accessed at all + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + +ROLLBACK; + + +-- this test shows that even if two multiple single shard +-- commands executed, we 
can treat the transaction as sequential +BEGIN; + SELECT count(*) FROM table_1 WHERE key = 1; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + SELECT count(*) FROM table_1 WHERE key = 2; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + INSERT INTO table_1 VALUES (1,1); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + INSERT INTO table_1 VALUES (2,2); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; +ROLLBACK; + +-- a sample DDL example +BEGIN; + ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key); + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; +ROLLBACK; + +-- a simple join touches single shard per table +BEGIN; + SELECT + count(*) + FROM + table_1, table_2, table_3, table_4, table_5 + WHERE + table_1.key = table_2.key AND table_2.key = table_3.key AND + table_3.key = table_4.key AND table_4.key = table_5.key AND + table_1.key = 1; + + SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1; +ROLLBACK; + +-- a simple real-time join touches all shard per table +BEGIN; + SELECT + count(*) + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key; + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + +-- a simple real-time join touches all shard per table +-- in sequential mode +BEGIN; + + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + SELECT + count(*) + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key; + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + +-- a simple subquery pushdown that touches all shards +BEGIN; + SELECT + count(*) + FROM + ( + + SELECT + random() + FROM + table_1, table_2, table_3, table_4, table_5 + WHERE + table_1.key = table_2.key AND table_2.key = table_3.key AND + table_3.key = table_4.key AND table_4.key = table_5.key + ) as foo; + + SELECT * FROM relation_acesses WHERE table_name LIKE 'table_%' ORDER BY 1; 
+ROLLBACK; + +-- simple multi shard update both sequential and parallel modes +-- note that in multi shard modify mode we always add select +-- access for all the shards accessed. But, sequential mode is OK +BEGIN; + UPDATE table_1 SET value = 15; + SELECT * FROM relation_acesses WHERE table_name = 'table_1'; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + UPDATE table_2 SET value = 15; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + +-- now UPDATE/DELETE with subselect pushdown +BEGIN; + UPDATE + table_1 SET value = 15 + WHERE key IN (SELECT key FROM table_2 JOIN table_3 USING (key) WHERE table_2.value = 15); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1; +ROLLBACK; + +-- INSERT .. SELECT pushdown +BEGIN; + INSERT INTO table_2 SELECT * FROM table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + +-- INSERT .. SELECT pushdown in sequential mode should be OK +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + + INSERT INTO table_2 SELECT * FROM table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + +-- coordinator INSERT .. SELECT +BEGIN; + INSERT INTO table_2 SELECT * FROM table_1 OFFSET 0; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + + + +-- recursively planned SELECT +BEGIN; + SELECT + count(*) + FROM + ( + + SELECT + random() + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + OFFSET 0 + ) as foo; + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + +-- recursively planned SELECT and coordinator INSERT .. 
SELECT +BEGIN; + INSERT INTO table_3 (key) + SELECT + * + FROM + ( + + SELECT + random() * 1000 + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + OFFSET 0 + ) as foo; + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1; +ROLLBACK; + +-- recursively planned SELECT and coordinator INSERT .. SELECT +-- but modifies single shard, marked as sequential operation +BEGIN; + INSERT INTO table_3 (key) + SELECT + * + FROM + ( + + SELECT + random() * 1000 + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + AND table_1.key = 1 + OFFSET 0 + ) as foo; + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3') ORDER BY 1; +ROLLBACK; + +-- recursively planned SELECT and recursively planned multi-shard DELETE +BEGIN; + DELETE FROM table_3 where key IN + ( + SELECT + * + FROM + ( + SELECT + table_1.key + FROM + table_1, table_2 + WHERE + table_1.key = table_2.key + OFFSET 0 + ) as foo + ) AND value IN (SELECT key FROM table_4); + + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2', 'table_3', 'table_4') ORDER BY 1; +ROLLBACK; + +-- copy out +BEGIN; + COPY (SELECT * FROM table_1 WHERE key IN (1,2,3) ORDER BY 1) TO stdout; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; +ROLLBACK; + +-- copy in +BEGIN; + COPY table_1 FROM STDIN WITH CSV; +1,1 +2,2 +3,3 +\. + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; +ROLLBACK; + +-- copy in single shard +BEGIN; + COPY table_1 FROM STDIN WITH CSV; +1,1 +1,2 +1,3 +\. 
+ SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; +ROLLBACK; + +-- reference table accesses should always be a sequential +BEGIN; + SELECT count(*) FROM table_6; + SELECT * FROM relation_acesses WHERE table_name IN ('table_6'); + + UPDATE table_6 SET value = 15; + SELECT * FROM relation_acesses WHERE table_name IN ('table_6'); + + ALTER TABLE table_6 ADD COLUMN x INT; + SELECT * FROM relation_acesses WHERE table_name IN ('table_6'); +ROLLBACK; + +-- reference table join with a distributed table +BEGIN; + SELECT count(*) FROM table_1 JOIN table_6 USING(key); + SELECT * FROM relation_acesses WHERE table_name IN ('table_6', 'table_1'); +ROLLBACK; + +-- FIXME: TRUNCATE should be DDL +BEGIN; + TRUNCATE table_1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; +ROLLBACK; + +-- FIXME: creating foreign keys should consider adding the placement accesses for the referenced table +ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key); +BEGIN; + ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + +CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); +SELECT create_distributed_table('partitioning_test', 'id'); + +-- FIXME: Adding partition tables should have DDL access the partitioned table as well +BEGIN; + CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); + SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; +ROLLBACK; + +-- FIXME: Adding partition tables should have DDL access the partitioned table as well +CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test; +BEGIN; + ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); + SELECT * FROM 
relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; +ROLLBACK; + +-- TRUNCATE CASCADE works fine +ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); +BEGIN; + TRUNCATE table_1 CASCADE; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + +-- CTEs with SELECT only should work fine +BEGIN; + + WITH cte AS (SELECT count(*) FROM table_1) + SELECT * FROM cte; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; +COMMIT; + +-- CTEs with SELECT only in sequential mode should work fine +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + + WITH cte AS (SELECT count(*) FROM table_1) + SELECT * FROM cte; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; +COMMIT; + +-- modifying CTEs should work fine with multi-row inserts, which are by default in sequential +BEGIN; + + WITH cte_1 AS (INSERT INTO table_1 VALUES (1000,1000), (1001, 1001), (1002, 1002) RETURNING *) + SELECT * FROM cte_1 ORDER BY 1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; +ROLLBACK; + +-- modifying CTEs should work fine with parallel mode +BEGIN; + + WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *) + SELECT count(*) FROM cte_1 ORDER BY 1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; +ROLLBACK; + +-- modifying CTEs should work fine with sequential mode +BEGIN; + + WITH cte_1 AS (UPDATE table_1 SET value = 15 RETURNING *) + SELECT count(*) FROM cte_1 ORDER BY 1; + SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; +ROLLBACK; + +-- create distributed table with data loading +-- should mark both parallel dml and parallel ddl +DROP TABLE table_3; +CREATE TABLE table_3 (key int, value int); +INSERT INTO table_3 SELECT i, i FROM generate_series(0,100) i; +BEGIN; + SELECT create_distributed_table('table_3', 'key'); + SELECT * 
FROM relation_acesses WHERE table_name IN ('table_3') ORDER BY 1; +COMMIT; + +SET search_path TO 'public'; +DROP SCHEMA access_tracking CASCADE; diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql index 0f3856477..d3575f7f9 100644 --- a/src/test/regress/sql/sequential_modifications.sql +++ b/src/test/regress/sql/sequential_modifications.sql @@ -6,8 +6,8 @@ -- CREATE SCHEMA test_seq_ddl; SET search_path TO 'test_seq_ddl'; -SET citus.next_shard_id TO 1600; -SET citus.next_placement_id TO 1600; +SET citus.next_shard_id TO 16000; +SET citus.next_placement_id TO 16000; -- this function simply checks the equality of the number of transactions in the -- pg_dist_transaction and number of primary worker nodes