Merge pull request #686 from citusdata/allow_cancellation_on_ddl_commands

Tidy Up multi_ProcessUtility
Eren Başak 2016-07-28 17:34:14 +03:00 committed by GitHub
commit afb829b102
1 changed file with 2 additions and 80 deletions

@@ -92,7 +92,6 @@ static void SetLocalCommitProtocolTo2PC(void);
static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString);
static void ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId,
                                            ShardConnections *shardConnections);
static bool AllFinalizedPlacementsAccessible(Oid relationId);
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
                                         void *arg);
static void CheckCopyPermissions(CopyStmt *copyStatement);
@@ -676,7 +675,6 @@ static Node *
ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
                             const char *alterObjectSchemaCommand, bool isTopLevel)
{
    Oid relationId = InvalidOid;
    bool noWait = false;
@@ -1058,22 +1056,10 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
                             bool isTopLevel)
{
    bool executionOK = false;
    bool allPlacementsAccessible = false;

    PreventTransactionChain(isTopLevel, "distributed DDL commands");
    SetLocalCommitProtocolTo2PC();

    allPlacementsAccessible = AllFinalizedPlacementsAccessible(relationId);
    if (!allPlacementsAccessible)
    {
        ereport(ERROR, (errmsg("cannot execute command: %s", ddlCommandString),
                        errdetail("All finalized shard placements need to be accessible "
                                  "to execute DDL commands on distributed tables.")));
    }

    /* make sure we don't process cancel signals */
    HOLD_INTERRUPTS();

    executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString);

    /* if command could not be executed on any finalized shard placement, error out */
@@ -1081,14 +1067,6 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
    {
        ereport(ERROR, (errmsg("could not execute DDL command on worker node shards")));
    }

    if (QueryCancelPending)
    {
        ereport(WARNING, (errmsg("cancel requests are ignored during DDL commands")));
        QueryCancelPending = false;
    }

    RESUME_INTERRUPTS();
}
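For context on the two hunks above and the one below: the removed code fenced the whole worker-shard execution with HOLD_INTERRUPTS()/RESUME_INTERRUPTS() and discarded any cancel request that arrived in the meantime, while the retained path relies on CHECK_FOR_INTERRUPTS() between placement commands so a cancel can take effect. The following is only a minimal side-by-side sketch of those two standard PostgreSQL idioms, not code from this patch; DoWorkOnPlacement() and placementList are hypothetical stand-ins for the Citus internals.

#include "postgres.h"
#include "miscadmin.h"        /* HOLD_INTERRUPTS, CHECK_FOR_INTERRUPTS, QueryCancelPending */
#include "nodes/pg_list.h"    /* List, ListCell, foreach, lfirst */

/* hypothetical helper standing in for the per-placement DDL work */
extern void DoWorkOnPlacement(void *placement);

/* Old idiom (removed above): block cancellation for the whole operation. */
static void
RunWithInterruptsHeld(List *placementList)
{
    ListCell *cell = NULL;

    HOLD_INTERRUPTS();

    foreach(cell, placementList)
    {
        DoWorkOnPlacement(lfirst(cell));
    }

    /* a cancel request that arrived in the meantime is reported and dropped */
    if (QueryCancelPending)
    {
        ereport(WARNING, (errmsg("cancel requests are ignored during DDL commands")));
        QueryCancelPending = false;
    }

    RESUME_INTERRUPTS();
}

/* New idiom (kept below): offer a cancellation point after each placement. */
static void
RunWithCancellationPoints(List *placementList)
{
    ListCell *cell = NULL;

    foreach(cell, placementList)
    {
        DoWorkOnPlacement(lfirst(cell));

        /* raises ERROR here if the user cancelled, aborting the transaction */
        CHECK_FOR_INTERRUPTS();
    }
}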
@@ -1226,68 +1204,12 @@ ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId,
        PQclear(result);

        transactionConnection->transactionState = TRANSACTION_STATE_OPEN;

        CHECK_FOR_INTERRUPTS();
    }
}
/*
 * AllFinalizedPlacementsAccessible returns true if all the finalized shard
 * placements for a given relation are accessible. Otherwise, the function
 * returns false. To do so, the function first gets a list of responsive
 * worker nodes and then checks if all the finalized shard placements lie
 * on those worker nodes.
 */
static bool
AllFinalizedPlacementsAccessible(Oid relationId)
{
    bool allPlacementsAccessible = true;
    ListCell *shardCell = NULL;
    List *responsiveNodeList = ResponsiveWorkerNodeList();
    List *shardList = LoadShardList(relationId);

    foreach(shardCell, shardList)
    {
        List *shardPlacementList = NIL;
        ListCell *shardPlacementCell = NULL;
        uint64 *shardIdPointer = (uint64 *) lfirst(shardCell);
        uint64 shardId = (*shardIdPointer);

        shardPlacementList = FinalizedShardPlacementList(shardId);
        foreach(shardPlacementCell, shardPlacementList)
        {
            ListCell *responsiveNodeCell = NULL;
            bool placementAccessible = false;
            ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);

            /* verify that the placement lies on one of the responsive worker nodes */
            foreach(responsiveNodeCell, responsiveNodeList)
            {
                WorkerNode *node = (WorkerNode *) lfirst(responsiveNodeCell);

                if (strncmp(node->workerName, placement->nodeName, WORKER_LENGTH) == 0 &&
                    node->workerPort == placement->nodePort)
                {
                    placementAccessible = true;
                    break;
                }
            }

            if (!placementAccessible)
            {
                allPlacementsAccessible = false;
                break;
            }
        }

        if (!allPlacementsAccessible)
        {
            break;
        }
    }

    return allPlacementsAccessible;
}
/*
 * Before acquiring a table lock, check whether we have sufficient rights.
 * In the case of DROP INDEX, also try to lock the table before the index.