mirror of https://github.com/citusdata/citus.git
Some small typos & cleanup
parent
2a9fccc7a0
commit
3c99db40b9
|
@ -37,9 +37,9 @@
|
||||||
* same connection since it may hold relevant locks or have uncommitted
|
* same connection since it may hold relevant locks or have uncommitted
|
||||||
* writes. In that case we "assign" the task to a connection by adding
|
* writes. In that case we "assign" the task to a connection by adding
|
||||||
* it to the task queue of specific connection (in
|
* it to the task queue of specific connection (in
|
||||||
* AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task unassigned
|
* AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task
|
||||||
* and add it to the task queue of a worker pool, which means that it
|
* unassigned and add it to the task queue of a worker pool, which means
|
||||||
* can be executed over any connection in the pool.
|
* that it can be executed over any connection in the pool.
|
||||||
*
|
*
|
||||||
* A task may be executed on multiple placements in case of a reference
|
* A task may be executed on multiple placements in case of a reference
|
||||||
* table or a replicated distributed table. Depending on the type of
|
* table or a replicated distributed table. Depending on the type of
|
||||||
|
@ -772,7 +772,7 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
|
||||||
uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList);
|
uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We're deliberately not setting execution->rowsProceessed here. The main reason
|
* We're deliberately not setting execution->rowsProcessed here. The main reason
|
||||||
* is that modifications to reference tables would end-up setting it both here
|
* is that modifications to reference tables would end-up setting it both here
|
||||||
* and in AdaptiveExecutor. Instead, we set executorState here and skip updating it
|
* and in AdaptiveExecutor. Instead, we set executorState here and skip updating it
|
||||||
* for reference table modifications in AdaptiveExecutor.
|
* for reference table modifications in AdaptiveExecutor.
|
||||||
|
@ -884,7 +884,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
|
||||||
ParamListInfo paramListInfo = NULL;
|
ParamListInfo paramListInfo = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The code-paths that rely on this function do not know how execute
|
* The code-paths that rely on this function do not know how to execute
|
||||||
* commands locally.
|
* commands locally.
|
||||||
*/
|
*/
|
||||||
ErrorIfTransactionAccessedPlacementsLocally();
|
ErrorIfTransactionAccessedPlacementsLocally();
|
||||||
|
|
|
@ -134,7 +134,8 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
|
||||||
|
|
||||||
if (paramListInfo != NULL)
|
if (paramListInfo != NULL)
|
||||||
{
|
{
|
||||||
const char **parameterValues = NULL; /* not used anywhere, so decleare here */
|
/* not used anywhere, so declare here */
|
||||||
|
const char **parameterValues = NULL;
|
||||||
|
|
||||||
ExtractParametersForLocalExecution(paramListInfo, ¶meterTypes,
|
ExtractParametersForLocalExecution(paramListInfo, ¶meterTypes,
|
||||||
¶meterValues);
|
¶meterValues);
|
||||||
|
@ -228,6 +229,25 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LogLocalCommand logs commands executed locally on this node. Although we're
|
||||||
|
* talking about local execution, the function relies on citus.log_remote_commands
|
||||||
|
* GUC. This makes sense because the local execution is still on a shard of a
|
||||||
|
* distributed table, meaning it is part of distributed execution.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
LogLocalCommand(Task *task)
|
||||||
|
{
|
||||||
|
if (!(LogRemoteCommands || LogLocalCommands))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ereport(NOTICE, (errmsg("executing the command locally: %s",
|
||||||
|
ApplyLogRedaction(TaskQueryString(task)))));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExtractLocalAndRemoteTasks gets a taskList and generates two
|
* ExtractLocalAndRemoteTasks gets a taskList and generates two
|
||||||
* task lists namely localTaskList and remoteTaskList. The function goes
|
* task lists namely localTaskList and remoteTaskList. The function goes
|
||||||
|
@ -236,9 +256,9 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
|
||||||
* the lists could be NIL depending on the input taskList.
|
* the lists could be NIL depending on the input taskList.
|
||||||
*
|
*
|
||||||
* One slightly different case is modifications to replicated tables
|
* One slightly different case is modifications to replicated tables
|
||||||
* (e.g., reference tables) where a single task ends in two seperate tasks
|
* (e.g., reference tables) where a single task ends in two separate tasks
|
||||||
* and the local task is added to localTaskList and the remanings to the
|
* and the local task is added to localTaskList and the remaning ones to
|
||||||
* remoteTaskList.
|
* the remoteTaskList.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
||||||
|
@ -280,8 +300,6 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Task *remoteTask = NULL;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* At this point, we're dealing with reference tables or intermediate results
|
* At this point, we're dealing with reference tables or intermediate results
|
||||||
* where the task has placements on both local and remote nodes. We always
|
* where the task has placements on both local and remote nodes. We always
|
||||||
|
@ -301,7 +319,7 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
remoteTask = copyObject(task);
|
Task *remoteTask = copyObject(task);
|
||||||
remoteTask->taskPlacementList = remoteTaskPlacementList;
|
remoteTask->taskPlacementList = remoteTaskPlacementList;
|
||||||
|
|
||||||
*remoteTaskList = lappend(*remoteTaskList, remoteTask);
|
*remoteTaskList = lappend(*remoteTaskList, remoteTask);
|
||||||
|
@ -353,7 +371,7 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que
|
||||||
{
|
{
|
||||||
EState *executorState = ScanStateGetExecutorState(scanState);
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||||
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
||||||
DestReceiver *tupleStoreDestReceiever = CreateDestReceiver(DestTuplestore);
|
DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore);
|
||||||
ScanDirection scanDirection = ForwardScanDirection;
|
ScanDirection scanDirection = ForwardScanDirection;
|
||||||
QueryEnvironment *queryEnv = create_queryEnv();
|
QueryEnvironment *queryEnv = create_queryEnv();
|
||||||
int eflags = 0;
|
int eflags = 0;
|
||||||
|
@ -363,14 +381,14 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que
|
||||||
* Use the tupleStore provided by the scanState because it is shared accross
|
* Use the tupleStore provided by the scanState because it is shared accross
|
||||||
* the other task executions and the adaptive executor.
|
* the other task executions and the adaptive executor.
|
||||||
*/
|
*/
|
||||||
SetTuplestoreDestReceiverParams(tupleStoreDestReceiever,
|
SetTuplestoreDestReceiverParams(tupleStoreDestReceiver,
|
||||||
scanState->tuplestorestate,
|
scanState->tuplestorestate,
|
||||||
CurrentMemoryContext, false);
|
CurrentMemoryContext, false);
|
||||||
|
|
||||||
/* Create a QueryDesc for the query */
|
/* Create a QueryDesc for the query */
|
||||||
QueryDesc *queryDesc = CreateQueryDesc(taskPlan, queryString,
|
QueryDesc *queryDesc = CreateQueryDesc(taskPlan, queryString,
|
||||||
GetActiveSnapshot(), InvalidSnapshot,
|
GetActiveSnapshot(), InvalidSnapshot,
|
||||||
tupleStoreDestReceiever, paramListInfo,
|
tupleStoreDestReceiver, paramListInfo,
|
||||||
queryEnv, 0);
|
queryEnv, 0);
|
||||||
|
|
||||||
ExecutorStart(queryDesc, eflags);
|
ExecutorStart(queryDesc, eflags);
|
||||||
|
@ -430,6 +448,7 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
}
|
}
|
||||||
|
|
||||||
bool singleTask = (list_length(taskList) == 1);
|
bool singleTask = (list_length(taskList) == 1);
|
||||||
|
|
||||||
if (singleTask && TaskAccessesLocalNode((Task *) linitial(taskList)))
|
if (singleTask && TaskAccessesLocalNode((Task *) linitial(taskList)))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
@ -455,7 +474,7 @@ ShouldExecuteTasksLocally(List *taskList)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* For multi-task executions, switching to local execution would likely to
|
* For multi-task executions, switching to local execution would likely to
|
||||||
* perform poorly, because we'd lose the parallelizm. Note that the local
|
* perform poorly, because we'd lose the parallelism. Note that the local
|
||||||
* execution is happening one task at a time (e.g., similar to sequential
|
* execution is happening one task at a time (e.g., similar to sequential
|
||||||
* distributed execution).
|
* distributed execution).
|
||||||
*/
|
*/
|
||||||
|
@ -515,25 +534,6 @@ ErrorIfTransactionAccessedPlacementsLocally(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* LogLocalCommand logs commands executed locally on this node. Although we're
|
|
||||||
* talking about local execution, the function relies on citus.log_remote_commands GUC.
|
|
||||||
* This makes sense because the local execution is still on a shard of a distributed table,
|
|
||||||
* meaning it is part of distributed execution.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
LogLocalCommand(Task *task)
|
|
||||||
{
|
|
||||||
if (!(LogRemoteCommands || LogLocalCommands))
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ereport(NOTICE, (errmsg("executing the command locally: %s",
|
|
||||||
ApplyLogRedaction(TaskQueryString(task)))));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DisableLocalExecution simply a C interface for
|
* DisableLocalExecution simply a C interface for
|
||||||
* setting the following:
|
* setting the following:
|
||||||
|
|
|
@ -27,7 +27,6 @@
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
|
|
||||||
static List * TruncateTaskList(Oid relationId);
|
static List * TruncateTaskList(Oid relationId);
|
||||||
|
|
||||||
|
|
||||||
|
@ -89,29 +88,36 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
||||||
static List *
|
static List *
|
||||||
TruncateTaskList(Oid relationId)
|
TruncateTaskList(Oid relationId)
|
||||||
{
|
{
|
||||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
/* resulting task list */
|
||||||
ListCell *shardIntervalCell = NULL;
|
|
||||||
List *taskList = NIL;
|
List *taskList = NIL;
|
||||||
|
|
||||||
|
/* enumerate the tasks when putting them to the taskList */
|
||||||
int taskId = 1;
|
int taskId = 1;
|
||||||
|
|
||||||
Oid schemaId = get_rel_namespace(relationId);
|
Oid schemaId = get_rel_namespace(relationId);
|
||||||
char *schemaName = get_namespace_name(schemaId);
|
char *schemaName = get_namespace_name(schemaId);
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
||||||
|
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||||
|
|
||||||
/* lock metadata before getting placement lists */
|
/* lock metadata before getting placement lists */
|
||||||
LockShardListMetadata(shardIntervalList, ShareLock);
|
LockShardListMetadata(shardIntervalList, ShareLock);
|
||||||
|
|
||||||
|
ListCell *shardIntervalCell = NULL;
|
||||||
|
|
||||||
foreach(shardIntervalCell, shardIntervalList)
|
foreach(shardIntervalCell, shardIntervalList)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||||
uint64 shardId = shardInterval->shardId;
|
uint64 shardId = shardInterval->shardId;
|
||||||
|
char *shardRelationName = pstrdup(relationName);
|
||||||
|
|
||||||
|
/* build shard relation name */
|
||||||
|
AppendShardIdToName(&shardRelationName, shardId);
|
||||||
|
|
||||||
|
char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
|
||||||
|
|
||||||
StringInfo shardQueryString = makeStringInfo();
|
StringInfo shardQueryString = makeStringInfo();
|
||||||
char *shardName = pstrdup(relationName);
|
appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE", quotedShardName);
|
||||||
|
|
||||||
AppendShardIdToName(&shardName, shardId);
|
|
||||||
|
|
||||||
appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE",
|
|
||||||
quote_qualified_identifier(schemaName, shardName));
|
|
||||||
|
|
||||||
Task *task = CitusMakeNode(Task);
|
Task *task = CitusMakeNode(Task);
|
||||||
task->jobId = INVALID_JOB_ID;
|
task->jobId = INVALID_JOB_ID;
|
||||||
|
|
|
@ -20,16 +20,14 @@ extern bool LogLocalCommands;
|
||||||
extern bool TransactionAccessedLocalPlacement;
|
extern bool TransactionAccessedLocalPlacement;
|
||||||
extern bool TransactionConnectedToLocalGroup;
|
extern bool TransactionConnectedToLocalGroup;
|
||||||
|
|
||||||
|
/* extern function declarations */
|
||||||
extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
|
extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
|
||||||
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
|
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
|
||||||
List **localTaskList, List **remoteTaskList);
|
List **localTaskList, List **remoteTaskList);
|
||||||
extern bool ShouldExecuteTasksLocally(List *taskList);
|
extern bool ShouldExecuteTasksLocally(List *taskList);
|
||||||
|
extern bool AnyTaskAccessesLocalNode(List *taskList);
|
||||||
|
extern bool TaskAccessesLocalNode(Task *task);
|
||||||
extern void ErrorIfTransactionAccessedPlacementsLocally(void);
|
extern void ErrorIfTransactionAccessedPlacementsLocally(void);
|
||||||
extern void SetTaskQueryAndPlacementList(Task *task, Query *query, List *placementList);
|
|
||||||
extern char * TaskQueryString(Task *task);
|
|
||||||
extern bool TaskAccessesLocalNode(Task *task);
|
|
||||||
extern void DisableLocalExecution(void);
|
extern void DisableLocalExecution(void);
|
||||||
extern bool AnyTaskAccessesRemoteNode(List *taskList);
|
|
||||||
extern bool TaskAccessesLocalNode(Task *task);
|
|
||||||
|
|
||||||
#endif /* LOCAL_EXECUTION_H */
|
#endif /* LOCAL_EXECUTION_H */
|
||||||
|
|
Loading…
Reference in New Issue