From 4cdfc3a10f52e77a6b214a28062a6d7e19ebad36 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Mon, 3 Apr 2017 11:42:57 -0600 Subject: [PATCH] Address review feedback Should just about do it. --- .../executor/multi_router_executor.c | 14 +++++----- .../distributed/executor/multi_utility.c | 27 +++++++++---------- .../transaction/worker_transaction.c | 7 ----- .../distributed/multi_router_executor.h | 2 +- src/include/distributed/multi_utility.h | 2 +- .../expected/multi_index_statements.out | 4 +-- 6 files changed, 23 insertions(+), 33 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index b377d9ba6..e546c3311 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -870,13 +870,13 @@ ExecuteModifyTasksWithoutResults(List *taskList) /* - * ExecuteSequentialTasksWithoutResults basically calls ExecuteModifyTasks in a - * loop in order to simulate sequential execution of a list of tasks. Useful in - * cases where issuing commands in parallel before waiting for results could + * ExecuteTasksSequentiallyWithoutResults basically calls ExecuteModifyTasks in + * a loop in order to simulate sequential execution of a list of tasks. Useful + * in cases where issuing commands in parallel before waiting for results could * result in deadlocks (such as CREATE INDEX CONCURRENTLY). */ -int64 -ExecuteSequentialTasksWithoutResults(List *taskList) +void +ExecuteTasksSequentiallyWithoutResults(List *taskList) { ListCell *taskCell = NULL; @@ -885,10 +885,8 @@ ExecuteSequentialTasksWithoutResults(List *taskList) Task *task = (Task *) lfirst(taskCell); List *singleTask = list_make1(task); - ExecuteModifyTasks(singleTask, false, NULL, NULL); + ExecuteModifyTasksWithoutResults(singleTask); } - - return 0; } diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 485ff2da7..d70fd1930 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -77,7 +77,6 @@ #include "utils/palloc.h" #include "utils/rel.h" #include "utils/relcache.h" -#include "utils/snapmgr.h" #include "utils/syscache.h" @@ -136,7 +135,7 @@ static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); static void ExecuteDistributedDDLJob(DDLJob *ddlJob); static void ShowNoticeIfNotUsing2PC(void); static List * DDLTaskList(Oid relationId, const char *commandString); -static List * IndexTaskList(Oid relationId, IndexStmt *indexStmt); +static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt); static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, const char *commandString); @@ -684,9 +683,9 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; - ddlJob->concurrent = createIndexStatement->concurrent; + ddlJob->concurrentIndexCmd = createIndexStatement->concurrent; ddlJob->commandString = createIndexCommand; - ddlJob->taskList = IndexTaskList(relationId, createIndexStatement); + ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement); ddlJobs = list_make1(ddlJob); } @@ -778,7 +777,7 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); ddlJob->targetRelationId = distributedRelationId; - ddlJob->concurrent = dropIndexStatement->concurrent; + ddlJob->concurrentIndexCmd = dropIndexStatement->concurrent; ddlJob->commandString = dropIndexCommand; ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId, dropIndexStatement); @@ -873,7 +872,7 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = leftRelationId; - ddlJob->concurrent = false; + ddlJob->concurrentIndexCmd = false; ddlJob->commandString = alterTableCommand; if (rightRelationId) @@ -1987,7 +1986,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) EnsureCoordinator(); - if (!ddlJob->concurrent) + if (!ddlJob->concurrentIndexCmd) { ShowNoticeIfNotUsing2PC(); @@ -2008,6 +2007,8 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) PG_TRY(); { + ExecuteTasksSequentiallyWithoutResults(ddlJob->taskList); + if (shouldSyncMetadata) { List *commandList = list_make2(DISABLE_DDL_PROPAGATION, @@ -2015,8 +2016,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList); } - - ExecuteSequentialTasksWithoutResults(ddlJob->taskList); } PG_CATCH(); { @@ -2024,8 +2023,8 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) (errmsg("CONCURRENTLY-enabled index command failed"), errdetail("CONCURRENTLY-enabled index commands can fail partially, " "leaving behind an INVALID index."), - errhint("Use DROP INDEX IF EXISTS to remove the invalid index, then " - "retry the original command."))); + errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the " + "invalid index, then retry the original command."))); } PG_END_TRY(); } @@ -2103,11 +2102,11 @@ DDLTaskList(Oid relationId, const char *commandString) /* - * IndexTaskList builds a list of tasks to execute a CREATE INDEX command + * CreateIndexTaskList builds a list of tasks to execute a CREATE INDEX command * against a specified distributed table. */ static List * -IndexTaskList(Oid relationId, IndexStmt *indexStmt) +CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt) { List *taskList = NIL; List *shardIntervalList = LoadShardIntervalList(relationId); @@ -2714,7 +2713,7 @@ PlanGrantStmt(GrantStmt *grantStmt) ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relOid; - ddlJob->concurrent = false; + ddlJob->concurrentIndexCmd = false; ddlJob->commandString = pstrdup(ddlString.data); ddlJob->taskList = DDLTaskList(relOid, ddlString.data); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index c792c6f9c..6f54de4c5 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -83,13 +83,6 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) char *nodeUser = CitusExtensionOwnerName(); ListCell *commandCell = NULL; - if (XactModificationLevel > XACT_MODIFICATION_NONE) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("cannot open new connections after the first modification " - "command within a transaction"))); - } - /* run commands serially */ foreach(workerNodeCell, workerNodeList) { diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 389e868c2..b5db9ddf9 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -41,7 +41,7 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); -extern int64 ExecuteSequentialTasksWithoutResults(List *taskList); +extern void ExecuteTasksSequentiallyWithoutResults(List *taskList); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 8afad5d54..7444c10a1 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -23,7 +23,7 @@ extern bool EnableDDLPropagation; typedef struct DDLJob { Oid targetRelationId; /* oid of the target distributed relation */ - bool concurrent; /* related to a CONCURRENTLY index command? */ + bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? */ const char *commandString; /* initial (coordinator) DDL command string */ List *taskList; /* worker DDL tasks to execute */ } DDLJob; diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index 3cce9ad30..b92999c8b 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -252,7 +252,7 @@ CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON index_test_hash_102089(b); CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b); ERROR: CONCURRENTLY-enabled index command failed DETAIL: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. -HINT: Use DROP INDEX IF EXISTS to remove the invalid index, then retry the original command. +HINT: Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index, then retry the original command. -- the failure results in an INVALID index SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass; Index Valid? @@ -276,7 +276,7 @@ DROP INDEX CONCURRENTLY ith_b_idx_102089; DROP INDEX CONCURRENTLY ith_b_idx; ERROR: CONCURRENTLY-enabled index command failed DETAIL: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. -HINT: Use DROP INDEX IF EXISTS to remove the invalid index, then retry the original command. +HINT: Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index, then retry the original command. -- the failure results in an INVALID index SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass; Index Valid?