mirror of https://github.com/citusdata/citus.git
Address PR feedback from Metin
parent
9bb579b1b1
commit
2fb3eae5e2
|
@ -237,6 +237,7 @@ typedef struct ShardConnections
|
|||
|
||||
/* Local functions forward declarations */
|
||||
static HTAB * CreateShardConnectionHash(void);
|
||||
static int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
|
||||
static bool IsUniformHashDistribution(ShardInterval **shardIntervalArray,
|
||||
int shardCount);
|
||||
static FmgrInfo * ShardIntervalCompareFunction(Var *partitionColumn, char
|
||||
|
@ -320,8 +321,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg(
|
||||
"COPY is only supported for hash- and range-partitioned tables")));
|
||||
errmsg("COPY is only supported for hash- and "
|
||||
"range-partitioned tables")));
|
||||
}
|
||||
|
||||
/* resolve hash function for partition column */
|
||||
|
@ -360,12 +361,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/* create a mapping of shard id to a connection for each of its placements */
|
||||
shardConnectionHash = CreateShardConnectionHash();
|
||||
|
||||
/* lock shards in order of shard id to prevent deadlock */
|
||||
shardIntervalList = SortList(shardIntervalList, CompareTasksByShardId);
|
||||
shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById);
|
||||
|
||||
foreach(shardIntervalCell, shardIntervalList)
|
||||
{
|
||||
|
@ -569,6 +569,34 @@ CreateShardConnectionHash(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CompareShardIntervalsById is a comparison function for sort shard
|
||||
* intervals by their shard ID.
|
||||
*/
|
||||
static int
|
||||
CompareShardIntervalsById(const void *leftElement, const void *rightElement)
|
||||
{
|
||||
ShardInterval *leftInterval = *((ShardInterval **) leftElement);
|
||||
ShardInterval *rightInterval = *((ShardInterval **) rightElement);
|
||||
int64 leftShardId = leftInterval->shardId;
|
||||
int64 rightShardId = rightInterval->shardId;
|
||||
|
||||
/* we compare 64-bit integers, instead of casting their difference to int */
|
||||
if (leftShardId > rightShardId)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (leftShardId < rightShardId)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShardIntervalCompareFunction returns the appropriate compare function for the
|
||||
* partition column type. In case of hash-partitioning, it always returns the compare
|
||||
|
@ -595,7 +623,7 @@ ShardIntervalCompareFunction(Var *partitionColumn, char partitionMethod)
|
|||
|
||||
/*
|
||||
* IsUniformHashDistribution determines whether the given list of sorted shards
|
||||
* hash a uniform hash distribution, as produced by master_create_worker_shards.
|
||||
* has a uniform hash distribution, as produced by master_create_worker_shards.
|
||||
*/
|
||||
static bool
|
||||
IsUniformHashDistribution(ShardInterval **shardIntervalArray, int shardCount)
|
||||
|
@ -619,8 +647,6 @@ IsUniformHashDistribution(ShardInterval **shardIntervalArray, int shardCount)
|
|||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
shardIndex += 1;
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -681,26 +707,31 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter
|
|||
while (lowerBoundIndex < upperBoundIndex)
|
||||
{
|
||||
int middleIndex = (lowerBoundIndex + upperBoundIndex) >> 1;
|
||||
if (DatumGetInt32(FunctionCall2Coll(compareFunction,
|
||||
DEFAULT_COLLATION_OID,
|
||||
partitionColumnValue,
|
||||
shardIntervalCache[middleIndex]->minValue)) <
|
||||
0)
|
||||
int maxValueComparison = 0;
|
||||
int minValueComparison = 0;
|
||||
|
||||
minValueComparison = FunctionCall2Coll(compareFunction,
|
||||
DEFAULT_COLLATION_OID,
|
||||
partitionColumnValue,
|
||||
shardIntervalCache[middleIndex]->minValue);
|
||||
|
||||
if (DatumGetInt32(minValueComparison) < 0)
|
||||
{
|
||||
upperBoundIndex = middleIndex;
|
||||
continue;
|
||||
}
|
||||
else if (DatumGetInt32(FunctionCall2Coll(compareFunction,
|
||||
DEFAULT_COLLATION_OID,
|
||||
partitionColumnValue,
|
||||
shardIntervalCache[middleIndex]->maxValue))
|
||||
<= 0)
|
||||
|
||||
maxValueComparison = FunctionCall2Coll(compareFunction,
|
||||
DEFAULT_COLLATION_OID,
|
||||
partitionColumnValue,
|
||||
shardIntervalCache[middleIndex]->maxValue);
|
||||
|
||||
if (DatumGetInt32(maxValueComparison) <= 0)
|
||||
{
|
||||
return shardIntervalCache[middleIndex];
|
||||
}
|
||||
else
|
||||
{
|
||||
lowerBoundIndex = middleIndex + 1;
|
||||
}
|
||||
|
||||
lowerBoundIndex = middleIndex + 1;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -708,9 +739,9 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter
|
|||
|
||||
|
||||
/*
|
||||
* Open connections for each placement of a shard. If a connection cannot be opened,
|
||||
* the shard placement is marked as inactive and the COPY continues with the
|
||||
* remaining shard placements.
|
||||
* OpenShardConnections opens a connection for each placement of a shard. If a
|
||||
* connection cannot be opened, the shard placement is marked as inactive and
|
||||
* the COPY continues with the remaining shard placements.
|
||||
*/
|
||||
static void
|
||||
OpenShardConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
|
||||
|
@ -921,7 +952,7 @@ CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections)
|
|||
|
||||
|
||||
/*
|
||||
* ConnectionList flatten the connection hash to a list of placement connections.
|
||||
* ConnectionList flattens the connection hash to a list of placement connections.
|
||||
*/
|
||||
static List *
|
||||
ConnectionList(HTAB *connectionHash)
|
||||
|
|
|
@ -54,11 +54,14 @@ PrepareTransactions(List *connectionList)
|
|||
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||
|
||||
ReportRemoteError(connection, result);
|
||||
PQclear(result);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("Failed to prepare transaction")));
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
||||
transactionConnection->transactionState = TRANSACTION_STATE_PREPARED;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue