mirror of https://github.com/citusdata/citus.git

Merge pull request #855 from citusdata/feature/parallel_modify
Parallelise master_modify_multiple_shards and other things

commit 98e0648d40
@@ -433,7 +433,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
    }

    /* prevent concurrent placement changes and non-commutative DML statements */
    LockShards(shardIntervalList, ShareLock);
    LockShardListMetadata(shardIntervalList, ShareLock);
    LockShardListResources(shardIntervalList, ShareLock);

    /* initialize the shard interval cache */
    shardCount = cacheEntry->shardIntervalArrayLength;

@@ -28,6 +28,7 @@
#include "catalog/pg_type.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"

@@ -35,6 +36,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "executor/execdesc.h"

@@ -64,6 +66,7 @@
/* controls use of locks to enforce safe commutativity */
bool AllModificationsCommutative = false;


/*
 * The following static variables are necessary to track the progression of
 * multi-statement transactions managed by the router executor. After the first

@@ -84,15 +87,17 @@ static bool subXactAbortAttempted = false;

/* functions needed during start phase */
static void InitTransactionStateForTask(Task *task);
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery);
static void AcquireExecutorShardLock(Task *task, LOCKMODE lockMode);
static HTAB * CreateXactParticipantHash(void);

/* functions needed during run phase */
static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
                                       Task *task,
                                       bool isModificationQuery,
                                       bool expectResults);
static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
                              bool isModificationQuery, bool expectResults);
static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
                                 bool isModificationQuery, bool expectResults);
static List * TaskShardIntervalList(List *taskList);
static void AcquireExecutorShardLock(Task *task, CmdType commandType);
static void AcquireExecutorMultiShardLocks(List *shardIntervalList);
static bool IsReplicated(List *shardIntervalList);
static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
                                       DestReceiver *destination,
                                       Tuplestorestate *tupleStore);

@@ -106,8 +111,8 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
                                     ParamListInfo paramListInfo);
static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
                             TupleDesc tupleDescriptor, int64 *rows);
static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
                             TupleDesc tupleDescriptor, bool failOnError, int64 *rows);
static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows);
static void RecordShardIdParticipant(uint64 affectedShardId,
                                     NodeConnectionEntry *participantEntry);


@@ -126,7 +131,6 @@ static void MarkRemainingInactivePlacements(void);
void
RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
{
    LOCKMODE lockMode = NoLock;
    EState *executorState = NULL;
    CmdType commandType = queryDesc->operation;

@@ -137,25 +141,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
    if (commandType != CMD_SELECT)
    {
        eflags |= EXEC_FLAG_SKIP_TRIGGERS;

        if (XactModificationLevel == XACT_MODIFICATION_SCHEMA)
        {
            ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                            errmsg("distributed data modifications must not appear in "
                                   "transaction blocks which contain distributed DDL "
                                   "commands")));
        }

        /*
         * We could naturally handle function-based transactions (i.e. those
         * using PL/pgSQL or similar) by checking the type of queryDesc->dest,
         * but some customers already use functions that touch multiple shards
         * from within a function, so we'll ignore functions for now.
         */
        if (IsTransactionBlock() && xactParticipantHash == NULL)
        {
            InitTransactionStateForTask(task);
        }
    }

    /* signal that it is a router execution */

@@ -174,13 +159,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
     * work.
     */
    queryDesc->planstate = (PlanState *) makeNode(MaterialState);

    lockMode = CommutativityRuleToLockMode(commandType, task->upsertQuery);

    if (lockMode != NoLock)
    {
        AcquireExecutorShardLock(task, lockMode);
    }
}

@@ -241,71 +219,172 @@ InitTransactionStateForTask(Task *task)

/*
 * CommutativityRuleToLockMode determines the commutativity rule for the given
 * command and returns the appropriate lock mode to enforce that rule. The
 * function assumes a SELECT doesn't modify state and therefore is commutative
 * with all other commands. The function also assumes that an INSERT commutes
 * with another INSERT, but not with an UPDATE/DELETE/UPSERT; and an
 * UPDATE/DELETE/UPSERT doesn't commute with an INSERT, UPDATE, DELETE or UPSERT.
 * AcquireExecutorShardLock acquires a lock on the shard for the given task and
 * command type if necessary to avoid divergence between multiple replicas of
 * the same shard. No lock is obtained when there is only one replica.
 *
 * Note that the above comment defines INSERT INTO ... ON CONFLICT type of queries
 * as an UPSERT. Since UPSERT is not defined as a separate command type in postgres,
 * we have to pass it as a second parameter to the function.
 * The function determines the appropriate lock mode based on the commutativity
 * rule of the command. In each case, it uses a lock mode that enforces the
 * commutativity rule.
 *
 * The above mapping is overridden entirely when all_modifications_commutative
 * is set to true. In that case, all commands just claim a shared lock. This
 * allows the shard repair logic to lock out modifications while permitting all
 * commands to otherwise commute.
 * The mapping is overridden when all_modifications_commutative is set to true.
 * In that case, all modifications are treated as commutative, which can be used
 * to communicate that the application is only generating commutative
 * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
 */
static LOCKMODE
CommutativityRuleToLockMode(CmdType commandType, bool upsertQuery)
static void
AcquireExecutorShardLock(Task *task, CmdType commandType)
{
    LOCKMODE lockMode = NoLock;
    int64 shardId = task->anchorShardId;

    /* bypass commutativity checks when flag enabled */
    if (AllModificationsCommutative)
    if (commandType == CMD_SELECT || list_length(task->taskPlacementList) == 1)
    {
        return ShareLock;
    }
        /*
         * The executor shard lock is used to maintain consistency between
         * replicas and therefore no lock is required for read-only queries
         * or in general when there is only one replica.
         */

    if (commandType == CMD_SELECT)
    {
        lockMode = NoLock;
    }
    else if (upsertQuery)
    else if (AllModificationsCommutative)
    {
        /*
         * Bypass commutativity checks when citus.all_modifications_commutative
         * is enabled.
         *
         * A RowExclusiveLock does not conflict with itself and therefore allows
         * multiple commutative commands to proceed concurrently. It does
         * conflict with ExclusiveLock, which may still be obtained by another
         * session that executes an UPDATE/DELETE/UPSERT command with
         * citus.all_modifications_commutative disabled.
         */

        lockMode = RowExclusiveLock;
    }
    else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE)
    {
        /*
         * UPDATE/DELETE/UPSERT commands do not commute with other modifications
         * since the rows modified by one command may be affected by the outcome
         * of another command.
         *
         * We need to handle upsert before INSERT, because PostgreSQL models
         * upsert commands as INSERT with an ON CONFLICT section.
         *
         * ExclusiveLock conflicts with all lock types used by modifications
         * and therefore prevents other modifications from running
         * concurrently.
         */

        lockMode = ExclusiveLock;
    }
    else if (commandType == CMD_INSERT)
    {
        lockMode = ShareLock;
    }
    else if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
    {
        lockMode = ExclusiveLock;
        /*
         * An INSERT commutes with other INSERT commands, since performing them
         * out-of-order only affects the table order on disk, but not the
         * contents.
         *
         * When a unique constraint exists, INSERTs are not strictly commutative,
         * but whichever INSERT comes last will error out and thus has no effect.
         * INSERT is not commutative with UPDATE/DELETE/UPSERT, since the
         * UPDATE/DELETE/UPSERT may consider the INSERT, depending on execution
         * order.
         *
         * A RowExclusiveLock does not conflict with itself and therefore allows
         * multiple INSERT commands to proceed concurrently. It conflicts with
         * ExclusiveLock obtained by UPDATE/DELETE/UPSERT, ensuring those do
         * not run concurrently with INSERT.
         */

        lockMode = RowExclusiveLock;
    }
    else
    {
        ereport(ERROR, (errmsg("unrecognized operation code: %d", (int) commandType)));
    }

    return lockMode;
    if (shardId != INVALID_SHARD_ID && lockMode != NoLock)
    {
        LockShardResource(shardId, lockMode);
    }
}
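
The branches above boil down to a small command-type-to-lock-mode mapping. The condensed sketch below is illustrative only and is not part of the patch; RequiredShardLockMode is a hypothetical name, and the sketch assumes the Task fields and the AllModificationsCommutative GUC shown elsewhere in this diff.

/*
 * Illustrative sketch (not part of the patch): the lock-mode decision made by
 * the new AcquireExecutorShardLock above, with the locking call left out.
 */
static LOCKMODE
RequiredShardLockMode(Task *task, CmdType commandType)
{
    if (commandType == CMD_SELECT || list_length(task->taskPlacementList) == 1)
    {
        return NoLock;              /* reads and single-replica shards need no lock */
    }
    else if (AllModificationsCommutative)
    {
        return RowExclusiveLock;    /* the user declared all writes commutative */
    }
    else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE)
    {
        return ExclusiveLock;       /* non-commutative writes are serialized per shard */
    }
    else
    {
        return RowExclusiveLock;    /* CMD_INSERT; the real function errors on other types */
    }
}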


/*
 * AcquireExecutorShardLock: acquire shard lock needed for execution of
 * a single task within a distributed plan.
 * AcquireExecutorMultiShardLocks acquires the shard locks needed for execution
 * of writes on multiple shards.
 *
 * 1. If citus.all_modifications_commutative is set to true, then all locks
 * are acquired as ShareUpdateExclusiveLock.
 * 2. If citus.all_modifications_commutative is false, then only the shards
 * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
 * lock is acquired with ShareUpdateExclusiveLock.
 *
 * ShareUpdateExclusiveLock conflicts with itself such that only one
 * multi-shard modification at a time is allowed on a shard. It also conflicts
 * with ExclusiveLock, which ensures that updates/deletes/upserts are applied
 * in the same order on all placements. It does not conflict with ShareLock,
 * which is normally obtained by single-shard commutative writes.
 */
static void
AcquireExecutorShardLock(Task *task, LOCKMODE lockMode)
AcquireExecutorMultiShardLocks(List *shardIntervalList)
{
    int64 shardId = task->anchorShardId;
    LOCKMODE lockMode = NoLock;

    if (shardId != INVALID_SHARD_ID)
    if (AllModificationsCommutative || !IsReplicated(shardIntervalList))
    {
        LockShardResource(shardId, lockMode);
        /*
         * When all writes are commutative then we only need to prevent multi-shard
         * commands from running concurrently with each other and with commands
         * that are explicitly non-commutative. When there is not replication then
         * we only need to prevent concurrent multi-shard commands.
         *
         * In either case, ShareUpdateExclusive has the desired effect, since
         * it conflicts with itself and ExclusiveLock (taken by non-commutative
         * writes).
         */

        lockMode = ShareUpdateExclusiveLock;
    }
    else
    {
        /*
         * When there is replication, prevent all concurrent writes to the same
         * shards to ensure the writes are ordered.
         */
        lockMode = ExclusiveLock;
    }

    LockShardListResources(shardIntervalList, lockMode);
}


/*
 * IsReplicated checks whether any of the shards in the given list has more
 * than one replica.
 */
static bool
IsReplicated(List *shardIntervalList)
{
    ListCell *shardIntervalCell;
    bool hasReplication = false;

    foreach(shardIntervalCell, shardIntervalList)
    {
        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
        uint64 shardId = shardInterval->shardId;
        List *shardPlacementList = FinalizedShardPlacementList(shardId);
        if (shardPlacementList->length > 1)
        {
            hasReplication = true;
            break;
        }
    }

    return hasReplication;
}
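
The comments above lean on PostgreSQL's standard lock conflict table: RowExclusiveLock does not conflict with itself or with ShareUpdateExclusiveLock, while ExclusiveLock conflicts with both, and ShareUpdateExclusiveLock conflicts with itself. The sketch below is a condensed, illustrative view of the multi-shard lock choice; the helper name is hypothetical and the functions above remain authoritative.

/*
 * Illustrative sketch (not part of the patch): the lock mode picked by
 * AcquireExecutorMultiShardLocks above.
 */
static void
AcquireMultiShardLocksSketch(List *shardIntervalList)
{
    /*
     * ShareUpdateExclusiveLock serializes multi-shard commands against each
     * other and against ExclusiveLock takers, but lets commutative
     * single-shard writes holding RowExclusiveLock proceed.
     */
    LOCKMODE lockMode = ShareUpdateExclusiveLock;

    if (!AllModificationsCommutative && IsReplicated(shardIntervalList))
    {
        /* replicated shards need a full ordering of non-commutative writes */
        lockMode = ExclusiveLock;
    }

    LockShardListResources(shardIntervalList, lockMode);
}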


@@ -344,8 +423,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
{
    PlannedStmt *planStatement = queryDesc->plannedstmt;
    MultiPlan *multiPlan = GetMultiPlan(planStatement);
    List *taskList = multiPlan->workerJob->taskList;
    Task *task = NULL;
    Job *workerJob = multiPlan->workerJob;
    List *taskList = workerJob->taskList;
    EState *estate = queryDesc->estate;
    CmdType operation = queryDesc->operation;
    MemoryContext oldcontext = NULL;

@@ -353,14 +432,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
    MaterialState *routerState = (MaterialState *) queryDesc->planstate;
    bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning;

    /* router executor can only execute distributed plans with a single task */
    Assert(list_length(taskList) == 1);
    task = (Task *) linitial(taskList);

    Assert(estate != NULL);
    Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
    Assert(task != NULL);

    oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);

@@ -396,8 +469,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
     */
    if (!routerState->eof_underlying)
    {
        bool resultsOK = false;
        bool isModificationQuery = false;
        bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation;

        if (operation == CMD_INSERT || operation == CMD_UPDATE ||
            operation == CMD_DELETE)

@@ -410,12 +483,47 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
                            (int) operation)));
        }

        resultsOK = ExecuteTaskAndStoreResults(queryDesc, task,
                                               isModificationQuery,
                                               sendTuples);
        if (!resultsOK)
        if (requiresMasterEvaluation)
        {
            ereport(ERROR, (errmsg("could not receive query results")));
            ListCell *taskCell = NULL;
            Query *query = workerJob->jobQuery;
            Oid relationId = ((RangeTblEntry *) linitial(query->rtable))->relid;

            ExecuteMasterEvaluableFunctions(query);

            foreach(taskCell, taskList)
            {
                Task *task = (Task *) lfirst(taskCell);
                StringInfo newQueryString = makeStringInfo();

                deparse_shard_query(query, relationId, task->anchorShardId,
                                    newQueryString);

                ereport(DEBUG4, (errmsg("query before master evaluation: %s",
                                        task->queryString)));
                ereport(DEBUG4, (errmsg("query after master evaluation: %s",
                                        newQueryString->data)));

                task->queryString = newQueryString->data;
            }
        }

        if (list_length(taskList) == 1)
        {
            Task *task = (Task *) linitial(taskList);
            bool resultsOK = false;

            resultsOK = ExecuteSingleTask(queryDesc, task, isModificationQuery,
                                          sendTuples);
            if (!resultsOK)
            {
                ereport(ERROR, (errmsg("could not receive query results")));
            }
        }
        else
        {
            ExecuteMultipleTasks(queryDesc, taskList, isModificationQuery,
                                 sendTuples);
        }

        /* mark underlying query as having executed */

@@ -462,8 +570,8 @@ out:

/*
 * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves
 * the results and stores them, if SELECT or RETURNING is used, in a tuple
 * ExecuteSingleTask executes the task on the remote node, retrieves the
 * results and stores them, if SELECT or RETURNING is used, in a tuple
 * store.
 *
 * If the task fails on one of the placements, the function retries it on

@@ -472,10 +580,11 @@ out:
 * failed), or errors out (DML failed on all placements).
 */
static bool
ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
                           bool isModificationQuery,
                           bool expectResults)
ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
                  bool isModificationQuery,
                  bool expectResults)
{
    CmdType operation = queryDesc->operation;
    TupleDesc tupleDescriptor = queryDesc->tupDesc;
    EState *executorState = queryDesc->estate;
    MaterialState *routerState = (MaterialState *) queryDesc->planstate;

@@ -488,22 +597,28 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
    bool gotResults = false;
    char *queryString = task->queryString;

    if (isModificationQuery && task->requiresMasterEvaluation)
    if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
    {
        PlannedStmt *planStatement = queryDesc->plannedstmt;
        MultiPlan *multiPlan = GetMultiPlan(planStatement);
        Query *query = multiPlan->workerJob->jobQuery;
        Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid;
        StringInfo queryStringInfo = makeStringInfo();

        ExecuteMasterEvaluableFunctions(query);
        deparse_shard_query(query, relid, task->anchorShardId, queryStringInfo);
        queryString = queryStringInfo->data;

        elog(DEBUG4, "query before master evaluation: %s", task->queryString);
        elog(DEBUG4, "query after master evaluation: %s", queryString);
        ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                        errmsg("single-shard DML commands must not appear in "
                               "transaction blocks which contain multi-shard data "
                               "modifications")));
    }

    /*
     * We could naturally handle function-based transactions (i.e. those
     * using PL/pgSQL or similar) by checking the type of queryDesc->dest,
     * but some customers already use functions that touch multiple shards
     * from within a function, so we'll ignore functions for now.
     */
    if (operation != CMD_SELECT && xactParticipantHash == NULL && IsTransactionBlock())
    {
        InitTransactionStateForTask(task);
    }

    /* prevent replicas of the same shard from diverging */
    AcquireExecutorShardLock(task, operation);

    /*
     * Try to run the query to completion on one placement. If the query fails
     * attempt the query on the next placement.

@@ -512,6 +627,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
    {
        ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
        bool queryOK = false;
        bool failOnError = false;
        int64 currentAffectedTupleCount = 0;
        PGconn *connection = GetConnectionForPlacement(taskPlacement,
                                                       isModificationQuery);

@@ -538,11 +654,12 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
        if (!gotResults && expectResults)
        {
            queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
                                       &currentAffectedTupleCount);
                                       failOnError, &currentAffectedTupleCount);
        }
        else
        {
            queryOK = ConsumeQueryResult(connection, &currentAffectedTupleCount);
            queryOK = ConsumeQueryResult(connection, failOnError,
                                         &currentAffectedTupleCount);
        }

        if (queryOK)

@@ -616,6 +733,233 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
}


/*
 * ExecuteMultipleTasks executes a list of tasks on remote nodes, retrieves
 * the results and, if RETURNING is used, stores them in a tuple store.
 *
 * If a task fails on one of the placements, the transaction rolls back.
 * Otherwise, the changes are committed using 2PC when the local transaction
 * commits.
 */
static void
ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
                     bool isModificationQuery, bool expectResults)
{
    TupleDesc tupleDescriptor = queryDesc->tupDesc;
    EState *executorState = queryDesc->estate;
    MaterialState *routerState = (MaterialState *) queryDesc->planstate;
    ParamListInfo paramListInfo = queryDesc->params;
    int64 affectedTupleCount = -1;

    /* can only support modifications right now */
    Assert(isModificationQuery);

    affectedTupleCount = ExecuteModifyTasks(taskList, expectResults, paramListInfo,
                                            routerState, tupleDescriptor);

    executorState->es_processed = affectedTupleCount;
}


/*
 * ExecuteModifyTasks executes a list of tasks on remote nodes, and
 * optionally retrieves the results and stores them in a tuple store.
 *
 * If a task fails on one of the placements, the transaction rolls back.
 * Otherwise, the changes are committed using 2PC when the local transaction
 * commits.
 */
int64
ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo,
                   MaterialState *routerState, TupleDesc tupleDescriptor)
{
    int64 totalAffectedTupleCount = 0;
    ListCell *taskCell = NULL;
    char *userName = CurrentUserName();
    List *shardIntervalList = NIL;
    List *affectedTupleCountList = NIL;
    bool tasksPending = true;
    int placementIndex = 0;

    if (XactModificationLevel == XACT_MODIFICATION_DATA)
    {
        ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                        errmsg("multi-shard data modifications must not appear in "
                               "transaction blocks which contain single-shard DML "
                               "commands")));
    }

    XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;

    shardIntervalList = TaskShardIntervalList(taskList);

    /* ensure that there are no concurrent modifications on the same shards */
    AcquireExecutorMultiShardLocks(shardIntervalList);

    /* open connection to all relevant placements, if not already open */
    OpenTransactionsToAllShardPlacements(shardIntervalList, userName);

    /* iterate over placements in rounds, to ensure in-order execution */
    while (tasksPending)
    {
        int taskIndex = 0;

        tasksPending = false;

        /* send command to all shard placements with the current index in parallel */
        foreach(taskCell, taskList)
        {
            Task *task = (Task *) lfirst(taskCell);
            int64 shardId = task->anchorShardId;
            char *queryString = task->queryString;
            bool shardConnectionsFound = false;
            ShardConnections *shardConnections = NULL;
            List *connectionList = NIL;
            TransactionConnection *transactionConnection = NULL;
            PGconn *connection = NULL;
            bool queryOK = false;

            shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
            connectionList = shardConnections->connectionList;

            if (placementIndex >= list_length(connectionList))
            {
                /* no more active placements for this task */
                continue;
            }

            transactionConnection =
                (TransactionConnection *) list_nth(connectionList, placementIndex);
            connection = transactionConnection->connection;

            queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
            if (!queryOK)
            {
                ReraiseRemoteError(connection, NULL);
            }
        }

        /* collect results from all relevant shard placements */
        foreach(taskCell, taskList)
        {
            Task *task = (Task *) lfirst(taskCell);
            int64 shardId = task->anchorShardId;
            bool shardConnectionsFound = false;
            ShardConnections *shardConnections = NULL;
            List *connectionList = NIL;
            TransactionConnection *transactionConnection = NULL;
            PGconn *connection = NULL;
            int64 currentAffectedTupleCount = 0;
            bool failOnError = true;
            bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;

            /* abort in case of cancellation */
            CHECK_FOR_INTERRUPTS();

            shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
            connectionList = shardConnections->connectionList;

            if (placementIndex >= list_length(connectionList))
            {
                /* no more active placements for this task */
                continue;
            }

            transactionConnection =
                (TransactionConnection *) list_nth(connectionList, placementIndex);
            connection = transactionConnection->connection;

            /*
             * If caller is interested, store query results the first time
             * through. The output of the query's execution on other shards is
             * discarded if we run there (because it's a modification query).
             */
            if (placementIndex == 0 && expectResults)
            {
                Assert(routerState != NULL && tupleDescriptor != NULL);

                queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
                                           failOnError, &currentAffectedTupleCount);
            }
            else
            {
                queryOK = ConsumeQueryResult(connection, failOnError,
                                             &currentAffectedTupleCount);
            }

            /* should have rolled back on error */
            Assert(queryOK);

            if (placementIndex == 0)
            {
                totalAffectedTupleCount += currentAffectedTupleCount;

                /* keep track of the initial affected tuple count */
                affectedTupleCountList = lappend_int(affectedTupleCountList,
                                                     currentAffectedTupleCount);
            }
            else
            {
                /* warn the user if shard placements have diverged */
                int64 previousAffectedTupleCount = list_nth_int(affectedTupleCountList,
                                                                taskIndex);

                if (currentAffectedTupleCount != previousAffectedTupleCount)
                {
                    char *nodeName = ConnectionGetOptionValue(connection, "host");
                    char *nodePort = ConnectionGetOptionValue(connection, "port");

                    ereport(WARNING,
                            (errmsg("modified " INT64_FORMAT " tuples of shard "
                                    UINT64_FORMAT ", but expected to modify " INT64_FORMAT,
                                    currentAffectedTupleCount, shardId,
                                    previousAffectedTupleCount),
                             errdetail("modified placement on %s:%s", nodeName,
                                       nodePort)));
                }
            }

            if (!tasksPending && placementIndex + 1 < list_length(connectionList))
            {
                /* more tasks to be done after this one */
                tasksPending = true;
            }

            taskIndex++;
        }

        placementIndex++;
    }

    CHECK_FOR_INTERRUPTS();

    return totalAffectedTupleCount;
}
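
The function above interleaves two foreach loops per placement index. Stripped of connection lookup, result handling, and bookkeeping, the control flow looks roughly like the sketch below; it is illustrative only, reuses names from the diff, and elides the real work into comments.

/*
 * Illustrative sketch (not part of the patch): the round structure used by
 * ExecuteModifyTasks above. Each round touches one placement per task.
 */
static void
ExecuteModifyTasksRoundSketch(List *taskList)
{
    bool tasksPending = true;
    int placementIndex = 0;

    while (tasksPending)
    {
        ListCell *taskCell = NULL;

        tasksPending = false;

        /* phase 1: send the query to placement [placementIndex] of every task */
        foreach(taskCell, taskList)
        {
            /* SendQueryInSingleRowMode() on that task's current connection */
        }

        /* phase 2: collect results in the same task order */
        foreach(taskCell, taskList)
        {
            /*
             * StoreQueryResult() or ConsumeQueryResult(); set tasksPending
             * when a task still has a placement beyond placementIndex.
             */
        }

        placementIndex++;
    }
}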


/*
 * TaskShardIntervalList returns a list of shard intervals for a given list of
 * tasks.
 */
static List *
TaskShardIntervalList(List *taskList)
{
    ListCell *taskCell = NULL;
    List *shardIntervalList = NIL;

    foreach(taskCell, taskList)
    {
        Task *task = (Task *) lfirst(taskCell);
        int64 shardId = task->anchorShardId;
        ShardInterval *shardInterval = LoadShardInterval(shardId);

        shardIntervalList = lappend(shardIntervalList, shardInterval);
    }

    return shardIntervalList;
}


/*
 * ReturnRowsFromTuplestore moves rows from a given tuplestore into a
 * receiver. It performs the necessary limiting to support cursors.

@@ -903,7 +1247,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
 */
static bool
StoreQueryResult(MaterialState *routerState, PGconn *connection,
                 TupleDesc tupleDescriptor, int64 *rows)
                 TupleDesc tupleDescriptor, bool failOnError, int64 *rows)
{
    AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
    Tuplestorestate *tupleStore = NULL;

@@ -921,7 +1265,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
    {
        routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem);
    }
    else
    else if (!failOnError)
    {
        /* might have failed query execution on another placement before */
        tuplestore_clear(routerState->tuplestorestate);

@@ -948,7 +1292,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
    {
        char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
        int category = 0;
        bool raiseError = false;
        bool isConstraintViolation = false;

        /*
         * If the error code is in constraint violation class, we want to

@@ -956,9 +1300,9 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
         * placements.
         */
        category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
        raiseError = SqlStateMatchesCategory(sqlStateString, category);
        isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);

        if (raiseError)
        if (isConstraintViolation || failOnError)
        {
            RemoveXactConnection(connection);
            ReraiseRemoteError(connection, result);

@@ -1030,7 +1374,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 * has been an error.
 */
static bool
ConsumeQueryResult(PGconn *connection, int64 *rows)
ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
{
    bool commandFailed = false;
    bool gotResponse = false;

@@ -1060,7 +1404,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
    {
        char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
        int category = 0;
        bool raiseError = false;
        bool isConstraintViolation = false;

        /*
         * If the error code is in constraint violation class, we want to

@@ -1068,9 +1412,9 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
         * placements.
         */
        category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
        raiseError = SqlStateMatchesCategory(sqlStateString, category);
        isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);

        if (raiseError)
        if (isConstraintViolation || failOnError)
        {
            RemoveXactConnection(connection);
            ReraiseRemoteError(connection, result);

@@ -1092,8 +1436,11 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
            char *currentAffectedTupleString = PQcmdTuples(result);
            int64 currentAffectedTupleCount = 0;

            scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount);
            Assert(currentAffectedTupleCount >= 0);
            if (*currentAffectedTupleString != '\0')
            {
                scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount);
                Assert(currentAffectedTupleCount >= 0);
            }

#if (PG_VERSION_NUM < 90600)

@@ -1292,7 +1292,8 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
    {
        ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                        errmsg("distributed DDL commands must not appear within "
                               "transaction blocks containing data modifications")));
                               "transaction blocks containing single-shard data "
                               "modifications")));
    }

    ShowNoticeIfNotUsing2PC();

@@ -1305,7 +1306,7 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
        ereport(ERROR, (errmsg("could not execute DDL command on worker node shards")));
    }

    XactModificationLevel = XACT_MODIFICATION_SCHEMA;
    XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
}


@@ -1359,7 +1360,7 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
    Oid schemaId = get_rel_namespace(relationId);
    char *schemaName = get_namespace_name(schemaId);

    LockShards(shardIntervalList, ShareLock);
    LockShardListResources(shardIntervalList, ShareLock);
    OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner);

    foreach(shardIntervalCell, shardIntervalList)

@@ -41,6 +41,7 @@
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h"
#include "optimizer/clauses.h"
#include "optimizer/predtest.h"

@@ -55,11 +56,9 @@
#include "utils/memutils.h"


static void LockShardsForModify(List *shardIntervalList);
static bool HasReplication(List *shardIntervalList);
static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId);
static int SendQueryToPlacements(char *shardQueryString,
                                 ShardConnections *shardConnections);
static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList,
                                           Oid relationId);


PG_FUNCTION_INFO_V1(master_modify_multiple_shards);


@@ -83,14 +82,12 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
    Query *modifyQuery = NULL;
    Node *queryTreeNode;
    List *restrictClauseList = NIL;
    bool isTopLevel = true;
    bool failOK = false;
    List *shardIntervalList = NIL;
    List *prunedShardIntervalList = NIL;
    List *taskList = NIL;
    int32 affectedTupleCount = 0;

    PreventTransactionChain(isTopLevel, "master_modify_multiple_shards");

    queryTreeNode = ParseTreeNode(queryString);
    if (IsA(queryTreeNode, DeleteStmt))
    {

@@ -163,183 +160,50 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)

    CHECK_FOR_INTERRUPTS();

    LockShardsForModify(prunedShardIntervalList);

    affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList,
                                           relationId);
    taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList,
                                            relationId);
    affectedTupleCount = ExecuteModifyTasks(taskList, false, NULL, NULL, NULL);

    PG_RETURN_INT32(affectedTupleCount);
}


/*
 * LockShardsForModify command locks the replicas of given shard. The
 * lock logic is slightly different from LockShards function. Basically,
 *
 * 1. If citus.all_modifications_commutative is set to true, then all locks
 * are acquired as ShareLock.
 * 2. If citus.all_modifications_commutative is false, then only the shards
 * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
 * lock is acquired with ShareLock.
 * ModifyMultipleShardsTaskList builds a list of tasks to execute a query on a
 * given list of shards.
 */
static void
LockShardsForModify(List *shardIntervalList)
static List *
ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relationId)
{
    LOCKMODE lockMode = NoLock;
    List *taskList = NIL;
    ListCell *shardIntervalCell = NULL;
    uint64 jobId = INVALID_JOB_ID;
    int taskId = 1;

    if (AllModificationsCommutative)
    {
        lockMode = ShareLock;
    }
    else if (!HasReplication(shardIntervalList))
    {
        lockMode = ShareLock;
    }
    else
    {
        lockMode = ExclusiveLock;
    }

    LockShards(shardIntervalList, lockMode);
}


/*
 * HasReplication checks whether any of the shards in the given list has more
 * than one replica.
 */
static bool
HasReplication(List *shardIntervalList)
{
    ListCell *shardIntervalCell;
    bool hasReplication = false;
    /* lock metadata before getting placement lists */
    LockShardListMetadata(shardIntervalList, ShareLock);

    foreach(shardIntervalCell, shardIntervalList)
    {
        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
        uint64 shardId = shardInterval->shardId;
        List *shardPlacementList = FinalizedShardPlacementList(shardId);
        if (shardPlacementList->length > 1)
        {
            hasReplication = true;
        }
    }

    return hasReplication;
}


/*
 * SendQueryToShards executes the given query in all placements of the given
 * shard list and returns the total affected tuple count. The execution is done
 * in a distributed transaction and the commit protocol is decided according to
 * the value of citus.multi_shard_commit_protocol parameter. SendQueryToShards
 * does not acquire locks for the shards so it is advised to acquire locks to
 * the shards when necessary before calling SendQueryToShards.
 */
static int
SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
{
    int affectedTupleCount = 0;
    char *relationOwner = TableOwner(relationId);
    ListCell *shardIntervalCell = NULL;

    OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner);

    foreach(shardIntervalCell, shardIntervalList)
    {
        ShardInterval *shardInterval = (ShardInterval *) lfirst(
            shardIntervalCell);
        Oid relationId = shardInterval->relationId;
        uint64 shardId = shardInterval->shardId;
        bool shardConnectionsFound = false;
        ShardConnections *shardConnections = NULL;
        StringInfo shardQueryString = makeStringInfo();
        char *shardQueryStringData = NULL;
        int shardAffectedTupleCount = -1;

        shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
        Assert(shardConnectionsFound);
        Task *task = NULL;

        deparse_shard_query(query, relationId, shardId, shardQueryString);
        shardQueryStringData = shardQueryString->data;
        shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
                                                        shardConnections);
        affectedTupleCount += shardAffectedTupleCount;

        task = CitusMakeNode(Task);
        task->jobId = jobId;
        task->taskId = taskId++;
        task->taskType = SQL_TASK;
        task->queryString = shardQueryString->data;
        task->dependedTaskList = NULL;
        task->anchorShardId = shardId;
        task->taskPlacementList = FinalizedShardPlacementList(shardId);

        taskList = lappend(taskList, task);
    }

    /* check for cancellation one last time before returning */
    CHECK_FOR_INTERRUPTS();

    return affectedTupleCount;
}


/*
 * SendQueryToPlacements sends the given query string to all given placement
 * connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions
 * should be called after all queries have been sent successfully.
 */
static int
SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections)
{
    uint64 shardId = shardConnections->shardId;
    List *connectionList = shardConnections->connectionList;
    ListCell *connectionCell = NULL;
    int32 shardAffectedTupleCount = -1;

    Assert(connectionList != NIL);

    foreach(connectionCell, connectionList)
    {
        TransactionConnection *transactionConnection =
            (TransactionConnection *) lfirst(connectionCell);
        PGconn *connection = transactionConnection->connection;
        PGresult *result = NULL;
        char *placementAffectedTupleString = NULL;
        int32 placementAffectedTupleCount = -1;

        CHECK_FOR_INTERRUPTS();

        /* send the query */
        result = PQexec(connection, shardQueryString);
        if (PQresultStatus(result) != PGRES_COMMAND_OK)
        {
            WarnRemoteError(connection, result);
            ereport(ERROR, (errmsg("could not send query to shard placement")));
        }

        placementAffectedTupleString = PQcmdTuples(result);

        /* returned tuple count is empty for utility commands, use 0 as affected count */
        if (*placementAffectedTupleString == '\0')
        {
            placementAffectedTupleCount = 0;
        }
        else
        {
            placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
                                                  sizeof(int32), 0);
        }

        if ((shardAffectedTupleCount == -1) ||
            (shardAffectedTupleCount == placementAffectedTupleCount))
        {
            shardAffectedTupleCount = placementAffectedTupleCount;
        }
        else
        {
            ereport(ERROR,
                    (errmsg("modified %d tuples, but expected to modify %d",
                            placementAffectedTupleCount, shardAffectedTupleCount),
                     errdetail("Affected tuple counts at placements of shard "
                               UINT64_FORMAT " are different.", shardId)));
        }

        PQclear(result);

        transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
    }

    return shardAffectedTupleCount;
    return taskList;
}

@@ -1677,6 +1677,7 @@ BuildJob(Query *jobQuery, List *dependedJobList)
    job->jobId = UniqueJobId();
    job->jobQuery = jobQuery;
    job->dependedJobList = dependedJobList;
    job->requiresMasterEvaluation = false;

    return job;
}


@@ -757,7 +757,6 @@ RouterModifyTask(Query *originalQuery, Query *query)
    StringInfo queryString = makeStringInfo();
    Task *modifyTask = NULL;
    bool upsertQuery = false;
    bool requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);

    /* grab shared metadata lock to stop concurrent placement additions */
    LockShardDistributionMetadata(shardId, ShareLock);

@@ -789,7 +788,6 @@ RouterModifyTask(Query *originalQuery, Query *query)
    modifyTask->anchorShardId = shardId;
    modifyTask->dependedTaskList = NIL;
    modifyTask->upsertQuery = upsertQuery;
    modifyTask->requiresMasterEvaluation = requiresMasterEvaluation;

    return modifyTask;
}

@@ -1126,7 +1124,6 @@ RouterSelectTask(Query *originalQuery, Query *query,
    task->anchorShardId = shardId;
    task->dependedTaskList = NIL;
    task->upsertQuery = upsertQuery;
    task->requiresMasterEvaluation = false;

    *placementList = workerList;

@@ -1477,6 +1474,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList)
    Job *job = NULL;
    List *taskList = NIL;
    TaskType taskType = task->taskType;
    bool requiresMasterEvaluation = RequiresMasterEvaluation(query);

    /*
     * We send modify task to the first replica, otherwise we choose the target shard

@@ -1501,6 +1499,7 @@ RouterQueryJob(Query *query, Task *task, List *placementList)
    job->subqueryPushdown = false;
    job->jobQuery = query;
    job->taskList = taskList;
    job->requiresMasterEvaluation = requiresMasterEvaluation;

    return job;
}


@@ -153,7 +153,7 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)

    transactionConnection->groupId = workerNode->groupId;
    transactionConnection->connectionId = shardConnections->shardId;
    transactionConnection->transactionState = TRANSACTION_STATE_INVALID;
    transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
    transactionConnection->connection = connection;
    transactionConnection->nodeName = shardPlacement->nodeName;
    transactionConnection->nodePort = shardPlacement->nodePort;


@@ -389,6 +389,7 @@ OutJobFields(StringInfo str, const Job *node)
    WRITE_NODE_FIELD(taskList);
    WRITE_NODE_FIELD(dependedJobList);
    WRITE_BOOL_FIELD(subqueryPushdown);
    WRITE_BOOL_FIELD(requiresMasterEvaluation);
}


@@ -492,7 +493,6 @@ OutTask(OUTFUNC_ARGS)
    WRITE_BOOL_FIELD(assignmentConstrained);
    WRITE_NODE_FIELD(taskExecution);
    WRITE_BOOL_FIELD(upsertQuery);
    WRITE_BOOL_FIELD(requiresMasterEvaluation);
}

#if (PG_VERSION_NUM < 90600)


@@ -161,6 +161,7 @@ readJobInfo(Job *local_node)
    READ_NODE_FIELD(taskList);
    READ_NODE_FIELD(dependedJobList);
    READ_BOOL_FIELD(subqueryPushdown);
    READ_BOOL_FIELD(requiresMasterEvaluation);
}


@@ -288,7 +289,6 @@ ReadTask(READFUNC_ARGS)
    READ_BOOL_FIELD(assignmentConstrained);
    READ_NODE_FIELD(taskExecution);
    READ_BOOL_FIELD(upsertQuery);
    READ_BOOL_FIELD(requiresMasterEvaluation);

    READ_DONE();
}


@@ -125,21 +125,9 @@ void
PurgeConnection(PGconn *connection)
{
    NodeConnectionKey nodeConnectionKey;
    PGconn *purgedConnection = NULL;

    BuildKeyForConnection(connection, &nodeConnectionKey);
    purgedConnection = PurgeConnectionByKey(&nodeConnectionKey);

    /*
     * It's possible the provided connection matches the host and port for
     * an entry in the hash without being precisely the same connection. In
     * that case, we will want to close the provided connection in addition
     * to the one from the hash (which was closed by PurgeConnectionByKey).
     */
    if (purgedConnection != connection)
    {
        PQfinish(connection);
    }
    PurgeConnectionByKey(&nodeConnectionKey);
}

@@ -20,6 +20,7 @@
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_executor.h"
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"

@@ -28,9 +29,8 @@

/*
 * LockShardDistributionMetadata returns after grabbing a lock for distribution
 * metadata related to the specified shard, blocking if required. ExclusiveLock
 * and ShareLock modes are supported. Any locks acquired using this method are
 * released at transaction end.
 * metadata related to the specified shard, blocking if required. Any locks
 * acquired using this method are released at transaction end.
 */
void
LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)

@@ -126,12 +126,11 @@ UnlockJobResource(uint64 jobId, LOCKMODE lockmode)


/*
 * LockShards takes shared locks on the metadata and the data of all shards in
 * shardIntervalList. This prevents concurrent placement changes and concurrent
 * DML statements that require an exclusive lock.
 * LockShardListMetadata takes shared locks on the metadata of all shards in
 * shardIntervalList to prevent concurrent placement changes.
 */
void
LockShards(List *shardIntervalList, LOCKMODE lockMode)
LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode)
{
    ListCell *shardIntervalCell = NULL;

@@ -143,10 +142,28 @@ LockShards(List *shardIntervalList, LOCKMODE lockMode)
        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
        int64 shardId = shardInterval->shardId;

        /* prevent concurrent changes to number of placements */
        LockShardDistributionMetadata(shardId, lockMode);
    }
}


/*
 * LockShardListResources takes locks on all shards in shardIntervalList to
 * prevent concurrent DML statements on those shards.
 */
void
LockShardListResources(List *shardIntervalList, LOCKMODE lockMode)
{
    ListCell *shardIntervalCell = NULL;

    /* lock shards in order of shard id to prevent deadlock */
    shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById);

    foreach(shardIntervalCell, shardIntervalList)
    {
        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
        int64 shardId = shardInterval->shardId;

        /* prevent concurrent update/delete statements */
        LockShardResource(shardId, lockMode);
    }
}

@@ -58,7 +58,7 @@ typedef enum
    XACT_MODIFICATION_INVALID = 0,     /* placeholder initial value */
    XACT_MODIFICATION_NONE,            /* no modifications have taken place */
    XACT_MODIFICATION_DATA,            /* data modifications (DML) have occurred */
    XACT_MODIFICATION_SCHEMA           /* schema modifications (DDL) have occurred */
    XACT_MODIFICATION_MULTI_SHARD      /* multi-shard modifications have occurred */
} XactModificationType;
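
Renaming XACT_MODIFICATION_SCHEMA to XACT_MODIFICATION_MULTI_SHARD is what ties the transaction-block checks in this diff together: DDL and multi-shard DML now record the same level, and the single-shard and multi-shard paths each refuse to run after the other. The helper below is hypothetical and not part of the patch; it only summarizes the two checks quoted from the executor hunks above.

/*
 * Illustrative sketch (not part of the patch): the mutual-exclusion checks
 * that ExecuteSingleTask and ExecuteModifyTasks perform in this diff.
 */
static void
ErrorIfConflictingModificationLevel(bool multiShardCommand)
{
    if (multiShardCommand && XactModificationLevel == XACT_MODIFICATION_DATA)
    {
        ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                        errmsg("multi-shard data modifications must not appear in "
                               "transaction blocks which contain single-shard DML "
                               "commands")));
    }
    else if (!multiShardCommand &&
             XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
    {
        ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
                        errmsg("single-shard DML commands must not appear in "
                               "transaction blocks which contain multi-shard data "
                               "modifications")));
    }
}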


@@ -120,6 +120,7 @@ typedef struct Job
    List *taskList;
    List *dependedJobList;
    bool subqueryPushdown;
    bool requiresMasterEvaluation;     /* only applies to modify jobs */
} Job;


@@ -168,7 +169,6 @@ typedef struct Task
    uint64 shardId;                    /* only applies to shard fetch tasks */
    TaskExecution *taskExecution;      /* used by task tracker executor */
    bool upsertQuery;                  /* only applies to modify tasks */
    bool requiresMasterEvaluation;     /* only applies to modify tasks */
} Task;

@@ -39,4 +39,8 @@ extern void RouterExecutorFinish(QueryDesc *queryDesc);
extern void RouterExecutorEnd(QueryDesc *queryDesc);
extern void RegisterRouterExecutorXactCallbacks(void);

extern int64 ExecuteModifyTasks(List *taskList, bool expectResults,
                                ParamListInfo paramListInfo, MaterialState *routerState,
                                TupleDesc tupleDescriptor);

#endif /* MULTI_ROUTER_EXECUTOR_H_ */


@@ -75,7 +75,10 @@ extern void UnlockShardResource(uint64 shardId, LOCKMODE lockmode);
extern void LockJobResource(uint64 jobId, LOCKMODE lockmode);
extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);

extern void LockShards(List *shardIntervalList, LOCKMODE lockMode);
/* Lock multiple shards for safe modification */
extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);

extern void LockMetadataSnapshot(LOCKMODE lockMode);

#endif /* RESOURCE_LOCK_H */

@@ -168,13 +168,13 @@ ABORT;
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
ALTER TABLE labs ADD COLUMN motto text;
ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
COMMIT;
-- whether it occurs first or second
BEGIN;
ALTER TABLE labs ADD COLUMN motto text;
INSERT INTO labs VALUES (6, 'Bell Labs');
ERROR: distributed data modifications must not appear in transaction blocks which contain distributed DDL commands
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
COMMIT;
-- but the DDL should correctly roll back
\d labs

@@ -244,7 +244,7 @@ SELECT * FROM labs WHERE id = 12;
BEGIN;
\copy labs from stdin delimiter ','
ALTER TABLE labs ADD COLUMN motto text;
ERROR: distributed DDL commands must not appear within transaction blocks containing data modifications
ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
COMMIT;
-- the DDL fails, but copy persists
\d labs

@@ -22,11 +22,27 @@ SELECT master_create_worker_shards('multi_shard_modify_test', 4, 2);

COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'csv');
-- Testing master_modify_multiple_shards
-- Verify that master_modify_multiple_shards cannot be called in a transaction block
-- Verify that master_modify_multiple_shards can be rolled back
BEGIN;
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13');
ERROR: master_modify_multiple_shards cannot run inside a transaction block
 master_modify_multiple_shards
-------------------------------
                             3
(1 row)

SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 202');
 master_modify_multiple_shards
-------------------------------
                             1
(1 row)

ROLLBACK;
SELECT count(*) FROM multi_shard_modify_test;
 count
-------
    27
(1 row)

-- Check that master_modify_multiple_shards cannot be called with non-distributed tables
CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text);
INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder');

@@ -138,11 +138,15 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::re
 1210005
(3 rows)

-- command can not be run inside transaction
BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT;
ERROR: master_modify_multiple_shards cannot run inside a transaction block
CONTEXT: SQL statement "SELECT master_modify_multiple_shards(commandText)"
PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM
-- verify that truncate can be aborted
INSERT INTO test_truncate_range VALUES (1);
BEGIN; TRUNCATE TABLE test_truncate_range; ROLLBACK;
SELECT count(*) FROM test_truncate_range;
 count
-------
     1
(1 row)

DROP TABLE test_truncate_range;
--
-- truncate for hash distribution.

@@ -226,11 +230,15 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::reg
 1210009
(4 rows)

-- command can not be run inside transaction
BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT;
ERROR: master_modify_multiple_shards cannot run inside a transaction block
CONTEXT: SQL statement "SELECT master_modify_multiple_shards(commandText)"
PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM
-- verify that truncate can be aborted
INSERT INTO test_truncate_hash VALUES (1);
BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK;
SELECT count(*) FROM test_truncate_hash;
 count
-------
     1
(1 row)

DROP TABLE test_truncate_hash;
-- test with table with spaces in it
CREATE TABLE "a b hash" (a int, b int);

@@ -46,11 +46,15 @@ COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'c
\.

-- Testing master_modify_multiple_shards
-- Verify that master_modify_multiple_shards cannot be called in a transaction block

-- Verify that master_modify_multiple_shards can be rolled back
BEGIN;
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13');
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 202');
ROLLBACK;

SELECT count(*) FROM multi_shard_modify_test;

-- Check that master_modify_multiple_shards cannot be called with non-distributed tables
CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text);
INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder');

@@ -89,8 +89,10 @@ SELECT count(*) FROM test_truncate_range;
-- verify 3 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;

-- command can not be run inside transaction
BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT;
-- verify that truncate can be aborted
INSERT INTO test_truncate_range VALUES (1);
BEGIN; TRUNCATE TABLE test_truncate_range; ROLLBACK;
SELECT count(*) FROM test_truncate_range;

DROP TABLE test_truncate_range;

@@ -136,8 +138,10 @@ SELECT count(*) FROM test_truncate_hash;
-- verify 4 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;

-- command can not be run inside transaction
BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT;
-- verify that truncate can be aborted
INSERT INTO test_truncate_hash VALUES (1);
BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK;
SELECT count(*) FROM test_truncate_hash;

DROP TABLE test_truncate_hash;