Refactor switching to sequential mode

We don't need to wait until execution time. As soon as we realize
that sequential execution is required, we should switch to it.
pull/2866/head
Onder Kalaci 2019-08-05 18:12:59 +02:00
parent 263faffb27
commit b2e01d0745
9 changed files with 13 additions and 20 deletions

View File

@ -418,11 +418,15 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
command);
}
if (executeSequentially)
{
SetLocalMultiShardModifyModeToSequential();
}
ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = leftRelationId;
ddlJob->concurrentIndexCmd = false;
ddlJob->commandString = alterTableCommand;
ddlJob->executeSequentially = executeSequentially;
if (rightRelationId)
{

View File

@ -553,7 +553,7 @@ static void
ExecuteDistributedDDLJob(DDLJob *ddlJob)
{
bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
int targetPoolSize = ddlJob->executeSequentially ? 1 : MaxAdaptiveExecutorPoolSize;
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
EnsureCoordinator();
EnsurePartitionTableNotReplicated(ddlJob->targetRelationId);
@ -579,8 +579,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
}
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, targetPoolSize,
ddlJob->executeSequentially);
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, targetPoolSize);
}
else
{
@ -592,8 +591,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
PG_TRY();
{
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, targetPoolSize,
ddlJob->executeSequentially);
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, targetPoolSize);
if (shouldSyncMetadata)
{

View File

@ -94,7 +94,7 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
taskList = VacuumTaskList(relationId, vacuumStmt->options, vacuumColumnList);
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(taskList, targetPoolSize, false);
ExecuteUtilityTaskListWithoutResults(taskList, targetPoolSize);
executedVacuumCount++;
}
relationIndex++;

View File

@ -657,8 +657,7 @@ AdaptiveExecutor(CustomScanState *node)
* through router executor.
*/
void
ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize,
bool forceSequentialExecution)
ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize)
{
if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE)
{
@ -666,8 +665,7 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize,
}
else
{
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION ||
forceSequentialExecution)
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
ExecuteModifyTasksSequentiallyWithoutResults(taskList, ROW_MODIFY_NONE);
}

View File

@ -77,8 +77,7 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
{
List *taskList = TruncateTaskList(relationId);
ExecuteUtilityTaskListWithoutResults(taskList, MaxAdaptiveExecutorPoolSize,
false);
ExecuteUtilityTaskListWithoutResults(taskList, MaxAdaptiveExecutorPoolSize);
}
PG_RETURN_DATUM(PointerGetDatum(NULL));

View File

@ -37,7 +37,6 @@ typedef struct DDLJob
{
Oid targetRelationId; /* oid of the target distributed relation */
bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? */
bool executeSequentially;
const char *commandString; /* initial (coordinator) DDL command string */
List *taskList; /* worker DDL tasks to execute */
} DDLJob;

View File

@ -42,8 +42,7 @@ extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize,
bool forceSequentialExecution);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize);
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int
targetPoolSize);
extern TupleTableSlot * CitusExecScan(CustomScanState *node);

View File

@ -1064,8 +1064,6 @@ ADD CONSTRAINT
fkey_delete FOREIGN KEY(value_1)
REFERENCES
reference_table(id) ON DELETE CASCADE;
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator

View File

@ -1064,8 +1064,6 @@ ADD CONSTRAINT
fkey_delete FOREIGN KEY(value_1)
REFERENCES
reference_table(id) ON DELETE CASCADE;
DEBUG: switching to sequential query execution mode
DETAIL: Reference relation "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed relations due to foreign keys. Any parallel modification to those hash distributed relations in the same transaction can only be executed in sequential query execution mode
INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator