mirror of https://github.com/citusdata/citus.git
Fixes review comments
parent
480c22acc2
commit
d0ef13dc89
|
@ -18,7 +18,7 @@ generated_downgrade_sql_files += $(patsubst %,$(citus_abs_srcdir)/build/sql/%,$(
|
||||||
DATA_built = $(generated_sql_files)
|
DATA_built = $(generated_sql_files)
|
||||||
|
|
||||||
# directories with source files
|
# directories with source files
|
||||||
SUBDIRS = . commands connection database ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
|
SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib shardsplit test transaction utils worker clock
|
||||||
# enterprise modules
|
# enterprise modules
|
||||||
SUBDIRS += replication
|
SUBDIRS += replication
|
||||||
|
|
||||||
|
|
|
@ -36,24 +36,6 @@
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/adaptive_executor.h"
|
#include "distributed/adaptive_executor.h"
|
||||||
|
|
||||||
/* macros to add DefElems to a list */
|
|
||||||
#define DEFELEM_ADD_STRING(options, key, value) \
|
|
||||||
{ \
|
|
||||||
DefElem *elem = makeDefElem(key, (Node *) makeString(value), -1); \
|
|
||||||
options = lappend(options, elem); \
|
|
||||||
}
|
|
||||||
|
|
||||||
#define DEFELEM_ADD_BOOL(options, key, value) \
|
|
||||||
{ \
|
|
||||||
DefElem *elem = makeDefElem(key, (Node *) makeBoolean(value), -1); \
|
|
||||||
options = lappend(options, elem); \
|
|
||||||
}
|
|
||||||
|
|
||||||
#define DEFELEM_ADD_INT(options, key, value) \
|
|
||||||
{ \
|
|
||||||
DefElem *elem = makeDefElem(key, (Node *) makeInteger(value), -1); \
|
|
||||||
options = lappend(options, elem); \
|
|
||||||
}
|
|
||||||
|
|
||||||
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
|
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
|
||||||
|
|
||||||
|
@ -257,7 +239,7 @@ CreateDDLTaskList(char *command, List *workerNodeList, bool outsideTransaction)
|
||||||
Task *task = CitusMakeNode(Task);
|
Task *task = CitusMakeNode(Task);
|
||||||
task->taskType = DDL_TASK;
|
task->taskType = DDL_TASK;
|
||||||
SetTaskQueryStringList(task, commandList);
|
SetTaskQueryStringList(task, commandList);
|
||||||
task->cannotBeExecutedInTransction = outsideTransaction;
|
task->cannotBeExecutedInTransaction = outsideTransaction;
|
||||||
|
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
foreach_ptr(workerNode, workerNodeList)
|
foreach_ptr(workerNode, workerNodeList)
|
||||||
|
@ -306,7 +288,7 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PostprocessCreatedbStmt creates the plan to synchronize CREATE DATABASE
|
* PostprocessCreatedbStmt creates the plan to synchronize CREATE DATABASE
|
||||||
* across nodes. We use the cannotBeExecutedInTransction option to avoid
|
* across nodes. We use the cannotBeExecutedInTransaction option to avoid
|
||||||
* sending transaction blocks.
|
* sending transaction blocks.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
|
|
|
@ -938,7 +938,7 @@ CreateIndexTaskList(IndexStmt *indexStmt)
|
||||||
task->dependentTaskList = NULL;
|
task->dependentTaskList = NULL;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
||||||
task->cannotBeExecutedInTransction = indexStmt->concurrent;
|
task->cannotBeExecutedInTransaction = indexStmt->concurrent;
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
taskList = lappend(taskList, task);
|
||||||
|
|
||||||
|
@ -983,7 +983,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt)
|
||||||
task->dependentTaskList = NULL;
|
task->dependentTaskList = NULL;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
||||||
task->cannotBeExecutedInTransction =
|
task->cannotBeExecutedInTransaction =
|
||||||
IsReindexWithParam_compat(reindexStmt, "concurrently");
|
IsReindexWithParam_compat(reindexStmt, "concurrently");
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
taskList = lappend(taskList, task);
|
||||||
|
@ -1309,7 +1309,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
|
||||||
task->dependentTaskList = NULL;
|
task->dependentTaskList = NULL;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
||||||
task->cannotBeExecutedInTransction = dropStmt->concurrent;
|
task->cannotBeExecutedInTransaction = dropStmt->concurrent;
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
taskList = lappend(taskList, task);
|
||||||
|
|
||||||
|
|
|
@ -279,7 +279,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
|
||||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
||||||
task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
|
task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM);
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
taskList = lappend(taskList, task);
|
||||||
}
|
}
|
||||||
|
@ -719,7 +719,7 @@ ExecuteUnqualifiedVacuumTasks(VacuumStmt *vacuumStmt, CitusVacuumParams vacuumPa
|
||||||
SetTaskQueryStringList(task, unqualifiedVacuumCommands);
|
SetTaskQueryStringList(task, unqualifiedVacuumCommands);
|
||||||
task->dependentTaskList = NULL;
|
task->dependentTaskList = NULL;
|
||||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||||
task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
|
task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM);
|
||||||
|
|
||||||
|
|
||||||
bool hasPeerWorker = false;
|
bool hasPeerWorker = false;
|
||||||
|
|
|
@ -61,7 +61,7 @@ TaskListRequiresRollback(List *taskList)
|
||||||
}
|
}
|
||||||
|
|
||||||
Task *task = (Task *) linitial(taskList);
|
Task *task = (Task *) linitial(taskList);
|
||||||
if (task->cannotBeExecutedInTransction)
|
if (task->cannotBeExecutedInTransaction)
|
||||||
{
|
{
|
||||||
/* vacuum, create index concurrently etc. */
|
/* vacuum, create index concurrently etc. */
|
||||||
return false;
|
return false;
|
||||||
|
@ -164,7 +164,7 @@ TaskListCannotBeExecutedInTransaction(List *taskList)
|
||||||
Task *task = NULL;
|
Task *task = NULL;
|
||||||
foreach_ptr(task, taskList)
|
foreach_ptr(task, taskList)
|
||||||
{
|
{
|
||||||
if (task->cannotBeExecutedInTransction)
|
if (task->cannotBeExecutedInTransaction)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -535,7 +535,7 @@ OutTask(OUTFUNC_ARGS)
|
||||||
WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
|
WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
|
||||||
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
|
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
|
||||||
WRITE_BOOL_FIELD(isLocalTableModification);
|
WRITE_BOOL_FIELD(isLocalTableModification);
|
||||||
WRITE_BOOL_FIELD(cannotBeExecutedInTransction);
|
WRITE_BOOL_FIELD(cannotBeExecutedInTransaction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -329,7 +329,7 @@ typedef struct Task
|
||||||
/*
|
/*
|
||||||
* Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
|
* Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
|
||||||
*/
|
*/
|
||||||
bool cannotBeExecutedInTransction;
|
bool cannotBeExecutedInTransaction;
|
||||||
|
|
||||||
Const *partitionKeyValue;
|
Const *partitionKeyValue;
|
||||||
int colocationId;
|
int colocationId;
|
||||||
|
|
Loading…
Reference in New Issue