From 2fb3eae5e2bb9fe7f27d2f37e8a31eb105e0fb5e Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 3 Mar 2016 14:10:51 +0100 Subject: [PATCH] Address PR feedback from Metin --- src/backend/distributed/commands/multi_copy.c | 81 +++++++++++++------ .../distributed/utils/multi_transaction.c | 3 + 2 files changed, 59 insertions(+), 25 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 54d6baf71..54ac24948 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -237,6 +237,7 @@ typedef struct ShardConnections /* Local functions forward declarations */ static HTAB * CreateShardConnectionHash(void); +static int CompareShardIntervalsById(const void *leftElement, const void *rightElement); static bool IsUniformHashDistribution(ShardInterval **shardIntervalArray, int shardCount); static FmgrInfo * ShardIntervalCompareFunction(Var *partitionColumn, char @@ -320,8 +321,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg( - "COPY is only supported for hash- and range-partitioned tables"))); + errmsg("COPY is only supported for hash- and " + "range-partitioned tables"))); } /* resolve hash function for partition column */ @@ -360,12 +361,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) } } - /* create a mapping of shard id to a connection for each of its placements */ shardConnectionHash = CreateShardConnectionHash(); /* lock shards in order of shard id to prevent deadlock */ - shardIntervalList = SortList(shardIntervalList, CompareTasksByShardId); + shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById); foreach(shardIntervalCell, shardIntervalList) { @@ -569,6 +569,34 @@ CreateShardConnectionHash(void) } +/* + * CompareShardIntervalsById is a comparison function for sort shard + * intervals by their shard ID. + */ +static int +CompareShardIntervalsById(const void *leftElement, const void *rightElement) +{ + ShardInterval *leftInterval = *((ShardInterval **) leftElement); + ShardInterval *rightInterval = *((ShardInterval **) rightElement); + int64 leftShardId = leftInterval->shardId; + int64 rightShardId = rightInterval->shardId; + + /* we compare 64-bit integers, instead of casting their difference to int */ + if (leftShardId > rightShardId) + { + return 1; + } + else if (leftShardId < rightShardId) + { + return -1; + } + else + { + return 0; + } +} + + /* * ShardIntervalCompareFunction returns the appropriate compare function for the * partition column type. In case of hash-partitioning, it always returns the compare @@ -595,7 +623,7 @@ ShardIntervalCompareFunction(Var *partitionColumn, char partitionMethod) /* * IsUniformHashDistribution determines whether the given list of sorted shards - * hash a uniform hash distribution, as produced by master_create_worker_shards. + * has a uniform hash distribution, as produced by master_create_worker_shards. */ static bool IsUniformHashDistribution(ShardInterval **shardIntervalArray, int shardCount) @@ -619,8 +647,6 @@ IsUniformHashDistribution(ShardInterval **shardIntervalArray, int shardCount) { return false; } - - shardIndex += 1; } return true; @@ -681,26 +707,31 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter while (lowerBoundIndex < upperBoundIndex) { int middleIndex = (lowerBoundIndex + upperBoundIndex) >> 1; - if (DatumGetInt32(FunctionCall2Coll(compareFunction, - DEFAULT_COLLATION_OID, - partitionColumnValue, - shardIntervalCache[middleIndex]->minValue)) < - 0) + int maxValueComparison = 0; + int minValueComparison = 0; + + minValueComparison = FunctionCall2Coll(compareFunction, + DEFAULT_COLLATION_OID, + partitionColumnValue, + shardIntervalCache[middleIndex]->minValue); + + if (DatumGetInt32(minValueComparison) < 0) { upperBoundIndex = middleIndex; + continue; } - else if (DatumGetInt32(FunctionCall2Coll(compareFunction, - DEFAULT_COLLATION_OID, - partitionColumnValue, - shardIntervalCache[middleIndex]->maxValue)) - <= 0) + + maxValueComparison = FunctionCall2Coll(compareFunction, + DEFAULT_COLLATION_OID, + partitionColumnValue, + shardIntervalCache[middleIndex]->maxValue); + + if (DatumGetInt32(maxValueComparison) <= 0) { return shardIntervalCache[middleIndex]; } - else - { - lowerBoundIndex = middleIndex + 1; - } + + lowerBoundIndex = middleIndex + 1; } return NULL; @@ -708,9 +739,9 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter /* - * Open connections for each placement of a shard. If a connection cannot be opened, - * the shard placement is marked as inactive and the COPY continues with the - * remaining shard placements. + * OpenShardConnections opens a connection for each placement of a shard. If a + * connection cannot be opened, the shard placement is marked as inactive and + * the COPY continues with the remaining shard placements. */ static void OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, @@ -921,7 +952,7 @@ CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections) /* - * ConnectionList flatten the connection hash to a list of placement connections. + * ConnectionList flattens the connection hash to a list of placement connections. */ static List * ConnectionList(HTAB *connectionHash) diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/utils/multi_transaction.c index 58c3ed15c..667542ada 100644 --- a/src/backend/distributed/utils/multi_transaction.c +++ b/src/backend/distributed/utils/multi_transaction.c @@ -54,11 +54,14 @@ PrepareTransactions(List *connectionList) transactionConnection->transactionState = TRANSACTION_STATE_CLOSED; ReportRemoteError(connection, result); + PQclear(result); ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("Failed to prepare transaction"))); } + PQclear(result); + transactionConnection->transactionState = TRANSACTION_STATE_PREPARED; } }