mirror of https://github.com/citusdata/citus.git
Make sure that inter-shard DDL commands are always covers both tables
parent
2f01894589
commit
21038f0d0e
|
@ -86,6 +86,8 @@ static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdT
|
|||
operation, bool alwaysThrowErrorOnFailure, bool
|
||||
expectResults);
|
||||
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
|
||||
static List * BuildPlacementAccessList(uint32 groupId, List *relationShardList,
|
||||
ShardPlacementAccessType accessType);
|
||||
static List * GetModifyConnections(Task *task, bool markCritical);
|
||||
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
|
||||
bool isModificationQuery, bool expectResults);
|
||||
|
@ -842,6 +844,28 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
|
|||
*/
|
||||
List *
|
||||
BuildPlacementSelectList(uint32 groupId, List *relationShardList)
|
||||
{
|
||||
return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_SELECT);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildPlacementDDLList is a warpper around BuildPlacementAccessList() for DDL access.
|
||||
*/
|
||||
List *
|
||||
BuildPlacementDDLList(uint32 groupId, List *relationShardList)
|
||||
{
|
||||
return BuildPlacementAccessList(groupId, relationShardList, PLACEMENT_ACCESS_DDL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildPlacementAccessList returns a list of placement accesses for the given
|
||||
* relationShardList and the access type.
|
||||
*/
|
||||
static List *
|
||||
BuildPlacementAccessList(uint32 groupId, List *relationShardList,
|
||||
ShardPlacementAccessType accessType)
|
||||
{
|
||||
ListCell *relationShardCell = NULL;
|
||||
List *placementAccessList = NIL;
|
||||
|
@ -858,7 +882,7 @@ BuildPlacementSelectList(uint32 groupId, List *relationShardList)
|
|||
continue;
|
||||
}
|
||||
|
||||
placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT);
|
||||
placementAccess = CreatePlacementAccess(placement, accessType);
|
||||
placementAccessList = lappend(placementAccessList, placementAccess);
|
||||
}
|
||||
|
||||
|
@ -1092,9 +1116,22 @@ GetModifyConnections(Task *task, bool markCritical)
|
|||
accessType = PLACEMENT_ACCESS_DML;
|
||||
}
|
||||
|
||||
/* create placement accesses for placements that appear in a subselect */
|
||||
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
|
||||
relationShardList);
|
||||
if (accessType == PLACEMENT_ACCESS_DDL)
|
||||
{
|
||||
/*
|
||||
* All relations appearing inter-shard DDL commands should be marked
|
||||
* with DDL access.
|
||||
*/
|
||||
placementAccessList =
|
||||
BuildPlacementDDLList(taskPlacement->groupId, relationShardList);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* create placement accesses for placements that appear in a subselect */
|
||||
placementAccessList =
|
||||
BuildPlacementSelectList(taskPlacement->groupId, relationShardList);
|
||||
}
|
||||
|
||||
|
||||
Assert(list_length(placementAccessList) == list_length(relationShardList));
|
||||
|
||||
|
|
|
@ -3121,10 +3121,18 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
|||
uint64 leftShardId = leftShardInterval->shardId;
|
||||
StringInfo applyCommand = makeStringInfo();
|
||||
Task *task = NULL;
|
||||
RelationShard *leftRelationShard = CitusMakeNode(RelationShard);
|
||||
RelationShard *rightRelationShard = CitusMakeNode(RelationShard);
|
||||
|
||||
ShardInterval *rightShardInterval = (ShardInterval *) lfirst(rightShardCell);
|
||||
uint64 rightShardId = rightShardInterval->shardId;
|
||||
|
||||
leftRelationShard->relationId = leftRelationId;
|
||||
leftRelationShard->shardId = leftShardId;
|
||||
|
||||
rightRelationShard->relationId = rightRelationId;
|
||||
rightRelationShard->shardId = rightShardId;
|
||||
|
||||
appendStringInfo(applyCommand, WORKER_APPLY_INTER_SHARD_DDL_COMMAND,
|
||||
leftShardId, escapedLeftSchemaName, rightShardId,
|
||||
escapedRightSchemaName, escapedCommandString);
|
||||
|
@ -3138,6 +3146,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
|||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||
task->anchorShardId = leftShardId;
|
||||
task->taskPlacementList = FinalizedShardPlacementList(leftShardId);
|
||||
task->relationShardList = list_make2(leftRelationShard, rightRelationShard);
|
||||
|
||||
taskList = lappend(taskList, task);
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/pg_dist_shard.h"
|
||||
#include "distributed/reference_table_utils.h"
|
||||
|
@ -340,6 +341,13 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
|
|||
if (useExclusiveConnections)
|
||||
{
|
||||
RecordParallelDDLAccess(targetRelationId);
|
||||
|
||||
/* we should mark the parent as well */
|
||||
if (PartitionTable(targetRelationId))
|
||||
{
|
||||
Oid parentRelationId = PartitionParentOid(targetRelationId);
|
||||
RecordParallelDDLAccess(parentRelationId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,8 +30,10 @@
|
|||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/distributed_planner.h"
|
||||
#include "distributed/foreign_constraint.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -531,8 +533,38 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
|||
shardIndex = ShardIndex(shardInterval);
|
||||
}
|
||||
|
||||
connection = GetPlacementConnection(connectionFlags, shardPlacement,
|
||||
placementOwner);
|
||||
/*
|
||||
* For partitions, make sure that we mark the parent table relation access
|
||||
* with DDL. This is only important for parallel relation access in transaction
|
||||
* blocks, thus check useExclusiveConnection and transaction block as well.
|
||||
*/
|
||||
if ((IsTransactionBlock() && useExclusiveConnection) &&
|
||||
alterTableAttachPartitionCommand != NULL)
|
||||
{
|
||||
RelationShard *parentRelationShard = CitusMakeNode(RelationShard);
|
||||
RelationShard *partitionRelationShard = CitusMakeNode(RelationShard);
|
||||
List *relationShardList = NIL;
|
||||
List *placementAccessList = NIL;
|
||||
|
||||
parentRelationShard->relationId = PartitionParentOid(distributedRelationId);
|
||||
parentRelationShard->shardId =
|
||||
ColocatedShardIdInRelation(parentRelationShard->relationId, shardIndex);
|
||||
partitionRelationShard->relationId = distributedRelationId;
|
||||
partitionRelationShard->shardId = shardId;
|
||||
|
||||
relationShardList = list_make2(parentRelationShard, partitionRelationShard);
|
||||
placementAccessList = BuildPlacementDDLList(shardPlacement->groupId,
|
||||
relationShardList);
|
||||
|
||||
connection = GetPlacementListConnection(connectionFlags, placementAccessList,
|
||||
placementOwner);
|
||||
}
|
||||
else
|
||||
{
|
||||
connection = GetPlacementConnection(connectionFlags, shardPlacement,
|
||||
placementOwner);
|
||||
}
|
||||
|
||||
if (useExclusiveConnection)
|
||||
{
|
||||
ClaimConnectionExclusively(connection);
|
||||
|
|
|
@ -109,9 +109,22 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
|
|||
|
||||
placementAccessList = lappend(placementAccessList, &placementModification);
|
||||
|
||||
/* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */
|
||||
placementSelectList = BuildPlacementSelectList(shardPlacement->groupId,
|
||||
task->relationShardList);
|
||||
if (accessType == PLACEMENT_ACCESS_DDL)
|
||||
{
|
||||
/*
|
||||
* All relations appearing inter-shard DDL commands should be marked
|
||||
* with DDL access.
|
||||
*/
|
||||
placementSelectList = BuildPlacementDDLList(shardPlacement->groupId,
|
||||
task->relationShardList);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */
|
||||
placementSelectList = BuildPlacementSelectList(shardPlacement->groupId,
|
||||
task->relationShardList);
|
||||
}
|
||||
|
||||
placementAccessList = list_concat(placementAccessList, placementSelectList);
|
||||
|
||||
/*
|
||||
|
|
|
@ -215,12 +215,24 @@ RecordRelationMultiShardModifyAccessForTask(Task *task)
|
|||
|
||||
|
||||
/*
|
||||
* RecordRelationMultiShardDDLAccessForTask is a wrapper around
|
||||
* RecordParallelDDLAccess
|
||||
* RecordRelationMultiShardDDLAccessForTask marks all the relationShards
|
||||
* with parallel DDL access if exists. That case is valid for inter-shard
|
||||
* DDL commands such as foreign key creation. The function also records
|
||||
* the relation that anchorShardId belongs to.
|
||||
*/
|
||||
void
|
||||
RecordRelationMultiShardDDLAccessForTask(Task *task)
|
||||
{
|
||||
List *relationShardList = task->relationShardList;
|
||||
ListCell *relationShardCell = NULL;
|
||||
|
||||
foreach(relationShardCell, relationShardList)
|
||||
{
|
||||
RelationShard *relationShard = (RelationShard *) lfirst(relationShardCell);
|
||||
|
||||
RecordParallelDDLAccess(relationShard->relationId);
|
||||
}
|
||||
|
||||
RecordParallelDDLAccess(RelationIdForShard(task->anchorShardId));
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "access/sdir.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "executor/execdesc.h"
|
||||
#include "executor/tuptable.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
@ -47,5 +48,6 @@ extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
|
|||
/* helper functions */
|
||||
extern bool TaskListRequires2PC(List *taskList);
|
||||
extern List * BuildPlacementSelectList(uint32 groupId, List *relationShardList);
|
||||
extern List * BuildPlacementDDLList(uint32 groupId, List *relationShardList);
|
||||
|
||||
#endif /* MULTI_ROUTER_EXECUTOR_H_ */
|
||||
|
|
|
@ -594,17 +594,30 @@ BEGIN;
|
|||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- FIXME: creating foreign keys should consider adding the placement accesses for the referenced table
|
||||
-- creating foreign keys should consider adding the placement accesses for the referenced table
|
||||
ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
|
||||
BEGIN;
|
||||
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
|
||||
table_name | select_access | dml_access | ddl_access
|
||||
------------+---------------+--------------+-----------------
|
||||
table_1 | not_accessed | not_accessed | not_accessed
|
||||
table_1 | not_accessed | not_accessed | parallel_access
|
||||
table_2 | not_accessed | not_accessed | parallel_access
|
||||
(2 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- creating foreign keys should consider adding the placement accesses for the referenced table
|
||||
-- in sequential mode as well
|
||||
BEGIN;
|
||||
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
|
||||
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
|
||||
table_name | select_access | dml_access | ddl_access
|
||||
------------+---------------+--------------+-------------------
|
||||
table_1 | not_accessed | not_accessed | sequential_access
|
||||
table_2 | not_accessed | not_accessed | sequential_access
|
||||
(2 rows)
|
||||
|
||||
ROLLBACK;
|
||||
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||
SELECT create_distributed_table('partitioning_test', 'id');
|
||||
|
@ -613,28 +626,46 @@ SELECT create_distributed_table('partitioning_test', 'id');
|
|||
|
||||
(1 row)
|
||||
|
||||
-- FIXME: Adding partition tables should have DDL access the partitioned table as well
|
||||
-- Adding partition tables via CREATE TABLE should have DDL access the partitioned table as well
|
||||
BEGIN;
|
||||
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
|
||||
table_name | select_access | dml_access | ddl_access
|
||||
------------------------+---------------+--------------+-----------------
|
||||
partitioning_test | not_accessed | not_accessed | not_accessed
|
||||
partitioning_test | not_accessed | not_accessed | parallel_access
|
||||
partitioning_test_2009 | not_accessed | not_accessed | parallel_access
|
||||
(2 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- FIXME: Adding partition tables should have DDL access the partitioned table as well
|
||||
-- Adding partition tables via ATTACH PARTITION on local tables should have DDL access the partitioned table as well
|
||||
CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test;
|
||||
BEGIN;
|
||||
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
|
||||
table_name | select_access | dml_access | ddl_access
|
||||
------------------------+---------------+--------------+-----------------
|
||||
partitioning_test | not_accessed | not_accessed | not_accessed
|
||||
partitioning_test | not_accessed | not_accessed | parallel_access
|
||||
partitioning_test_2009 | not_accessed | not_accessed | parallel_access
|
||||
(2 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- Adding partition tables via ATTACH PARTITION on distributed tables should have DDL access the partitioned table as well
|
||||
CREATE TABLE partitioning_test_2010 AS SELECT * FROM partitioning_test;
|
||||
SELECT create_distributed_table('partitioning_test_2010', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2010 FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2010') ORDER BY 1;
|
||||
table_name | select_access | dml_access | ddl_access
|
||||
------------------------+---------------+--------------+-----------------
|
||||
partitioning_test | not_accessed | not_accessed | parallel_access
|
||||
partitioning_test_2010 | not_accessed | not_accessed | parallel_access
|
||||
(2 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- TRUNCATE CASCADE works fine
|
||||
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
|
||||
|
@ -758,7 +789,7 @@ NOTICE: Copying data from local table...
|
|||
COMMIT;
|
||||
SET search_path TO 'public';
|
||||
DROP SCHEMA access_tracking CASCADE;
|
||||
NOTICE: drop cascades to 14 other objects
|
||||
NOTICE: drop cascades to 15 other objects
|
||||
DETAIL: drop cascades to function access_tracking.relation_select_access_mode(oid)
|
||||
drop cascades to function access_tracking.relation_dml_access_mode(oid)
|
||||
drop cascades to function access_tracking.relation_ddl_access_mode(oid)
|
||||
|
@ -772,4 +803,5 @@ drop cascades to table access_tracking.table_6
|
|||
drop cascades to table access_tracking.table_7
|
||||
drop cascades to table access_tracking.partitioning_test
|
||||
drop cascades to table access_tracking.partitioning_test_2009
|
||||
drop cascades to table access_tracking.partitioning_test_2010
|
||||
drop cascades to table access_tracking.table_3
|
||||
|
|
|
@ -357,29 +357,46 @@ BEGIN;
|
|||
SELECT * FROM relation_acesses WHERE table_name IN ('table_1') ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
-- FIXME: creating foreign keys should consider adding the placement accesses for the referenced table
|
||||
-- creating foreign keys should consider adding the placement accesses for the referenced table
|
||||
ALTER TABLE table_1 ADD CONSTRAINT table_1_u UNIQUE (key);
|
||||
BEGIN;
|
||||
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
-- creating foreign keys should consider adding the placement accesses for the referenced table
|
||||
-- in sequential mode as well
|
||||
BEGIN;
|
||||
SET LOCAL citus.multi_shard_modify_mode = 'sequential';
|
||||
|
||||
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('table_1', 'table_2') ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||
SELECT create_distributed_table('partitioning_test', 'id');
|
||||
|
||||
-- FIXME: Adding partition tables should have DDL access the partitioned table as well
|
||||
-- Adding partition tables via CREATE TABLE should have DDL access the partitioned table as well
|
||||
BEGIN;
|
||||
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
-- FIXME: Adding partition tables should have DDL access the partitioned table as well
|
||||
-- Adding partition tables via ATTACH PARTITION on local tables should have DDL access the partitioned table as well
|
||||
CREATE TABLE partitioning_test_2009 AS SELECT * FROM partitioning_test;
|
||||
BEGIN;
|
||||
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2009') ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
-- Adding partition tables via ATTACH PARTITION on distributed tables should have DDL access the partitioned table as well
|
||||
CREATE TABLE partitioning_test_2010 AS SELECT * FROM partitioning_test;
|
||||
SELECT create_distributed_table('partitioning_test_2010', 'id');
|
||||
BEGIN;
|
||||
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2010 FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||
SELECT * FROM relation_acesses WHERE table_name IN ('partitioning_test', 'partitioning_test_2010') ORDER BY 1;
|
||||
ROLLBACK;
|
||||
|
||||
-- TRUNCATE CASCADE works fine
|
||||
ALTER TABLE table_2 ADD CONSTRAINT table_2_u FOREIGN KEY (key) REFERENCES table_1(key);
|
||||
BEGIN;
|
||||
|
|
Loading…
Reference in New Issue