diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 73f10cea7..5e4ad1fbc 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -418,11 +418,15 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo command); } + if (executeSequentially) + { + SetLocalMultiShardModifyModeToSequential(); + } + ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = leftRelationId; ddlJob->concurrentIndexCmd = false; ddlJob->commandString = alterTableCommand; - ddlJob->executeSequentially = executeSequentially; if (rightRelationId) { diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 5806dc647..54f6de839 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -553,7 +553,7 @@ static void ExecuteDistributedDDLJob(DDLJob *ddlJob) { bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId); - int targetPoolSize = ddlJob->executeSequentially ? 1 : MaxAdaptiveExecutorPoolSize; + int targetPoolSize = MaxAdaptiveExecutorPoolSize; EnsureCoordinator(); EnsurePartitionTableNotReplicated(ddlJob->targetRelationId); @@ -579,8 +579,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } /* use adaptive executor when enabled */ - ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, targetPoolSize, - ddlJob->executeSequentially); + ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, targetPoolSize); } else { @@ -592,8 +591,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) PG_TRY(); { /* use adaptive executor when enabled */ - ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, targetPoolSize, - ddlJob->executeSequentially); + ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, targetPoolSize); if (shouldSyncMetadata) { diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index 8f69f769e..ee9926cbf 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -94,7 +94,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) taskList = VacuumTaskList(relationId, vacuumStmt->options, vacuumColumnList); /* use adaptive executor when enabled */ - ExecuteUtilityTaskListWithoutResults(taskList, targetPoolSize, false); + ExecuteUtilityTaskListWithoutResults(taskList, targetPoolSize); executedVacuumCount++; } relationIndex++; diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index e29ab8c7b..a55deeada 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -657,8 +657,7 @@ AdaptiveExecutor(CustomScanState *node) * through router executor. */ void -ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, - bool forceSequentialExecution) +ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize) { if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE) { @@ -666,8 +665,7 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, } else { - if (MultiShardConnectionType == SEQUENTIAL_CONNECTION || - forceSequentialExecution) + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { ExecuteModifyTasksSequentiallyWithoutResults(taskList, CMD_UTILITY); } diff --git a/src/backend/distributed/master/master_truncate.c b/src/backend/distributed/master/master_truncate.c index dff775e0a..ed91ad747 100644 --- a/src/backend/distributed/master/master_truncate.c +++ b/src/backend/distributed/master/master_truncate.c @@ -77,8 +77,7 @@ citus_truncate_trigger(PG_FUNCTION_ARGS) { List *taskList = TruncateTaskList(relationId); - ExecuteUtilityTaskListWithoutResults(taskList, MaxAdaptiveExecutorPoolSize, - false); + ExecuteUtilityTaskListWithoutResults(taskList, MaxAdaptiveExecutorPoolSize); } PG_RETURN_DATUM(PointerGetDatum(NULL)); diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 64792f3a1..113e82eb0 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -37,7 +37,6 @@ typedef struct DDLJob { Oid targetRelationId; /* oid of the target distributed relation */ bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? */ - bool executeSequentially; const char *commandString; /* initial (coordinator) DDL command string */ List *taskList; /* worker DDL tasks to execute */ } DDLJob; diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 9d2358a5c..83789b15d 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -42,9 +42,10 @@ extern uint64 ExecuteTaskListExtended(CmdType operation, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize); -extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, - bool forceSequentialExecution); -extern uint64 ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize); + +extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize); +extern uint64 ExecuteTaskList(CmdType operation, List *taskList, int + targetPoolSize); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); diff --git a/src/test/regress/expected/foreign_key_restriction_enforcement.out b/src/test/regress/expected/foreign_key_restriction_enforcement.out index fce35abd4..a69248877 100644 --- a/src/test/regress/expected/foreign_key_restriction_enforcement.out +++ b/src/test/regress/expected/foreign_key_restriction_enforcement.out @@ -1061,8 +1061,6 @@ ADD CONSTRAINT fkey_delete FOREIGN KEY(value_1) REFERENCES reference_table(id) ON DELETE CASCADE; -DEBUG: switching to sequential query execution mode -DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator diff --git a/src/test/regress/expected/foreign_key_restriction_enforcement_0.out b/src/test/regress/expected/foreign_key_restriction_enforcement_0.out index e08a5dec4..7a7cef976 100644 --- a/src/test/regress/expected/foreign_key_restriction_enforcement_0.out +++ b/src/test/regress/expected/foreign_key_restriction_enforcement_0.out @@ -1061,8 +1061,6 @@ ADD CONSTRAINT fkey_delete FOREIGN KEY(value_1) REFERENCES reference_table(id) ON DELETE CASCADE; -DEBUG: switching to sequential query execution mode -DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i; DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Collecting INSERT ... SELECT results on coordinator