Address review feedback

Should just about do it.
pull/1287/head
Jason Petersen 2017-04-03 11:42:57 -06:00
parent cf775c4773
commit 4cdfc3a10f
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
6 changed files with 23 additions and 33 deletions

View File

@ -870,13 +870,13 @@ ExecuteModifyTasksWithoutResults(List *taskList)
/*
* ExecuteSequentialTasksWithoutResults basically calls ExecuteModifyTasks in a
* loop in order to simulate sequential execution of a list of tasks. Useful in
* cases where issuing commands in parallel before waiting for results could
* ExecuteTasksSequentiallyWithoutResults basically calls ExecuteModifyTasks in
* a loop in order to simulate sequential execution of a list of tasks. Useful
* in cases where issuing commands in parallel before waiting for results could
* result in deadlocks (such as CREATE INDEX CONCURRENTLY).
*/
int64
ExecuteSequentialTasksWithoutResults(List *taskList)
void
ExecuteTasksSequentiallyWithoutResults(List *taskList)
{
ListCell *taskCell = NULL;
@ -885,10 +885,8 @@ ExecuteSequentialTasksWithoutResults(List *taskList)
Task *task = (Task *) lfirst(taskCell);
List *singleTask = list_make1(task);
ExecuteModifyTasks(singleTask, false, NULL, NULL);
ExecuteModifyTasksWithoutResults(singleTask);
}
return 0;
}

View File

@ -77,7 +77,6 @@
#include "utils/palloc.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
@ -136,7 +135,7 @@ static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
static void ShowNoticeIfNotUsing2PC(void);
static List * DDLTaskList(Oid relationId, const char *commandString);
static List * IndexTaskList(Oid relationId, IndexStmt *indexStmt);
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString);
@ -684,9 +683,9 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
{
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId;
ddlJob->concurrent = createIndexStatement->concurrent;
ddlJob->concurrentIndexCmd = createIndexStatement->concurrent;
ddlJob->commandString = createIndexCommand;
ddlJob->taskList = IndexTaskList(relationId, createIndexStatement);
ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement);
ddlJobs = list_make1(ddlJob);
}
@ -778,7 +777,7 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
ddlJob->targetRelationId = distributedRelationId;
ddlJob->concurrent = dropIndexStatement->concurrent;
ddlJob->concurrentIndexCmd = dropIndexStatement->concurrent;
ddlJob->commandString = dropIndexCommand;
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement);
@ -873,7 +872,7 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = leftRelationId;
ddlJob->concurrent = false;
ddlJob->concurrentIndexCmd = false;
ddlJob->commandString = alterTableCommand;
if (rightRelationId)
@ -1987,7 +1986,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
EnsureCoordinator();
if (!ddlJob->concurrent)
if (!ddlJob->concurrentIndexCmd)
{
ShowNoticeIfNotUsing2PC();
@ -2008,6 +2007,8 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
PG_TRY();
{
ExecuteTasksSequentiallyWithoutResults(ddlJob->taskList);
if (shouldSyncMetadata)
{
List *commandList = list_make2(DISABLE_DDL_PROPAGATION,
@ -2015,8 +2016,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList);
}
ExecuteSequentialTasksWithoutResults(ddlJob->taskList);
}
PG_CATCH();
{
@ -2024,8 +2023,8 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
(errmsg("CONCURRENTLY-enabled index command failed"),
errdetail("CONCURRENTLY-enabled index commands can fail partially, "
"leaving behind an INVALID index."),
errhint("Use DROP INDEX IF EXISTS to remove the invalid index, then "
"retry the original command.")));
errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the "
"invalid index, then retry the original command.")));
}
PG_END_TRY();
}
@ -2103,11 +2102,11 @@ DDLTaskList(Oid relationId, const char *commandString)
/*
* IndexTaskList builds a list of tasks to execute a CREATE INDEX command
* CreateIndexTaskList builds a list of tasks to execute a CREATE INDEX command
* against a specified distributed table.
*/
static List *
IndexTaskList(Oid relationId, IndexStmt *indexStmt)
CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
@ -2714,7 +2713,7 @@ PlanGrantStmt(GrantStmt *grantStmt)
ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relOid;
ddlJob->concurrent = false;
ddlJob->concurrentIndexCmd = false;
ddlJob->commandString = pstrdup(ddlString.data);
ddlJob->taskList = DDLTaskList(relOid, ddlString.data);

View File

@ -83,13 +83,6 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
char *nodeUser = CitusExtensionOwnerName();
ListCell *commandCell = NULL;
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot open new connections after the first modification "
"command within a transaction")));
}
/* run commands serially */
foreach(workerNodeCell, workerNodeList)
{

View File

@ -41,7 +41,7 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern int64 ExecuteSequentialTasksWithoutResults(List *taskList);
extern void ExecuteTasksSequentiallyWithoutResults(List *taskList);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -23,7 +23,7 @@ extern bool EnableDDLPropagation;
typedef struct DDLJob
{
Oid targetRelationId; /* oid of the target distributed relation */
bool concurrent; /* related to a CONCURRENTLY index command? */
bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? */
const char *commandString; /* initial (coordinator) DDL command string */
List *taskList; /* worker DDL tasks to execute */
} DDLJob;

View File

@ -252,7 +252,7 @@ CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON index_test_hash_102089(b);
CREATE INDEX CONCURRENTLY ith_b_idx ON index_test_hash(b);
ERROR: CONCURRENTLY-enabled index command failed
DETAIL: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index.
HINT: Use DROP INDEX IF EXISTS to remove the invalid index, then retry the original command.
HINT: Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index, then retry the original command.
-- the failure results in an INVALID index
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
Index Valid?
@ -276,7 +276,7 @@ DROP INDEX CONCURRENTLY ith_b_idx_102089;
DROP INDEX CONCURRENTLY ith_b_idx;
ERROR: CONCURRENTLY-enabled index command failed
DETAIL: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index.
HINT: Use DROP INDEX IF EXISTS to remove the invalid index, then retry the original command.
HINT: Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index, then retry the original command.
-- the failure results in an INVALID index
SELECT indisvalid AS "Index Valid?" FROM pg_index WHERE indexrelid='ith_b_idx'::regclass;
Index Valid?