diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 4c7f692bd..ea07d541b 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -86,6 +86,8 @@ static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdT operation, bool alwaysThrowErrorOnFailure, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); +static List * BuildPlacementAccessList(uint32 groupId, List *relationShardList, + ShardPlacementAccessType accessType); static List * GetModifyConnections(Task *task, bool markCritical); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); @@ -842,6 +844,28 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) */ List * BuildPlacementSelectList(uint32 groupId, List *relationShardList) +{ + return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_SELECT); +} + + +/* + * BuildPlacementDDLList is a wrapper around BuildPlacementAccessList() for DDL access. + */ +List * +BuildPlacementDDLList(uint32 groupId, List *relationShardList) +{ + return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_DDL); +} + + +/* + * BuildPlacementAccessList returns a list of placement accesses for the given + * relationShardList and the access type. 
+ */ +static List * +BuildPlacementAccessList(uint32 groupId, List *relationShardList, + ShardPlacementAccessType accessType) { ListCell *relationShardCell = NULL; List *placementAccessList = NIL; @@ -858,7 +882,7 @@ BuildPlacementSelectList(uint32 groupId, List *relationShardList) continue; } - placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT); + placementAccess = CreatePlacementAccess(placement, accessType); placementAccessList = lappend(placementAccessList, placementAccess); } @@ -1092,9 +1116,22 @@ GetModifyConnections(Task *task, bool markCritical) accessType = PLACEMENT_ACCESS_DML; } - /* create placement accesses for placements that appear in a subselect */ - placementAccessList = BuildPlacementSelectList(taskPlacement->groupId, - relationShardList); + if (accessType == PLACEMENT_ACCESS_DDL) + { + /* + * All relations appearing in inter-shard DDL commands should be marked + * with DDL access. + */ + placementAccessList = + BuildPlacementDDLList(taskPlacement->groupId, relationShardList); + } + else + { + /* create placement accesses for placements that appear in a subselect */ + placementAccessList = + BuildPlacementSelectList(taskPlacement->groupId, relationShardList); + } + Assert(list_length(placementAccessList) == list_length(relationShardList)); diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index ca744028f..6c8e658d8 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -3121,10 +3121,18 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, uint64 leftShardId = leftShardInterval->shardId; StringInfo applyCommand = makeStringInfo(); Task *task = NULL; + RelationShard *leftRelationShard = CitusMakeNode(RelationShard); + RelationShard *rightRelationShard = CitusMakeNode(RelationShard); ShardInterval *rightShardInterval = (ShardInterval *) lfirst(rightShardCell); uint64 rightShardId = 
rightShardInterval->shardId; + leftRelationShard->relationId = leftRelationId; + leftRelationShard->shardId = leftShardId; + + rightRelationShard->relationId = rightRelationId; + rightRelationShard->shardId = rightShardId; + appendStringInfo(applyCommand, WORKER_APPLY_INTER_SHARD_DDL_COMMAND, leftShardId, escapedLeftSchemaName, rightShardId, escapedRightSchemaName, escapedCommandString); @@ -3138,6 +3146,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = leftShardId; task->taskPlacementList = FinalizedShardPlacementList(leftShardId); + task->relationShardList = list_make2(leftRelationShard, rightRelationShard); taskList = lappend(taskList, task); } diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 7fc272fef..43163df9e 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -30,6 +30,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/reference_table_utils.h" @@ -340,6 +341,13 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool if (useExclusiveConnections) { RecordParallelDDLAccess(targetRelationId); + + /* we should mark the parent as well */ + if (PartitionTable(targetRelationId)) + { + Oid parentRelationId = PartitionParentOid(targetRelationId); + RecordParallelDDLAccess(parentRelationId); + } } } diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 2dcb79077..e46d657de 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ 
-30,8 +30,10 @@ #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/connection_management.h" +#include "distributed/distributed_planner.h" #include "distributed/foreign_constraint.h" #include "distributed/multi_client_executor.h" +#include "distributed/multi_router_executor.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" @@ -531,8 +533,38 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, shardIndex = ShardIndex(shardInterval); } - connection = GetPlacementConnection(connectionFlags, shardPlacement, - placementOwner); + /* + * For partitions, make sure that we mark the parent table relation access + * with DDL. This is only important for parallel relation access in transaction + * blocks, thus check useExclusiveConnection and transaction block as well. + */ + if ((IsTransactionBlock() && useExclusiveConnection) && + alterTableAttachPartitionCommand != NULL) + { + RelationShard *parentRelationShard = CitusMakeNode(RelationShard); + RelationShard *partitionRelationShard = CitusMakeNode(RelationShard); + List *relationShardList = NIL; + List *placementAccessList = NIL; + + parentRelationShard->relationId = PartitionParentOid(distributedRelationId); + parentRelationShard->shardId = + ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex); + partitionRelationShard->relationId = distributedRelationId; + partitionRelationShard->shardId = shardId; + + relationShardList = list_make2(parentRelationShard, partitionRelationShard); + placementAccessList = BuildPlacementDDLList(shardPlacement->groupId, + relationShardList); + + connection = GetPlacementListConnection(connectionFlags, placementAccessList, + placementOwner); + } + else + { + connection = GetPlacementConnection(connectionFlags, shardPlacement, + placementOwner); + } + if (useExclusiveConnection) { ClaimConnectionExclusively(connection); 
diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 3dfdbde53..22ff71482 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -109,9 +109,22 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags) placementAccessList = lappend(placementAccessList, &placementModification); - /* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */ - placementSelectList = BuildPlacementSelectList(shardPlacement->groupId, - task->relationShardList); + if (accessType == PLACEMENT_ACCESS_DDL) + { + /* + * All relations appearing in inter-shard DDL commands should be marked + * with DDL access. + */ + placementSelectList = BuildPlacementDDLList(shardPlacement->groupId, + task->relationShardList); + } + else + { + /* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */ + placementSelectList = BuildPlacementSelectList(shardPlacement->groupId, + task->relationShardList); + } + placementAccessList = list_concat(placementAccessList, placementSelectList); /* diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c index cff8b4081..cc310d409 100644 --- a/src/backend/distributed/transaction/relation_access_tracking.c +++ b/src/backend/distributed/transaction/relation_access_tracking.c @@ -215,12 +215,24 @@ RecordRelationMultiShardModifyAccessForTask(Task *task) /* - * RecordRelationMultiShardDDLAccessForTask is a wrapper around - * RecordParallelDDLAccess + * RecordRelationMultiShardDDLAccessForTask marks all the relationShards + * with parallel DDL access, if any exist. That is the case for inter-shard + * DDL commands such as foreign key creation. The function also records + * the relation that anchorShardId belongs to. 
*/ void RecordRelationMultiShardDDLAccessForTask(Task *task) { + List *relationShardList = task->relationShardList; + ListCell *relationShardCell = NULL; + + foreach(relationShardCell, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell); + + RecordParallelDDLAccess(relationShard->relationId); + } + RecordParallelDDLAccess(RelationIdForShard(task->anchorShardId)); } diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 94f5beba7..b6482f24d 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -14,6 +14,7 @@ #include "access/sdir.h" #include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" +#include "distributed/placement_connection.h" #include "executor/execdesc.h" #include "executor/tuptable.h" #include "nodes/pg_list.h" @@ -47,5 +48,6 @@ extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, /* helper functions */ extern bool TaskListRequires2PC(List *taskList); extern List * BuildPlacementSelectList(uint32 groupId, List *relationShardList); +extern List * BuildPlacementDDLList(uint32 groupId, List *relationShardList); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/test/regress/expected/relation_access_tracking.out b/src/test/regress/expected/relation_access_tracking.out index fb315cfc2..69ec80fbd 100644 --- a/src/test/regress/expected/relation_access_tracking.out +++ b/src/test/regress/expected/relation_access_tracking.out @@ -594,17 +594,30 @@ BEGIN; (1 row) ROLLBACK; --- FIXME: creating foreign keys should consider adding the placement accesses for the referenced table +-- creating foreign keys should consider adding the placement accesses for the referenced table ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key); BEGIN; ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); SELECT * FROM 
relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; table_name | select_access | dml_access | ddl_access ------------+---------------+--------------+----------------- - table_1 | not_accessed | not_accessed | not_accessed + table_1 | not_accessed | not_accessed | parallel_access table_2 | not_accessed | not_accessed | parallel_access (2 rows) +ROLLBACK; +-- creating foreign keys should consider adding the placement accesses for the referenced table +-- in sequential mode as well +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------+---------------+--------------+------------------- + table_1 | not_accessed | not_accessed | sequential_access + table_2 | not_accessed | not_accessed | sequential_access +(2 rows) + ROLLBACK; CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time); SELECT create_distributed_table('partitioning_test', 'id'); @@ -613,28 +626,46 @@ SELECT create_distributed_table('partitioning_test', 'id'); (1 row) --- FIXME: Adding partition tables should have DDL access the partitioned table as well +-- Adding partition tables via CREATE TABLE should have DDL access the partitioned table as well BEGIN; CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; table_name | select_access | dml_access | ddl_access ------------------------+---------------+--------------+----------------- - partitioning_test | not_accessed | not_accessed | not_accessed + partitioning_test | not_accessed | not_accessed | parallel_access partitioning_test_2009 | not_accessed | not_accessed | parallel_access (2 rows) 
ROLLBACK; --- FIXME: Adding partition tables should have DDL access the partitioned table as well +-- Adding partition tables via ATTACH PARTITION on local tables should have DDL access the partitioned table as well CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test; BEGIN; ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; table_name | select_access | dml_access | ddl_access ------------------------+---------------+--------------+----------------- - partitioning_test | not_accessed | not_accessed | not_accessed + partitioning_test | not_accessed | not_accessed | parallel_access partitioning_test_2009 | not_accessed | not_accessed | parallel_access (2 rows) +ROLLBACK; +-- Adding partition tables via ATTACH PARTITION on distributed tables should have DDL access the partitioned table as well +CREATE TABLE partitioning_test_2010 AS SELECT * FROM partitioning_test; +SELECT create_distributed_table('partitioning_test_2010', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +BEGIN; + ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2010 FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); + SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2010') ORDER BY 1; + table_name | select_access | dml_access | ddl_access +------------------------+---------------+--------------+----------------- + partitioning_test | not_accessed | not_accessed | parallel_access + partitioning_test_2010 | not_accessed | not_accessed | parallel_access +(2 rows) + ROLLBACK; -- TRUNCATE CASCADE works fine ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); @@ -758,7 +789,7 @@ NOTICE: Copying data from local table... 
COMMIT; SET search_path TO 'public'; DROP SCHEMA access_tracking CASCADE; -NOTICE: drop cascades to 14 other objects +NOTICE: drop cascades to 15 other objects DETAIL: drop cascades to function access_tracking.relation_select_access_mode(oid) drop cascades to function access_tracking.relation_dml_access_mode(oid) drop cascades to function access_tracking.relation_ddl_access_mode(oid) @@ -772,4 +803,5 @@ drop cascades to table access_tracking.table_6 drop cascades to table access_tracking.table_7 drop cascades to table access_tracking.partitioning_test drop cascades to table access_tracking.partitioning_test_2009 +drop cascades to table access_tracking.partitioning_test_2010 drop cascades to table access_tracking.table_3 diff --git a/src/test/regress/sql/relation_access_tracking.sql b/src/test/regress/sql/relation_access_tracking.sql index f36870b7f..d3af36403 100644 --- a/src/test/regress/sql/relation_access_tracking.sql +++ b/src/test/regress/sql/relation_access_tracking.sql @@ -357,29 +357,46 @@ BEGIN; SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1; ROLLBACK; --- FIXME: creating foreign keys should consider adding the placement accesses for the referenced table +-- creating foreign keys should consider adding the placement accesses for the referenced table ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key); BEGIN; ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; ROLLBACK; +-- creating foreign keys should consider adding the placement accesses for the referenced table +-- in sequential mode as well +BEGIN; + SET LOCAL citus.multi_shard_modify_mode = 'sequential'; + + ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); + SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1; +ROLLBACK; + CREATE TABLE partitioning_test(id int, time date) 
PARTITION BY RANGE (time); SELECT create_distributed_table('partitioning_test', 'id'); --- FIXME: Adding partition tables should have DDL access the partitioned table as well +-- Adding partition tables via CREATE TABLE should have DDL access the partitioned table as well BEGIN; CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; ROLLBACK; --- FIXME: Adding partition tables should have DDL access the partitioned table as well +-- Adding partition tables via ATTACH PARTITION on local tables should have DDL access the partitioned table as well CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test; BEGIN; ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01'); SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1; ROLLBACK; +-- Adding partition tables via ATTACH PARTITION on distributed tables should have DDL access the partitioned table as well +CREATE TABLE partitioning_test_2010 AS SELECT * FROM partitioning_test; +SELECT create_distributed_table('partitioning_test_2010', 'id'); +BEGIN; + ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2010 FOR VALUES FROM ('2010-01-01') TO ('2011-01-01'); + SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2010') ORDER BY 1; +ROLLBACK; + -- TRUNCATE CASCADE works fine ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key); BEGIN;