From 40f8149320769abed351b246a996625ca7b295b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eren=20Ba=C5=9Fak?= Date: Thu, 28 Jul 2016 16:16:21 +0300 Subject: [PATCH 1/2] Allow Cancellation During Distributed DDL Commands This change allows users to interrupt long running DDL commands. Interrupt requests are handled after each DDL command being propagated to a shard placement, which means that generally the cancel request will be processed right after the execution of the DDL is finished in the current placement. --- src/backend/distributed/executor/multi_utility.c | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 94ba64ac0..43d570df3 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -676,7 +676,6 @@ static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, const char *alterObjectSchemaCommand, bool isTopLevel) { - Oid relationId = InvalidOid; bool noWait = false; @@ -1071,9 +1070,6 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, "to execute DDL commands on distributed tables."))); } - /* make sure we don't process cancel signals */ - HOLD_INTERRUPTS(); - executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString); /* if command could not be executed on any finalized shard placement, error out */ @@ -1081,14 +1077,6 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, { ereport(ERROR, (errmsg("could not execute DDL command on worker node shards"))); } - - if (QueryCancelPending) - { - ereport(WARNING, (errmsg("cancel requests are ignored during DDL commands"))); - QueryCancelPending = false; - } - - RESUME_INTERRUPTS(); } @@ -1226,6 +1214,8 @@ ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId, PQclear(result); transactionConnection->transactionState = TRANSACTION_STATE_OPEN; + + CHECK_FOR_INTERRUPTS(); } } From 8a590c9e5bffc31b8185a28cea11afd9c244e22e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eren=20Ba=C5=9Fak?= Date: Thu, 28 Jul 2016 16:35:53 +0300 Subject: [PATCH 2/2] Remove AllFinalizedlacementsAccessible Function This change removes AllFinalizedPlacementsAccessible function since, we open connections to all shard placements before any command is sent so we immediately error out if a shard placement is not accessible. --- .../distributed/executor/multi_utility.c | 68 ------------------- 1 file changed, 68 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 43d570df3..97b6dca1e 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -92,7 +92,6 @@ static void SetLocalCommitProtocolTo2PC(void); static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString); static void ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId, ShardConnections *shardConnections); -static bool AllFinalizedPlacementsAccessible(Oid relationId); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); static void CheckCopyPermissions(CopyStmt *copyStatement); @@ -1057,19 +1056,10 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, bool isTopLevel) { bool executionOK = false; - bool allPlacementsAccessible = false; PreventTransactionChain(isTopLevel, "distributed DDL commands"); SetLocalCommitProtocolTo2PC(); - allPlacementsAccessible = AllFinalizedPlacementsAccessible(relationId); - if (!allPlacementsAccessible) - { - ereport(ERROR, (errmsg("cannot execute command: %s", ddlCommandString), - errdetail("All finalized shard placements need to be accessible " - "to execute DDL commands on distributed tables."))); - } - executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString); /* if command could not be executed on any finalized shard placement, error out */ @@ -1220,64 +1210,6 @@ ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId, } -/* - * AllFinalizedPlacementsAccessible returns true if all the finalized shard - * placements for a given relation are accessible. Otherwise, the function - * returns false. To do so, the function first gets a list of responsive - * worker nodes and then checks if all the finalized shard placements lie - * on those worker nodes. - */ -static bool -AllFinalizedPlacementsAccessible(Oid relationId) -{ - bool allPlacementsAccessible = true; - ListCell *shardCell = NULL; - List *responsiveNodeList = ResponsiveWorkerNodeList(); - - List *shardList = LoadShardList(relationId); - foreach(shardCell, shardList) - { - List *shardPlacementList = NIL; - ListCell *shardPlacementCell = NULL; - uint64 *shardIdPointer = (uint64 *) lfirst(shardCell); - uint64 shardId = (*shardIdPointer); - - shardPlacementList = FinalizedShardPlacementList(shardId); - foreach(shardPlacementCell, shardPlacementList) - { - ListCell *responsiveNodeCell = NULL; - bool placementAccessible = false; - ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); - - /* verify that the placement lies on one of the responsive worker nodes */ - foreach(responsiveNodeCell, responsiveNodeList) - { - WorkerNode *node = (WorkerNode *) lfirst(responsiveNodeCell); - if (strncmp(node->workerName, placement->nodeName, WORKER_LENGTH) == 0 && - node->workerPort == placement->nodePort) - { - placementAccessible = true; - break; - } - } - - if (!placementAccessible) - { - allPlacementsAccessible = false; - break; - } - } - - if (!allPlacementsAccessible) - { - break; - } - } - - return allPlacementsAccessible; -} - - /* * Before acquiring a table lock, check whether we have sufficient rights. * In the case of DROP INDEX, also try to lock the table before the index.