mirror of https://github.com/citusdata/citus.git
Refactor SendQueryToPlacements api
parent
6317bbe9a8
commit
902e68c9ef
|
@ -59,8 +59,7 @@ static void LockShardsForModify(List *shardIntervalList);
|
||||||
static bool HasReplication(List *shardIntervalList);
|
static bool HasReplication(List *shardIntervalList);
|
||||||
static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId);
|
static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId);
|
||||||
static int SendQueryToPlacements(char *shardQueryString,
|
static int SendQueryToPlacements(char *shardQueryString,
|
||||||
ShardConnections *shardConnections,
|
ShardConnections *shardConnections);
|
||||||
bool returnTupleCount);
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
|
PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
|
||||||
|
|
||||||
|
@ -244,17 +243,9 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
|
||||||
int affectedTupleCount = 0;
|
int affectedTupleCount = 0;
|
||||||
char *relationOwner = TableOwner(relationId);
|
char *relationOwner = TableOwner(relationId);
|
||||||
ListCell *shardIntervalCell = NULL;
|
ListCell *shardIntervalCell = NULL;
|
||||||
bool truncateCommand = false;
|
|
||||||
bool requestTupleCount = true;
|
|
||||||
|
|
||||||
OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner);
|
OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner);
|
||||||
|
|
||||||
if (query->commandType == CMD_UTILITY && IsA(query->utilityStmt, TruncateStmt))
|
|
||||||
{
|
|
||||||
truncateCommand = true;
|
|
||||||
requestTupleCount = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach(shardIntervalCell, shardIntervalList)
|
foreach(shardIntervalCell, shardIntervalList)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(
|
ShardInterval *shardInterval = (ShardInterval *) lfirst(
|
||||||
|
@ -274,8 +265,7 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
|
||||||
|
|
||||||
shardQueryStringData = shardQueryString->data;
|
shardQueryStringData = shardQueryString->data;
|
||||||
shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
|
shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
|
||||||
shardConnections,
|
shardConnections);
|
||||||
requestTupleCount);
|
|
||||||
affectedTupleCount += shardAffectedTupleCount;
|
affectedTupleCount += shardAffectedTupleCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,8 +282,7 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
|
||||||
* should be called after all queries have been sent successfully.
|
* should be called after all queries have been sent successfully.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections,
|
SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections)
|
||||||
bool returnTupleCount)
|
|
||||||
{
|
{
|
||||||
uint64 shardId = shardConnections->shardId;
|
uint64 shardId = shardConnections->shardId;
|
||||||
List *connectionList = shardConnections->connectionList;
|
List *connectionList = shardConnections->connectionList;
|
||||||
|
@ -302,11 +291,6 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
|
||||||
|
|
||||||
Assert(connectionList != NIL);
|
Assert(connectionList != NIL);
|
||||||
|
|
||||||
if (!returnTupleCount)
|
|
||||||
{
|
|
||||||
shardAffectedTupleCount = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach(connectionCell, connectionList)
|
foreach(connectionCell, connectionList)
|
||||||
{
|
{
|
||||||
TransactionConnection *transactionConnection =
|
TransactionConnection *transactionConnection =
|
||||||
|
@ -328,10 +312,16 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
|
||||||
|
|
||||||
placementAffectedTupleString = PQcmdTuples(result);
|
placementAffectedTupleString = PQcmdTuples(result);
|
||||||
|
|
||||||
if (returnTupleCount)
|
/* returned tuple count is empty for utility commands, use 0 as affected count */
|
||||||
|
if (*placementAffectedTupleString == '\0')
|
||||||
|
{
|
||||||
|
placementAffectedTupleCount = 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
|
placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
|
||||||
sizeof(int32), 0);
|
sizeof(int32), 0);
|
||||||
|
}
|
||||||
|
|
||||||
if ((shardAffectedTupleCount == -1) ||
|
if ((shardAffectedTupleCount == -1) ||
|
||||||
(shardAffectedTupleCount == placementAffectedTupleCount))
|
(shardAffectedTupleCount == placementAffectedTupleCount))
|
||||||
|
@ -346,7 +336,6 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
|
||||||
errdetail("Affected tuple counts at placements of shard "
|
errdetail("Affected tuple counts at placements of shard "
|
||||||
UINT64_FORMAT " are different.", shardId)));
|
UINT64_FORMAT " are different.", shardId)));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue