diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 9b4fee684..0d974fac1 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -1183,23 +1183,43 @@ CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
     }
     else if (shouldRunSequential && ParallelQueryExecutedInTransaction())
     {
+        /*
+         * We decided to use sequential execution. It's either because the relation
+         * has a pre-existing foreign key to a reference table or because we
+         * decided to use sequential execution due to a query executed in the
+         * current xact beforehand.
+         * We have a specific error message for each case.
+         */
+
         char *relationName = get_rel_name(relationId);
 
-        /*
-         * If there has already been a parallel query executed, the sequential mode
-         * would still use the already opened parallel connections to the workers,
-         * thus contradicting our purpose of using sequential mode.
-         */
-        ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
-                               "transaction because it has a foreign key to "
-                               "a reference table", relationName),
-                        errdetail("If a hash distributed table has a foreign key "
-                                  "to a reference table, it has to be created "
-                                  "in sequential mode before any parallel commands "
-                                  "have been executed in the same transaction"),
-                        errhint("Try re-running the transaction with "
-                                "\"SET LOCAL citus.multi_shard_modify_mode TO "
-                                "\'sequential\';\"")));
+        if (hasForeignKeyToReferenceTable)
+        {
+            /*
+             * If there has already been a parallel query executed, the sequential mode
+             * would still use the already opened parallel connections to the workers,
+             * thus contradicting our purpose of using sequential mode.
+             */
+            ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
+                                   "transaction because it has a foreign key to "
+                                   "a reference table", relationName),
+                            errdetail("If a hash distributed table has a foreign key "
+                                      "to a reference table, it has to be created "
+                                      "in sequential mode before any parallel commands "
+                                      "have been executed in the same transaction"),
+                            errhint("Try re-running the transaction with "
+                                    "\"SET LOCAL citus.multi_shard_modify_mode TO "
+                                    "\'sequential\';\"")));
+        }
+        else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
+        {
+            ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode because "
+                                   "a parallel query was executed in this transaction",
+                                   relationName),
+                            errhint("If you have manually set "
+                                    "citus.multi_shard_modify_mode to 'sequential', "
+                                    "try with 'parallel' option. ")));
+        }
     }
     else if (shouldRunSequential)
     {
diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index 2b5804a25..d0af329c4 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -2288,7 +2288,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
     copyDest->shardStateHash = CreateShardStateHash(TopTransactionContext);
     copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext);
 
-    RecordRelationAccessIfReferenceTable(tableId, PLACEMENT_ACCESS_DML);
+    RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML);
 
     /*
      * For all the primary (e.g., writable) nodes, reserve a shared connection.
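The new second ereport branch covers the case where citus.multi_shard_modify_mode was switched to 'sequential' by hand after a parallel query had already opened connections in the same transaction. A minimal SQL sketch of that scenario, assuming a Citus coordinator with a hypothetical distributed table events and a fresh local table items (neither is part of this diff); the error text is the one introduced above:

BEGIN;
-- a multi-shard query opens parallel connections to the workers
SELECT count(*) FROM events;
-- switching to sequential afterwards cannot undo those connections
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE TABLE items (id int PRIMARY KEY);
-- now rejected with the new, more accurate message:
--   ERROR:  cannot distribute "items" in sequential mode because a parallel
--           query was executed in this transaction
SELECT create_distributed_table('items', 'id');
ROLLBACK;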
diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c
index 6075fe8dc..90d6f24d9 100644
--- a/src/backend/distributed/connection/placement_connection.c
+++ b/src/backend/distributed/connection/placement_connection.c
@@ -466,7 +466,7 @@ AssignPlacementListToConnection(List *placementAccessList, MultiConnection *conn
 
         /* record the relation access */
         Oid relationId = RelationIdForShard(placement->shardId);
-        RecordRelationAccessIfReferenceTable(relationId, accessType);
+        RecordRelationAccessIfNonDistTable(relationId, accessType);
     }
 }
 
diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index 0d08be1b0..c4e59a7ed 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -118,6 +118,7 @@ static void SplitLocalAndRemotePlacements(List *taskPlacementList,
 static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
                                    TupleDestination *tupleDest, Task *task,
                                    ParamListInfo paramListInfo);
+static void RecordNonDistTableAccessesForTask(Task *task);
 static void LogLocalCommand(Task *task);
 static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings,
                                                    TupleDestination *tupleDest,
@@ -549,6 +550,8 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
     int eflags = 0;
     uint64 totalRowsProcessed = 0;
 
+    RecordNonDistTableAccessesForTask(task);
+
     /*
      * Use the tupleStore provided by the scanState because it is shared accross
      * the other task executions and the adaptive executor.
@@ -585,6 +588,59 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
 }
 
 
+/*
+ * RecordNonDistTableAccessesForTask records relation accesses for the non-distributed
+ * relations that the given task will access (if any).
+ */
+static void
+RecordNonDistTableAccessesForTask(Task *task)
+{
+    List *taskPlacementList = task->taskPlacementList;
+    if (list_length(taskPlacementList) == 0)
+    {
+        /*
+         * We need at least one task placement to record relation access.
+         * FIXME: Unfortunately, a task without placements is possible due to
+         * https://github.com/citusdata/citus/issues/4104.
+         * We can safely remove this check when the above bug is fixed.
+         */
+        return;
+    }
+
+    /*
+     * We use only the first placement to find the relation accesses. It is
+     * sufficient as PlacementAccessListForTask iterates the relationShardList
+     * field of the task and generates accesses per relation in the task.
+     * As we are only interested in relations, not the placements, we can
+     * skip the rest of the placements.
+     * Also, here we don't need to iterate the relationShardList field of the
+     * task to mark each accessed relation because PlacementAccessListForTask
+     * already computes and returns the relations that the task accesses.
+     */
+    ShardPlacement *taskPlacement = linitial(taskPlacementList);
+    List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
+
+    ShardPlacementAccess *placementAccess = NULL;
+    foreach_ptr(placementAccess, placementAccessList)
+    {
+        uint64 placementAccessShardId = placementAccess->placement->shardId;
+        if (placementAccessShardId == INVALID_SHARD_ID)
+        {
+            /*
+             * When a SELECT prunes down to 0 shards, we may still pass through
+             * the local executor. In that case, we don't need to record any
+             * relation access as we don't actually access any shard placement.
+             */
+            continue;
+        }
+
+        Oid accessedRelationId = RelationIdForShard(placementAccessShardId);
+        ShardPlacementAccessType shardPlacementAccessType = placementAccess->accessType;
+        RecordRelationAccessIfNonDistTable(accessedRelationId, shardPlacementAccessType);
+    }
+}
+
+
 /*
  * SetLocalExecutionStatus sets the local execution status to
  * the given status, it errors if the transition is not possible from the
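RecordNonDistTableAccessesForTask closes a gap where queries that ran through the local executor never recorded their accesses to reference (and other non-distributed) tables, so later foreign-key restriction checks could not take them into account. A rough SQL illustration of the resulting behaviour, borrowing ref_table, dist_table and the fkey_dist_to_ref constraint from the regression test added further down, and assuming the coordinator holds the shard placements so the statements execute locally:

BEGIN;
-- executed via the local executor on the coordinator; the access to
-- ref_table is now recorded
SELECT count(*) FROM ref_table;
-- dist_table has a foreign key to ref_table, so this multi-shard DDL
-- runs sequentially instead of opening parallel connections
ALTER TABLE dist_table DROP COLUMN c;
SHOW citus.multi_shard_modify_mode;  -- now reports 'sequential'
ROLLBACK;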
diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c
index a3b56ecca..e74068fbc 100644
--- a/src/backend/distributed/transaction/relation_access_tracking.c
+++ b/src/backend/distributed/transaction/relation_access_tracking.c
@@ -153,13 +153,13 @@ AllocateRelationAccessHash(void)
 
 
 /*
- * RecordRelationAccessIfReferenceTable marks the relation accessed if it is a
+ * RecordRelationAccessIfNonDistTable marks the relation accessed if it is a
  * reference relation.
  *
  * The function is a wrapper around RecordRelationAccessBase().
  */
 void
-RecordRelationAccessIfReferenceTable(Oid relationId, ShardPlacementAccessType accessType)
+RecordRelationAccessIfNonDistTable(Oid relationId, ShardPlacementAccessType accessType)
 {
     if (!ShouldRecordRelationAccess())
     {
@@ -492,6 +492,17 @@ RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType placementA
     /* act accordingly if it's a conflicting access */
     CheckConflictingParallelRelationAccesses(relationId, placementAccess);
 
+    /*
+     * CheckConflictingParallelRelationAccesses might switch to sequential
+     * execution. If that's the case, no need to continue because the executor
+     * would take the necessary actions to switch to sequential execution
+     * immediately.
+     */
+    if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
+    {
+        return;
+    }
+
     /* If a relation is partitioned, record accesses to all of its partitions as well. */
     if (PartitionedTable(relationId))
     {
diff --git a/src/include/distributed/relation_access_tracking.h b/src/include/distributed/relation_access_tracking.h
index dc215e8b6..deacdec94 100644
--- a/src/include/distributed/relation_access_tracking.h
+++ b/src/include/distributed/relation_access_tracking.h
@@ -36,8 +36,8 @@ typedef enum RelationAccessMode
 
 extern void AllocateRelationAccessHash(void);
 extern void ResetRelationAccessHash(void);
-extern void RecordRelationAccessIfReferenceTable(Oid relationId,
-                                                 ShardPlacementAccessType accessType);
+extern void RecordRelationAccessIfNonDistTable(Oid relationId,
+                                               ShardPlacementAccessType accessType);
 extern void RecordParallelRelationAccessForTaskList(List *taskList);
 extern void RecordParallelSelectAccess(Oid relationId);
 extern void RecordParallelModifyAccess(Oid relationId);
diff --git a/src/test/regress/expected/foreign_key_restriction_enforcement.out b/src/test/regress/expected/foreign_key_restriction_enforcement.out
index d7b147370..77fa988f8 100644
--- a/src/test/regress/expected/foreign_key_restriction_enforcement.out
+++ b/src/test/regress/expected/foreign_key_restriction_enforcement.out
@@ -1268,9 +1268,8 @@ BEGIN;
     SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
     CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
     SELECT create_distributed_table('test_table_2', 'id');
-ERROR:  cannot distribute relation "test_table_2" in this transaction because it has a foreign key to a reference table
-DETAIL:  If a hash distributed table has a foreign key to a reference table, it has to be created in sequential mode before any parallel commands have been executed in the same transaction
-HINT:  Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
+ERROR:  cannot distribute "test_table_2" in sequential mode because a parallel query was executed in this transaction
+HINT:  If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option.
     CREATE TABLE test_table_1(id int PRIMARY KEY);
 ERROR:  current transaction is aborted, commands ignored until end of transaction block
     SELECT create_reference_table('test_table_1');
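The updated expected output above also carries the new HINT. A sketch of the remedy it points at, reusing the hypothetical events and items tables from the earlier example: when the table has no foreign key to a reference table, leave citus.multi_shard_modify_mode at its default 'parallel' value (or reset it) before distributing:

BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'parallel';
-- the earlier parallel query is no longer a problem
SELECT count(*) FROM events;
CREATE TABLE items (id int PRIMARY KEY);
SELECT create_distributed_table('items', 'id');  -- succeeds
COMMIT;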
diff --git a/src/test/regress/expected/local_shard_utility_command_execution.out b/src/test/regress/expected/local_shard_utility_command_execution.out
index 76f0c291d..810cd02b6 100644
--- a/src/test/regress/expected/local_shard_utility_command_execution.out
+++ b/src/test/regress/expected/local_shard_utility_command_execution.out
@@ -629,7 +629,6 @@ NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1
 (2 rows)
 
 ROLLBACK;
--- Try a bunch of commands and expect failure at SELECT create_distributed_table
 BEGIN;
     -- here this SELECT will enforce the whole block for local execution
     SELECT COUNT(*) FROM ref_table;
@@ -664,8 +663,6 @@ NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1
 NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500124, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
 NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500127, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
 NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500130, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
-    -- as we create table via remote connections, below SELECT create_distributed_table
-    -- would error out
     CREATE TABLE another_dist_table(a int);
     SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
 NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500133, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500133, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres')
@@ -685,6 +682,44 @@ NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1
 (1 row)
 
 COMMIT;
+-- add a foreign key for next test
+ALTER TABLE dist_table ADD CONSTRAINT fkey_dist_to_ref FOREIGN KEY (b) REFERENCES ref_table(a);
+BEGIN;
+    SELECT count(*) FROM ref_table;
+NOTICE:  executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500035 ref_table
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+    -- should show parallel
+    SHOW citus.multi_shard_modify_mode ;
+ citus.multi_shard_modify_mode
+---------------------------------------------------------------------
+ parallel
+(1 row)
+
+    -- wants to do parallel execution but will switch to sequential mode
+    ALTER TABLE dist_table DROP COLUMN c;
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500100, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500103, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500106, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500109, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500112, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500115, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500118, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500121, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500124, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500127, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (1500130, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
+    -- should show sequential
+    SHOW citus.multi_shard_modify_mode;
+ citus.multi_shard_modify_mode
+---------------------------------------------------------------------
+ sequential
+(1 row)
+
+ROLLBACK;
 ---------------------------------------------------------------------
 ------------ partitioned tables -------------
 ---------------------------------------------------------------------
diff --git a/src/test/regress/sql/local_shard_utility_command_execution.sql b/src/test/regress/sql/local_shard_utility_command_execution.sql
index 3f2148b9e..d754c5888 100644
--- a/src/test/regress/sql/local_shard_utility_command_execution.sql
+++ b/src/test/regress/sql/local_shard_utility_command_execution.sql
@@ -278,7 +278,6 @@ BEGIN;
     SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
 ROLLBACK;
 
--- Try a bunch of commands and expect failure at SELECT create_distributed_table
 BEGIN;
     -- here this SELECT will enforce the whole block for local execution
     SELECT COUNT(*) FROM ref_table;
@@ -287,12 +286,26 @@ BEGIN;
     ALTER TABLE dist_table ADD column c int;
     ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;
 
-    -- as we create table via remote connections, below SELECT create_distributed_table
-    -- would error out
     CREATE TABLE another_dist_table(a int);
     SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
 COMMIT;
+-- add a foreign key for next test
+ALTER TABLE dist_table ADD CONSTRAINT fkey_dist_to_ref FOREIGN KEY (b) REFERENCES ref_table(a);
+
+BEGIN;
+    SELECT count(*) FROM ref_table;
+
+    -- should show parallel
+    SHOW citus.multi_shard_modify_mode ;
+
+    -- wants to do parallel execution but will switch to sequential mode
+    ALTER TABLE dist_table DROP COLUMN c;
+
+    -- should show sequential
+    SHOW citus.multi_shard_modify_mode;
+ROLLBACK;
+
 ---------------------------------------------
 ------------ partitioned tables -------------
 ---------------------------------------------
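For contrast, the first branch kept in create_distributed_table.c still fires when the table genuinely has a foreign key to a reference table and a parallel command already ran in the transaction. A sketch of that path, assuming a hypothetical reference table ref (created with create_reference_table beforehand) and the events table from the earlier examples; the error and hint texts are the pre-existing ones quoted in this diff:

BEGIN;
-- any parallel query in the transaction
SELECT count(*) FROM events;
CREATE TABLE items_fk (id int PRIMARY KEY, ref_id int REFERENCES ref(id));
-- still fails with the original message and hint:
--   ERROR:  cannot distribute relation "items_fk" in this transaction because
--           it has a foreign key to a reference table
--   HINT:   Try re-running the transaction with
--           "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
SELECT create_distributed_table('items_fk', 'id');
ROLLBACK;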