mirror of https://github.com/citusdata/citus.git
Fixes typo and renames multi_process_utility (#7259)
parent
5eaf6c221e
commit
71a4633dad
|
@ -938,7 +938,7 @@ CreateIndexTaskList(IndexStmt *indexStmt)
|
||||||
task->dependentTaskList = NULL;
|
task->dependentTaskList = NULL;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
||||||
task->cannotBeExecutedInTransction = indexStmt->concurrent;
|
task->cannotBeExecutedInTransaction = indexStmt->concurrent;
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
taskList = lappend(taskList, task);
|
||||||
|
|
||||||
|
@ -983,7 +983,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt)
|
||||||
task->dependentTaskList = NULL;
|
task->dependentTaskList = NULL;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
||||||
task->cannotBeExecutedInTransction =
|
task->cannotBeExecutedInTransaction =
|
||||||
IsReindexWithParam_compat(reindexStmt, "concurrently");
|
IsReindexWithParam_compat(reindexStmt, "concurrently");
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
taskList = lappend(taskList, task);
|
||||||
|
@ -1309,7 +1309,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
|
||||||
task->dependentTaskList = NULL;
|
task->dependentTaskList = NULL;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
||||||
task->cannotBeExecutedInTransction = dropStmt->concurrent;
|
task->cannotBeExecutedInTransaction = dropStmt->concurrent;
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
taskList = lappend(taskList, task);
|
||||||
|
|
||||||
|
|
|
@ -95,13 +95,13 @@ int UtilityHookLevel = 0;
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations for helper functions */
|
/* Local functions forward declarations for helper functions */
|
||||||
static void ProcessUtilityInternal(PlannedStmt *pstmt,
|
static void citus_ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
const char *queryString,
|
const char *queryString,
|
||||||
ProcessUtilityContext context,
|
ProcessUtilityContext context,
|
||||||
ParamListInfo params,
|
ParamListInfo params,
|
||||||
struct QueryEnvironment *queryEnv,
|
struct QueryEnvironment *queryEnv,
|
||||||
DestReceiver *dest,
|
DestReceiver *dest,
|
||||||
QueryCompletion *completionTag);
|
QueryCompletion *completionTag);
|
||||||
static void set_indexsafe_procflags(void);
|
static void set_indexsafe_procflags(void);
|
||||||
static char * CurrentSearchPath(void);
|
static char * CurrentSearchPath(void);
|
||||||
static void IncrementUtilityHookCountersIfNecessary(Node *parsetree);
|
static void IncrementUtilityHookCountersIfNecessary(Node *parsetree);
|
||||||
|
@ -130,7 +130,7 @@ ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityConte
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* multi_ProcessUtility is the main entry hook for implementing Citus-specific
|
* citus_ProcessUtility is the main entry hook for implementing Citus-specific
|
||||||
* utility behavior. Its primary responsibilities are intercepting COPY and DDL
|
* utility behavior. Its primary responsibilities are intercepting COPY and DDL
|
||||||
* commands and augmenting the coordinator's command with corresponding tasks
|
* commands and augmenting the coordinator's command with corresponding tasks
|
||||||
* to be run on worker nodes, after suitably ensuring said commands' options
|
* to be run on worker nodes, after suitably ensuring said commands' options
|
||||||
|
@ -139,7 +139,7 @@ ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityConte
|
||||||
* TRUNCATE and VACUUM are also supported.
|
* TRUNCATE and VACUUM are also supported.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
multi_ProcessUtility(PlannedStmt *pstmt,
|
citus_ProcessUtility(PlannedStmt *pstmt,
|
||||||
const char *queryString,
|
const char *queryString,
|
||||||
bool readOnlyTree,
|
bool readOnlyTree,
|
||||||
ProcessUtilityContext context,
|
ProcessUtilityContext context,
|
||||||
|
@ -329,8 +329,8 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
|
|
||||||
PG_TRY();
|
PG_TRY();
|
||||||
{
|
{
|
||||||
ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest,
|
citus_ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest,
|
||||||
completionTag);
|
completionTag);
|
||||||
|
|
||||||
if (UtilityHookLevel == 1)
|
if (UtilityHookLevel == 1)
|
||||||
{
|
{
|
||||||
|
@ -404,7 +404,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ProcessUtilityInternal is a helper function for multi_ProcessUtility where majority
|
* citus_ProcessUtilityInternal is a helper function for citus_ProcessUtility where majority
|
||||||
* of the Citus specific utility statements are handled here. The distinction between
|
* of the Citus specific utility statements are handled here. The distinction between
|
||||||
* both functions is that Citus_ProcessUtility does not handle CALL and DO statements.
|
* both functions is that Citus_ProcessUtility does not handle CALL and DO statements.
|
||||||
* The reason for the distinction is implemented to be able to find the "top-level" DDL
|
* The reason for the distinction is implemented to be able to find the "top-level" DDL
|
||||||
|
@ -412,13 +412,13 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
* this goal.
|
* this goal.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ProcessUtilityInternal(PlannedStmt *pstmt,
|
citus_ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
const char *queryString,
|
const char *queryString,
|
||||||
ProcessUtilityContext context,
|
ProcessUtilityContext context,
|
||||||
ParamListInfo params,
|
ParamListInfo params,
|
||||||
struct QueryEnvironment *queryEnv,
|
struct QueryEnvironment *queryEnv,
|
||||||
DestReceiver *dest,
|
DestReceiver *dest,
|
||||||
QueryCompletion *completionTag)
|
QueryCompletion *completionTag)
|
||||||
{
|
{
|
||||||
Node *parsetree = pstmt->utilityStmt;
|
Node *parsetree = pstmt->utilityStmt;
|
||||||
List *ddlJobs = NIL;
|
List *ddlJobs = NIL;
|
||||||
|
@ -1386,7 +1386,7 @@ PostStandardProcessUtility(Node *parsetree)
|
||||||
* on the local table first. However, in order to decide whether the
|
* on the local table first. However, in order to decide whether the
|
||||||
* command leads to an invalidation, we need to check before the command
|
* command leads to an invalidation, we need to check before the command
|
||||||
* is being executed since we read pg_constraint table. Thus, we maintain a
|
* is being executed since we read pg_constraint table. Thus, we maintain a
|
||||||
* local flag and do the invalidation after multi_ProcessUtility,
|
* local flag and do the invalidation after citus_ProcessUtility,
|
||||||
* before ExecuteDistributedDDLJob().
|
* before ExecuteDistributedDDLJob().
|
||||||
*/
|
*/
|
||||||
InvalidateForeignKeyGraphForDDL();
|
InvalidateForeignKeyGraphForDDL();
|
||||||
|
|
|
@ -279,7 +279,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
|
||||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
||||||
task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
|
task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM);
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
taskList = lappend(taskList, task);
|
||||||
}
|
}
|
||||||
|
@ -719,7 +719,7 @@ ExecuteUnqualifiedVacuumTasks(VacuumStmt *vacuumStmt, CitusVacuumParams vacuumPa
|
||||||
SetTaskQueryStringList(task, unqualifiedVacuumCommands);
|
SetTaskQueryStringList(task, unqualifiedVacuumCommands);
|
||||||
task->dependentTaskList = NULL;
|
task->dependentTaskList = NULL;
|
||||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||||
task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
|
task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM);
|
||||||
|
|
||||||
|
|
||||||
bool hasPeerWorker = false;
|
bool hasPeerWorker = false;
|
||||||
|
|
|
@ -61,7 +61,7 @@ TaskListRequiresRollback(List *taskList)
|
||||||
}
|
}
|
||||||
|
|
||||||
Task *task = (Task *) linitial(taskList);
|
Task *task = (Task *) linitial(taskList);
|
||||||
if (task->cannotBeExecutedInTransction)
|
if (task->cannotBeExecutedInTransaction)
|
||||||
{
|
{
|
||||||
/* vacuum, create index concurrently etc. */
|
/* vacuum, create index concurrently etc. */
|
||||||
return false;
|
return false;
|
||||||
|
@ -164,7 +164,7 @@ TaskListCannotBeExecutedInTransaction(List *taskList)
|
||||||
Task *task = NULL;
|
Task *task = NULL;
|
||||||
foreach_ptr(task, taskList)
|
foreach_ptr(task, taskList)
|
||||||
{
|
{
|
||||||
if (task->cannotBeExecutedInTransction)
|
if (task->cannotBeExecutedInTransaction)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -543,7 +543,7 @@ _PG_init(void)
|
||||||
*/
|
*/
|
||||||
PrevProcessUtility = (ProcessUtility_hook != NULL) ?
|
PrevProcessUtility = (ProcessUtility_hook != NULL) ?
|
||||||
ProcessUtility_hook : standard_ProcessUtility;
|
ProcessUtility_hook : standard_ProcessUtility;
|
||||||
ProcessUtility_hook = multi_ProcessUtility;
|
ProcessUtility_hook = citus_ProcessUtility;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Acquire symbols for columnar functions that citus calls.
|
* Acquire symbols for columnar functions that citus calls.
|
||||||
|
|
|
@ -326,7 +326,7 @@ CopyNodeTask(COPYFUNC_ARGS)
|
||||||
COPY_STRING_FIELD(fetchedExplainAnalyzePlan);
|
COPY_STRING_FIELD(fetchedExplainAnalyzePlan);
|
||||||
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
|
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
|
||||||
COPY_SCALAR_FIELD(isLocalTableModification);
|
COPY_SCALAR_FIELD(isLocalTableModification);
|
||||||
COPY_SCALAR_FIELD(cannotBeExecutedInTransction);
|
COPY_SCALAR_FIELD(cannotBeExecutedInTransaction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -535,7 +535,7 @@ OutTask(OUTFUNC_ARGS)
|
||||||
WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
|
WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
|
||||||
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
|
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
|
||||||
WRITE_BOOL_FIELD(isLocalTableModification);
|
WRITE_BOOL_FIELD(isLocalTableModification);
|
||||||
WRITE_BOOL_FIELD(cannotBeExecutedInTransction);
|
WRITE_BOOL_FIELD(cannotBeExecutedInTransaction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,7 @@ typedef struct DDLJob
|
||||||
|
|
||||||
extern ProcessUtility_hook_type PrevProcessUtility;
|
extern ProcessUtility_hook_type PrevProcessUtility;
|
||||||
|
|
||||||
extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
|
extern void citus_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
|
||||||
bool readOnlyTree,
|
bool readOnlyTree,
|
||||||
ProcessUtilityContext context, ParamListInfo params,
|
ProcessUtilityContext context, ParamListInfo params,
|
||||||
struct QueryEnvironment *queryEnv, DestReceiver *dest,
|
struct QueryEnvironment *queryEnv, DestReceiver *dest,
|
||||||
|
|
|
@ -329,7 +329,7 @@ typedef struct Task
|
||||||
/*
|
/*
|
||||||
* Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
|
* Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
|
||||||
*/
|
*/
|
||||||
bool cannotBeExecutedInTransction;
|
bool cannotBeExecutedInTransaction;
|
||||||
|
|
||||||
Const *partitionKeyValue;
|
Const *partitionKeyValue;
|
||||||
int colocationId;
|
int colocationId;
|
||||||
|
|
Loading…
Reference in New Issue