Merge pull request #2688 from citusdata/unify_fkey_to_ref_recording

Refactor Parallel Relation Access Recording
pull/2696/head
Önder Kalacı 2019-05-02 17:19:06 +02:00 committed by GitHub
commit febe412108
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 71 additions and 37 deletions

View File

@ -109,18 +109,14 @@ MultiRealTimeExecute(Job *job)
BeginOrContinueCoordinatedTransaction();
}
RecordParallelRelationAccessForTaskList(taskList);
/* initialize task execution structures for remote execution */
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
TaskExecution *taskExecution = NULL;
/* keep track of multi shard accesses before opening the connections */
if (MultiShardConnectionType == PARALLEL_CONNECTION)
{
RecordRelationParallelSelectAccessForTask(task);
}
taskExecution = InitTaskExecution(task, EXEC_TASK_CONNECT_START);
taskExecutionList = lappend(taskExecutionList, taskExecution);
}

View File

@ -1500,31 +1500,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
CoordinatedTransactionUse2PC();
}
/*
* With a similar rationale as above, where we expect all tasks to operate on
* the same relations, we prefer to record relation accesses for the first
* task only.
*/
if (firstTask->taskType == MODIFY_TASK)
{
RecordRelationParallelModifyAccessForTask(firstTask);
/*
* We prefer to mark with SELECT access as well because for multi shard
* modification queries, the placement access list is always marked with both
* DML and SELECT accesses.
*/
RecordRelationParallelSelectAccessForTask(firstTask);
}
else if (firstTask->taskType == DDL_TASK &&
PartitionMethod(firstShardInterval->relationId) != DISTRIBUTE_BY_NONE)
{
/*
* Even single task DDLs hit here, so we'd prefer
* not to record for reference tables.
*/
RecordRelationParallelDDLAccessForTask(firstTask);
}
RecordParallelRelationAccessForTaskList(taskList);
if (firstTask->taskType == DDL_TASK || firstTask->taskType == VACUUM_ANALYZE_TASK)
{

View File

@ -85,6 +85,9 @@ static HTAB *RelationAccessHash;
static void RecordRelationAccess(Oid relationId, ShardPlacementAccessType accessType);
static void RecordPlacementAccessToCache(Oid relationId,
ShardPlacementAccessType accessType);
static void RecordRelationParallelSelectAccessForTask(Task *task);
static void RecordRelationParallelModifyAccessForTask(Task *task);
static void RecordRelationParallelDDLAccessForTask(Task *task);
static RelationAccessMode GetRelationAccessMode(Oid relationId,
ShardPlacementAccessType accessType);
static void RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType
@ -281,11 +284,72 @@ RecordPlacementAccessToCache(Oid relationId, ShardPlacementAccessType accessType
}
/*
* RecordParallelRelationAccessForTaskList gets a task list and records
* the necessary parallel relation accesses for the task list.
*
* This function is used to enforce foreign keys from distributed
* tables to reference tables.
*/
void
RecordParallelRelationAccessForTaskList(List *taskList)
{
Task *firstTask = NULL;
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
/* sequential mode prevents parallel access */
return;
}
if (list_length(taskList) < 2)
{
/* single shard task doesn't mean parallel access in our definition */
return;
}
/*
* Since all the tasks in a task list is expected to operate on the same
* distributed table(s), we only need to process the first task.
*/
firstTask = linitial(taskList);
if (firstTask->taskType == SQL_TASK)
{
RecordRelationParallelSelectAccessForTask(firstTask);
}
else if (firstTask->taskType == MODIFY_TASK)
{
if (firstTask->rowValuesLists != NIL)
{
/*
* We always run multi-row INSERTs in a sequential
* mode (hard-coded). Thus, we do not mark as parallel
* access even if the prerequisites hold.
*/
}
else
{
/*
* We prefer to mark with all remaining multi-shard modifications
* with both modify and select accesses.
*/
RecordRelationParallelModifyAccessForTask(firstTask);
RecordRelationParallelSelectAccessForTask(firstTask);
}
}
else
{
RecordRelationParallelDDLAccessForTask(firstTask);
}
}
/*
* RecordRelationParallelSelectAccessForTask goes over all the relations
* in the relationShardList and records the select access per each table.
*/
void
static void
RecordRelationParallelSelectAccessForTask(Task *task)
{
List *relationShardList = NIL;
@ -327,7 +391,7 @@ RecordRelationParallelSelectAccessForTask(Task *task)
* where as the subqueries inside the modify query is recorded with select
* access.
*/
void
static void
RecordRelationParallelModifyAccessForTask(Task *task)
{
List *relationShardList = NULL;
@ -374,7 +438,7 @@ RecordRelationParallelModifyAccessForTask(Task *task)
* DDL commands such as foreign key creation. The function also records
* the relation that anchorShardId belongs to.
*/
void
static void
RecordRelationParallelDDLAccessForTask(Task *task)
{
List *relationShardList = task->relationShardList;

View File

@ -31,12 +31,10 @@ extern void AllocateRelationAccessHash(void);
extern void ResetRelationAccessHash(void);
extern void AssociatePlacementAccessWithRelation(ShardPlacement *placement,
ShardPlacementAccessType accessType);
extern void RecordParallelRelationAccessForTaskList(List *taskList);
extern void RecordParallelSelectAccess(Oid relationId);
extern void RecordRelationParallelSelectAccessForTask(Task *task);
extern void RecordRelationParallelModifyAccessForTask(Task *task);
extern void RecordParallelModifyAccess(Oid relationId);
extern void RecordParallelDDLAccess(Oid relationId);
extern void RecordRelationParallelDDLAccessForTask(Task *task);
extern RelationAccessMode GetRelationDDLAccessMode(Oid relationId);
extern RelationAccessMode GetRelationDMLAccessMode(Oid relationId);
extern RelationAccessMode GetRelationSelectAccessMode(Oid relationId);