Parallelise master_modify_multiple_shards

Branch: pull/855/head
Author: Marco Slot
Date:   2016-10-07 03:26:04 +02:00
Parent: 9d98acfb6d
Commit: a497e7178c

15 changed files with 410 additions and 291 deletions
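The user-visible effect of the change: master_modify_multiple_shards no longer calls PreventTransactionChain, but instead builds an ordinary task list and executes it through the new ExecuteModifyTasks path in the router executor, which locks the shards, opens transactions on all placements, and commits through the existing multi-shard 2PC machinery. A rough sketch (the items table and its rows are illustrative, not part of the diff; the statements mirror the regression tests further down):

-- assumes a running Citus cluster with two or more workers
CREATE TABLE items (key int, value text);
SELECT master_create_distributed_table('items', 'key', 'hash');
SELECT master_create_worker_shards('items', 4, 2);
INSERT INTO items VALUES (1, 'a');
INSERT INTO items VALUES (11, 'b');
-- before this change: ERROR:  master_modify_multiple_shards cannot run inside a transaction block
BEGIN;
SELECT master_modify_multiple_shards('DELETE FROM items WHERE key > 10');
ROLLBACK;
SELECT count(*) FROM items;  -- still 2 rows: the multi-shard delete rolled back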


@@ -433,7 +433,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 	}

 	/* prevent concurrent placement changes and non-commutative DML statements */
-	LockShards(shardIntervalList, ShareLock);
+	LockShardListMetadata(shardIntervalList, ShareLock);
+	LockShardListResources(shardIntervalList, ShareLock);

 	/* initialize the shard interval cache */
 	shardCount = cacheEntry->shardIntervalArrayLength;
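The split into two lock calls is deliberate: LockShardListMetadata takes a ShareLock on each shard's distribution metadata (blocking placement changes), while LockShardListResources takes a ShareLock on the shard resources themselves. Because ExecuteModifyTasks (further down in this diff) takes the resource locks in ExclusiveLock mode, COPY and a concurrent multi-shard modification now serialize rather than interleave. An illustrative two-session sketch, reusing the hypothetical items table from above:

-- session 1: COPY keeps a ShareLock on the shard resources until commit
BEGIN;
COPY items FROM '/tmp/items.csv' WITH (FORMAT 'csv');

-- session 2: blocks until session 1 commits, because ExecuteModifyTasks
-- takes an ExclusiveLock on the same shard resources
SELECT master_modify_multiple_shards('DELETE FROM items WHERE key > 10');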


@@ -28,6 +28,7 @@
 #include "catalog/pg_type.h"
 #include "distributed/citus_clauses.h"
 #include "distributed/citus_ruleutils.h"
+#include "distributed/commit_protocol.h"
 #include "distributed/connection_cache.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
@@ -35,6 +36,7 @@
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_planner.h"
 #include "distributed/multi_router_executor.h"
+#include "distributed/multi_shard_transaction.h"
 #include "distributed/relay_utility.h"
 #include "distributed/resource_lock.h"
 #include "executor/execdesc.h"
@@ -64,6 +66,7 @@
 /* controls use of locks to enforce safe commutativity */
 bool AllModificationsCommutative = false;
+
 /*
  * The following static variables are necessary to track the progression of
  * multi-statement transactions managed by the router executor. After the first
@@ -84,15 +87,15 @@ static bool subXactAbortAttempted = false;

 /* functions needed during start phase */
 static void InitTransactionStateForTask(Task *task);
+static void AcquireExecutorShardLock(Task *task, CmdType commandType);
 static HTAB * CreateXactParticipantHash(void);

 /* functions needed during run phase */
-static bool ExecuteTaskAndStoreResults(QueryDesc *queryDesc,
-                                       Task *task,
-                                       bool isModificationQuery,
-                                       bool requiresMasterEvaluation,
-                                       bool expectResults);
-static void AcquireExecutorShardLock(Task *task, CmdType commandType);
+static bool ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
+                              bool isModificationQuery, bool expectResults);
+static void ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
+                                 bool isModificationQuery, bool expectResults);
+static List * TaskShardIntervalList(List *taskList);
 static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescriptor,
                                        DestReceiver *destination,
                                        Tuplestorestate *tupleStore);
@@ -106,8 +109,8 @@ static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
 static bool SendQueryInSingleRowMode(PGconn *connection, char *query,
                                      ParamListInfo paramListInfo);
 static bool StoreQueryResult(MaterialState *routerState, PGconn *connection,
-                             TupleDesc tupleDescriptor, int64 *rows);
-static bool ConsumeQueryResult(PGconn *connection, int64 *rows);
+                             TupleDesc tupleDescriptor, bool failOnError, int64 *rows);
+static bool ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows);
 static void RecordShardIdParticipant(uint64 affectedShardId,
                                      NodeConnectionEntry *participantEntry);
@@ -136,25 +139,6 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
 	if (commandType != CMD_SELECT)
 	{
 		eflags |= EXEC_FLAG_SKIP_TRIGGERS;
-
-		if (XactModificationLevel == XACT_MODIFICATION_SCHEMA)
-		{
-			ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
-							errmsg("distributed data modifications must not appear in "
-								   "transaction blocks which contain distributed DDL "
-								   "commands")));
-		}
-
-		/*
-		 * We could naturally handle function-based transactions (i.e. those
-		 * using PL/pgSQL or similar) by checking the type of queryDesc->dest,
-		 * but some customers already use functions that touch multiple shards
-		 * from within a function, so we'll ignore functions for now.
-		 */
-		if (IsTransactionBlock() && xactParticipantHash == NULL)
-		{
-			InitTransactionStateForTask(task);
-		}
 	}

 	/* signal that it is a router execution */
@@ -364,7 +348,6 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 	MultiPlan *multiPlan = GetMultiPlan(planStatement);
 	Job *workerJob = multiPlan->workerJob;
 	List *taskList = workerJob->taskList;
-	Task *task = NULL;
 	EState *estate = queryDesc->estate;
 	CmdType operation = queryDesc->operation;
 	MemoryContext oldcontext = NULL;
@@ -372,14 +355,8 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
 	bool sendTuples = operation == CMD_SELECT || queryDesc->plannedstmt->hasReturning;

-	/* router executor can only execute distributed plans with a single task */
-	Assert(list_length(taskList) == 1);
-	task = (Task *) linitial(taskList);
-
 	Assert(estate != NULL);
 	Assert(!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));
-	Assert(task != NULL);

 	oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -415,7 +392,6 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 	 */
 	if (!routerState->eof_underlying)
 	{
-		bool resultsOK = false;
 		bool isModificationQuery = false;
 		bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation;
@@ -430,13 +406,45 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
 							(int) operation)));
 		}

-		resultsOK = ExecuteTaskAndStoreResults(queryDesc, task,
-											   isModificationQuery,
-											   requiresMasterEvaluation,
-											   sendTuples);
-		if (!resultsOK)
+		if (requiresMasterEvaluation)
 		{
-			ereport(ERROR, (errmsg("could not receive query results")));
+			ListCell *taskCell = NULL;
+			Query *query = workerJob->jobQuery;
+			Oid relationId = ((RangeTblEntry *) linitial(query->rtable))->relid;
+
+			ExecuteMasterEvaluableFunctions(query);
+
+			foreach(taskCell, taskList)
+			{
+				Task *task = (Task *) lfirst(taskCell);
+				StringInfo newQueryString = makeStringInfo();
+
+				deparse_shard_query(query, relationId, task->anchorShardId,
+									newQueryString);
+
+				elog(DEBUG4, "query before master evaluation: %s", task->queryString);
+				elog(DEBUG4, "query after master evaluation: %s", newQueryString->data);
+
+				task->queryString = newQueryString->data;
+			}
+		}
+
+		if (list_length(taskList) == 1)
+		{
+			Task *task = (Task *) linitial(taskList);
+			bool resultsOK = false;
+
+			resultsOK = ExecuteSingleTask(queryDesc, task, isModificationQuery,
+										  sendTuples);
+			if (!resultsOK)
+			{
+				ereport(ERROR, (errmsg("could not receive query results")));
+			}
+		}
+		else
+		{
+			ExecuteMultipleTasks(queryDesc, taskList, isModificationQuery,
+								 sendTuples);
 		}

 		/* mark underlying query as having executed */
@@ -483,8 +491,8 @@ out:

 /*
- * ExecuteTaskAndStoreResults executes the task on the remote node, retrieves
- * the results and stores them, if SELECT or RETURNING is used, in a tuple
+ * ExecuteSingleTask executes the task on the remote node, retrieves the
+ * results and stores them, if SELECT or RETURNING is used, in a tuple
  * store.
  *
  * If the task fails on one of the placements, the function retries it on
@@ -493,12 +501,11 @@ out:
  * failed), or errors out (DML failed on all placements).
  */
 static bool
-ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
-						   bool isModificationQuery,
-						   bool requiresMasterEvaluation,
-						   bool expectResults)
+ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
+				  bool isModificationQuery,
+				  bool expectResults)
 {
-	CmdType commandType = queryDesc->operation;
+	CmdType operation = queryDesc->operation;
 	TupleDesc tupleDescriptor = queryDesc->tupDesc;
 	EState *executorState = queryDesc->estate;
 	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
@@ -511,24 +518,27 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 	bool gotResults = false;
 	char *queryString = task->queryString;

-	if (isModificationQuery && requiresMasterEvaluation)
+	if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
 	{
-		PlannedStmt *planStatement = queryDesc->plannedstmt;
-		MultiPlan *multiPlan = GetMultiPlan(planStatement);
-		Query *query = multiPlan->workerJob->jobQuery;
-		Oid relid = ((RangeTblEntry *) linitial(query->rtable))->relid;
-		StringInfo queryStringInfo = makeStringInfo();
-
-		ExecuteMasterEvaluableFunctions(query);
-		deparse_shard_query(query, relid, task->anchorShardId, queryStringInfo);
-		queryString = queryStringInfo->data;
-
-		elog(DEBUG4, "query before master evaluation: %s", task->queryString);
-		elog(DEBUG4, "query after master evaluation: %s", queryString);
+		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+						errmsg("single-shard DML commands must not appear in "
+							   "transaction blocks which contain multi-shard data "
+							   "modifications")));
+	}
+
+	/*
+	 * We could naturally handle function-based transactions (i.e. those
+	 * using PL/pgSQL or similar) by checking the type of queryDesc->dest,
+	 * but some customers already use functions that touch multiple shards
+	 * from within a function, so we'll ignore functions for now.
+	 */
+	if (operation != CMD_SELECT && xactParticipantHash == NULL && IsTransactionBlock())
+	{
+		InitTransactionStateForTask(task);
 	}

 	/* prevent replicas of the same shard from diverging */
-	AcquireExecutorShardLock(task, commandType);
+	AcquireExecutorShardLock(task, operation);
@@ -538,6 +548,7 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 	{
 		ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
 		bool queryOK = false;
+		bool failOnError = false;
 		int64 currentAffectedTupleCount = 0;
 		PGconn *connection = GetConnectionForPlacement(taskPlacement,
 													   isModificationQuery);
@@ -564,11 +575,12 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 		if (!gotResults && expectResults)
 		{
 			queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
-									   &currentAffectedTupleCount);
+									   failOnError, &currentAffectedTupleCount);
 		}
 		else
 		{
-			queryOK = ConsumeQueryResult(connection, &currentAffectedTupleCount);
+			queryOK = ConsumeQueryResult(connection, failOnError,
+										 &currentAffectedTupleCount);
 		}

 		if (queryOK)
@@ -642,6 +654,200 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 }


+/*
+ * ExecuteMultipleTasks executes a list of tasks on remote nodes, retrieves
+ * the results and, if RETURNING is used, stores them in a tuple store.
+ *
+ * If a task fails on one of the placements, the transaction rolls back.
+ * Otherwise, the changes are committed using 2PC when the local transaction
+ * commits.
+ */
+static void
+ExecuteMultipleTasks(QueryDesc *queryDesc, List *taskList,
+					 bool isModificationQuery, bool expectResults)
+{
+	TupleDesc tupleDescriptor = queryDesc->tupDesc;
+	EState *executorState = queryDesc->estate;
+	MaterialState *routerState = (MaterialState *) queryDesc->planstate;
+	ParamListInfo paramListInfo = queryDesc->params;
+	int64 affectedTupleCount = -1;
+
+	/* can only support modifications right now */
+	Assert(isModificationQuery);
+
+	affectedTupleCount = ExecuteModifyTasks(taskList, expectResults, paramListInfo,
+											routerState, tupleDescriptor);
+
+	executorState->es_processed = affectedTupleCount;
+}
+
+
+/*
+ * ExecuteModifyTasks executes a list of tasks on remote nodes, and
+ * optionally retrieves the results and stores them in a tuple store.
+ *
+ * If a task fails on one of the placements, the transaction rolls back.
+ * Otherwise, the changes are committed using 2PC when the local transaction
+ * commits.
+ */
+int64
+ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo,
+				   MaterialState *routerState, TupleDesc tupleDescriptor)
+{
+	int64 totalAffectedTupleCount = 0;
+	ListCell *taskCell = NULL;
+	char *userName = CurrentUserName();
+	List *shardIntervalList = NIL;
+
+	if (XactModificationLevel == XACT_MODIFICATION_DATA)
+	{
+		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+						errmsg("multi-shard data modifications must not appear in "
+							   "transaction blocks which contain single-shard DML "
+							   "commands")));
+	}
+
+	XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
+
+	shardIntervalList = TaskShardIntervalList(taskList);
+
+	/* ensure that there are no concurrent modifications on the same shards */
+	LockShardListResources(shardIntervalList, ExclusiveLock);
+
+	/* open connections to all relevant placements, if not already open */
+	OpenTransactionsToAllShardPlacements(shardIntervalList, userName);
+
+	/* send the command to all relevant shard placements */
+	foreach(taskCell, taskList)
+	{
+		Task *task = (Task *) lfirst(taskCell);
+		int64 shardId = task->anchorShardId;
+		char *queryString = task->queryString;
+		bool shardConnectionsFound = false;
+		ShardConnections *shardConnections = NULL;
+		List *connectionList = NIL;
+		ListCell *connectionCell = NULL;
+
+		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
+		Assert(shardConnectionsFound);
+
+		connectionList = shardConnections->connectionList;
+		Assert(connectionList != NIL);
+
+		foreach(connectionCell, connectionList)
+		{
+			TransactionConnection *transactionConnection =
+				(TransactionConnection *) lfirst(connectionCell);
+			PGconn *connection = transactionConnection->connection;
+			bool queryOK = false;
+
+			queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
+			if (!queryOK)
+			{
+				ReraiseRemoteError(connection, NULL);
+			}
+		}
+	}
+
+	/* collect results from all relevant shard placements */
+	foreach(taskCell, taskList)
+	{
+		Task *task = (Task *) lfirst(taskCell);
+		int64 shardId = task->anchorShardId;
+		bool shardConnectionsFound = false;
+		ShardConnections *shardConnections = NULL;
+		List *connectionList = NIL;
+		ListCell *connectionCell = NULL;
+		int64 affectedTupleCount = 0;
+		bool gotResults = false;
+
+		/* abort in case of cancellation */
+		CHECK_FOR_INTERRUPTS();
+
+		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
+		connectionList = shardConnections->connectionList;
+
+		foreach(connectionCell, connectionList)
+		{
+			TransactionConnection *transactionConnection =
+				(TransactionConnection *) lfirst(connectionCell);
+			PGconn *connection = transactionConnection->connection;
+			int64 currentAffectedTupleCount = 0;
+			bool failOnError = true;
+			bool queryOK PG_USED_FOR_ASSERTS_ONLY = false;
+
+			/*
+			 * If the caller is interested, store query results the first time
+			 * through. The output of the query's execution on other shards is
+			 * discarded if we run there (because it's a modification query).
+			 */
+			if (!gotResults && expectResults)
+			{
+				Assert(routerState != NULL && tupleDescriptor != NULL);
+
+				queryOK = StoreQueryResult(routerState, connection, tupleDescriptor,
+										   failOnError, &currentAffectedTupleCount);
+			}
+			else
+			{
+				queryOK = ConsumeQueryResult(connection, failOnError,
+											 &currentAffectedTupleCount);
+			}
+
+			/* should have rolled back on error */
+			Assert(queryOK);
+
+			if (!gotResults)
+			{
+				affectedTupleCount = currentAffectedTupleCount;
+				totalAffectedTupleCount += affectedTupleCount;
+			}
+			else if (currentAffectedTupleCount != affectedTupleCount)
+			{
+				char *nodeName = ConnectionGetOptionValue(connection, "host");
+				char *nodePort = ConnectionGetOptionValue(connection, "port");
+
+				ereport(WARNING,
+						(errmsg("modified " INT64_FORMAT " tuples, but expected "
+								"to modify " INT64_FORMAT,
+								currentAffectedTupleCount, affectedTupleCount),
+						 errdetail("modified placement on %s:%s", nodeName,
+								   nodePort)));
+			}
+
+			gotResults = true;
+		}
+	}
+
+	CHECK_FOR_INTERRUPTS();
+
+	return totalAffectedTupleCount;
+}
+
+
+/*
+ * TaskShardIntervalList returns a list of shard intervals for a given list of
+ * tasks.
+ */
+static List *
+TaskShardIntervalList(List *taskList)
+{
+	ListCell *taskCell = NULL;
+	List *shardIntervalList = NIL;
+
+	foreach(taskCell, taskList)
+	{
+		Task *task = (Task *) lfirst(taskCell);
+		int64 shardId = task->anchorShardId;
+		ShardInterval *shardInterval = LoadShardInterval(shardId);
+
+		shardIntervalList = lappend(shardIntervalList, shardInterval);
+	}
+
+	return shardIntervalList;
+}
+
+
 /*
  * ReturnRowsFromTuplestore moves rows from a given tuplestore into a
  * receiver. It performs the necessary limiting to support cursors.
@@ -929,7 +1135,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
  */
 static bool
 StoreQueryResult(MaterialState *routerState, PGconn *connection,
-				 TupleDesc tupleDescriptor, int64 *rows)
+				 TupleDesc tupleDescriptor, bool failOnError, int64 *rows)
 {
 	AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
 	Tuplestorestate *tupleStore = NULL;
@@ -947,7 +1153,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 	{
 		routerState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem);
 	}
-	else
+	else if (!failOnError)
 	{
 		/* might have failed query execution on another placement before */
 		tuplestore_clear(routerState->tuplestorestate);
@@ -974,7 +1180,7 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 	{
 		char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
 		int category = 0;
-		bool raiseError = false;
+		bool isConstraintViolation = false;

 		/*
 		 * If the error code is in constraint violation class, we want to
@@ -982,9 +1188,9 @@ StoreQueryResult(MaterialState *routerState, PGconn *connection,
 		 * placements.
 		 */
 		category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
-		raiseError = SqlStateMatchesCategory(sqlStateString, category);
+		isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);

-		if (raiseError)
+		if (isConstraintViolation || failOnError)
 		{
 			RemoveXactConnection(connection);
 			ReraiseRemoteError(connection, result);
@@ -1056,7 +1262,7 @@
  * has been an error.
  */
 static bool
-ConsumeQueryResult(PGconn *connection, int64 *rows)
+ConsumeQueryResult(PGconn *connection, bool failOnError, int64 *rows)
 {
 	bool commandFailed = false;
 	bool gotResponse = false;
@@ -1086,7 +1292,7 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
 	{
 		char *sqlStateString = PQresultErrorField(result, PG_DIAG_SQLSTATE);
 		int category = 0;
-		bool raiseError = false;
+		bool isConstraintViolation = false;

 		/*
 		 * If the error code is in constraint violation class, we want to
@@ -1094,9 +1300,9 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
 		 * placements.
 		 */
 		category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
-		raiseError = SqlStateMatchesCategory(sqlStateString, category);
+		isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);

-		if (raiseError)
+		if (isConstraintViolation || failOnError)
 		{
 			RemoveXactConnection(connection);
 			ReraiseRemoteError(connection, result);
@@ -1118,8 +1324,11 @@ ConsumeQueryResult(PGconn *connection, int64 *rows)
 			char *currentAffectedTupleString = PQcmdTuples(result);
 			int64 currentAffectedTupleCount = 0;

-			scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount);
-			Assert(currentAffectedTupleCount >= 0);
+			/* PQcmdTuples returns an empty string for non-DML commands */
+			if (*currentAffectedTupleString != '\0')
+			{
+				scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount);
+				Assert(currentAffectedTupleCount >= 0);
+			}

 #if (PG_VERSION_NUM < 90600)
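Taken together, the XactModificationLevel checks in ExecuteSingleTask and ExecuteModifyTasks make single-shard and multi-shard modifications mutually exclusive within a transaction block. An illustrative sketch (hypothetical items table again; the error texts are the ones introduced above):

BEGIN;
INSERT INTO items VALUES (2, 'b');  -- marks the transaction XACT_MODIFICATION_DATA
SELECT master_modify_multiple_shards('DELETE FROM items WHERE key > 10');
-- ERROR:  multi-shard data modifications must not appear in transaction blocks
-- which contain single-shard DML commands
ROLLBACK;
BEGIN;
SELECT master_modify_multiple_shards('DELETE FROM items WHERE key > 10');
INSERT INTO items VALUES (2, 'b');  -- transaction is now XACT_MODIFICATION_MULTI_SHARD
-- ERROR:  single-shard DML commands must not appear in transaction blocks
-- which contain multi-shard data modifications
ROLLBACK;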


@@ -1292,7 +1292,8 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
 	{
 		ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
 						errmsg("distributed DDL commands must not appear within "
-							   "transaction blocks containing data modifications")));
+							   "transaction blocks containing single-shard data "
+							   "modifications")));
 	}

 	ShowNoticeIfNotUsing2PC();
@@ -1305,7 +1306,7 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
 		ereport(ERROR, (errmsg("could not execute DDL command on worker node shards")));
 	}

-	XactModificationLevel = XACT_MODIFICATION_SCHEMA;
+	XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
 }
@@ -1359,7 +1360,7 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
 	Oid schemaId = get_rel_namespace(relationId);
 	char *schemaName = get_namespace_name(schemaId);

-	LockShards(shardIntervalList, ShareLock);
+	LockShardListResources(shardIntervalList, ShareLock);
 	OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner);

 	foreach(shardIntervalCell, shardIntervalList)


@@ -41,6 +41,7 @@
 #include "distributed/pg_dist_shard.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/resource_lock.h"
+#include "distributed/shardinterval_utils.h"
 #include "distributed/worker_protocol.h"
 #include "optimizer/clauses.h"
 #include "optimizer/predtest.h"
@@ -55,11 +56,9 @@
 #include "utils/memutils.h"


-static void LockShardsForModify(List *shardIntervalList);
-static bool HasReplication(List *shardIntervalList);
-static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId);
-static int SendQueryToPlacements(char *shardQueryString,
-								 ShardConnections *shardConnections);
+static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList,
+										   Oid relationId);


 PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
@@ -83,14 +82,12 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	Query *modifyQuery = NULL;
 	Node *queryTreeNode;
 	List *restrictClauseList = NIL;
-	bool isTopLevel = true;
 	bool failOK = false;
 	List *shardIntervalList = NIL;
 	List *prunedShardIntervalList = NIL;
+	List *taskList = NIL;
 	int32 affectedTupleCount = 0;

-	PreventTransactionChain(isTopLevel, "master_modify_multiple_shards");
-
 	queryTreeNode = ParseTreeNode(queryString);
 	if (IsA(queryTreeNode, DeleteStmt))
 	{
@@ -163,183 +160,50 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	CHECK_FOR_INTERRUPTS();

-	LockShardsForModify(prunedShardIntervalList);
-
-	affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList,
-										   relationId);
+	taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList,
+											relationId);
+	affectedTupleCount = ExecuteModifyTasks(taskList, false, NULL, NULL, NULL);

 	PG_RETURN_INT32(affectedTupleCount);
 }


 /*
- * LockShardsForModify command locks the replicas of given shard. The
- * lock logic is slightly different from LockShards function. Basically,
- *
- * 1. If citus.all_modifications_commutative is set to true, then all locks
- * are acquired as ShareLock.
- * 2. If citus.all_modifications_commutative is false, then only the shards
- * with 2 or more replicas are locked with ExclusiveLock. Otherwise, the
- * lock is acquired with ShareLock.
+ * ModifyMultipleShardsTaskList builds a list of tasks to execute a query on a
+ * given list of shards.
  */
-static void
-LockShardsForModify(List *shardIntervalList)
+static List *
+ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, Oid relationId)
 {
-	LOCKMODE lockMode = NoLock;
+	List *taskList = NIL;
 	ListCell *shardIntervalCell = NULL;
+	uint64 jobId = INVALID_JOB_ID;
+	int taskId = 1;

-	if (AllModificationsCommutative)
-	{
-		lockMode = ShareLock;
-	}
-	else if (!HasReplication(shardIntervalList))
-	{
-		lockMode = ShareLock;
-	}
-	else
-	{
-		lockMode = ExclusiveLock;
-	}
-
-	LockShards(shardIntervalList, lockMode);
-}
-
-
-/*
- * HasReplication checks whether any of the shards in the given list has more
- * than one replica.
- */
-static bool
-HasReplication(List *shardIntervalList)
-{
-	ListCell *shardIntervalCell;
-	bool hasReplication = false;
+	/* lock metadata before getting placement lists */
+	LockShardListMetadata(shardIntervalList, ShareLock);

 	foreach(shardIntervalCell, shardIntervalList)
 	{
 		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
-		uint64 shardId = shardInterval->shardId;
-		List *shardPlacementList = FinalizedShardPlacementList(shardId);
-		if (shardPlacementList->length > 1)
-		{
-			hasReplication = true;
-		}
-	}
-
-	return hasReplication;
-}
-
-
-/*
- * SendQueryToShards executes the given query in all placements of the given
- * shard list and returns the total affected tuple count. The execution is done
- * in a distributed transaction and the commit protocol is decided according to
- * the value of citus.multi_shard_commit_protocol parameter. SendQueryToShards
- * does not acquire locks for the shards so it is advised to acquire locks to
- * the shards when necessary before calling SendQueryToShards.
- */
-static int
-SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
-{
-	int affectedTupleCount = 0;
-	char *relationOwner = TableOwner(relationId);
-	ListCell *shardIntervalCell = NULL;
-
-	OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner);
-
-	foreach(shardIntervalCell, shardIntervalList)
-	{
-		ShardInterval *shardInterval = (ShardInterval *) lfirst(
-			shardIntervalCell);
 		Oid relationId = shardInterval->relationId;
 		uint64 shardId = shardInterval->shardId;
-		bool shardConnectionsFound = false;
-		ShardConnections *shardConnections = NULL;
 		StringInfo shardQueryString = makeStringInfo();
-		char *shardQueryStringData = NULL;
-		int shardAffectedTupleCount = -1;
-
-		shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
-		Assert(shardConnectionsFound);
+		Task *task = NULL;

 		deparse_shard_query(query, relationId, shardId, shardQueryString);
-		shardQueryStringData = shardQueryString->data;
-		shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
-														shardConnections);
-		affectedTupleCount += shardAffectedTupleCount;
+
+		task = CitusMakeNode(Task);
+		task->jobId = jobId;
+		task->taskId = taskId++;
+		task->taskType = SQL_TASK;
+		task->queryString = shardQueryString->data;
+		task->dependedTaskList = NULL;
+		task->anchorShardId = shardId;
+		task->taskPlacementList = FinalizedShardPlacementList(shardId);
+
+		taskList = lappend(taskList, task);
 	}

-	/* check for cancellation one last time before returning */
-	CHECK_FOR_INTERRUPTS();
-
-	return affectedTupleCount;
-}
-
-
-/*
- * SendQueryToPlacements sends the given query string to all given placement
- * connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions
- * should be called after all queries have been sent successfully.
- */
-static int
-SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections)
-{
-	uint64 shardId = shardConnections->shardId;
-	List *connectionList = shardConnections->connectionList;
-	ListCell *connectionCell = NULL;
-	int32 shardAffectedTupleCount = -1;
-
-	Assert(connectionList != NIL);
-
-	foreach(connectionCell, connectionList)
-	{
-		TransactionConnection *transactionConnection =
-			(TransactionConnection *) lfirst(connectionCell);
-		PGconn *connection = transactionConnection->connection;
-		PGresult *result = NULL;
-		char *placementAffectedTupleString = NULL;
-		int32 placementAffectedTupleCount = -1;
-
-		CHECK_FOR_INTERRUPTS();
-
-		/* send the query */
-		result = PQexec(connection, shardQueryString);
-		if (PQresultStatus(result) != PGRES_COMMAND_OK)
-		{
-			WarnRemoteError(connection, result);
-			ereport(ERROR, (errmsg("could not send query to shard placement")));
-		}
-
-		placementAffectedTupleString = PQcmdTuples(result);
-
-		/* returned tuple count is empty for utility commands, use 0 as affected count */
-		if (*placementAffectedTupleString == '\0')
-		{
-			placementAffectedTupleCount = 0;
-		}
-		else
-		{
-			placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
-												  sizeof(int32), 0);
-		}
-
-		if ((shardAffectedTupleCount == -1) ||
-			(shardAffectedTupleCount == placementAffectedTupleCount))
-		{
-			shardAffectedTupleCount = placementAffectedTupleCount;
-		}
-		else
-		{
-			ereport(ERROR,
-					(errmsg("modified %d tuples, but expected to modify %d",
-							placementAffectedTupleCount, shardAffectedTupleCount),
-					 errdetail("Affected tuple counts at placements of shard "
-							   UINT64_FORMAT " are different.", shardId)));
-		}
-
-		PQclear(result);
-
-		transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
-	}
-
-	return shardAffectedTupleCount;
+	return taskList;
 }
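The commit-protocol remark in the removed SendQueryToShards comment still applies to the new path: the distributed transaction opened by ExecuteModifyTasks commits according to citus.multi_shard_commit_protocol. A hedged usage sketch, again with the hypothetical items table:

-- opt in to two-phase commit for multi-shard commands in this session
SET citus.multi_shard_commit_protocol TO '2pc';
SELECT master_modify_multiple_shards('UPDATE items SET value = NULL WHERE key > 10');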


@@ -153,7 +153,7 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)

 		transactionConnection->groupId = workerNode->groupId;
 		transactionConnection->connectionId = shardConnections->shardId;
-		transactionConnection->transactionState = TRANSACTION_STATE_INVALID;
+		transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
 		transactionConnection->connection = connection;
 		transactionConnection->nodeName = shardPlacement->nodeName;
 		transactionConnection->nodePort = shardPlacement->nodePort;


@@ -125,21 +125,9 @@ void
 PurgeConnection(PGconn *connection)
 {
 	NodeConnectionKey nodeConnectionKey;
-	PGconn *purgedConnection = NULL;

 	BuildKeyForConnection(connection, &nodeConnectionKey);
-	purgedConnection = PurgeConnectionByKey(&nodeConnectionKey);
-
-	/*
-	 * It's possible the provided connection matches the host and port for
-	 * an entry in the hash without being precisely the same connection. In
-	 * that case, we will want to close the provided connection in addition
-	 * to the one from the hash (which was closed by PurgeConnectionByKey).
-	 */
-	if (purgedConnection != connection)
-	{
-		PQfinish(connection);
-	}
+	PurgeConnectionByKey(&nodeConnectionKey);
 }


@@ -20,6 +20,7 @@
 #include "distributed/listutils.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/multi_router_executor.h"
 #include "distributed/relay_utility.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
@@ -28,9 +29,8 @@

 /*
  * LockShardDistributionMetadata returns after grabbing a lock for distribution
- * metadata related to the specified shard, blocking if required. ExclusiveLock
- * and ShareLock modes are supported. Any locks acquired using this method are
- * released at transaction end.
+ * metadata related to the specified shard, blocking if required. Any locks
+ * acquired using this method are released at transaction end.
  */
 void
 LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)
@@ -126,12 +126,11 @@ UnlockJobResource(uint64 jobId, LOCKMODE lockmode)

 /*
- * LockShards takes shared locks on the metadata and the data of all shards in
- * shardIntervalList. This prevents concurrent placement changes and concurrent
- * DML statements that require an exclusive lock.
+ * LockShardListMetadata takes shared locks on the metadata of all shards in
+ * shardIntervalList to prevent concurrent placement changes.
  */
 void
-LockShards(List *shardIntervalList, LOCKMODE lockMode)
+LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode)
 {
 	ListCell *shardIntervalCell = NULL;
@@ -143,10 +142,28 @@ LockShards(List *shardIntervalList, LOCKMODE lockMode)
 		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
 		int64 shardId = shardInterval->shardId;

-		/* prevent concurrent changes to number of placements */
 		LockShardDistributionMetadata(shardId, lockMode);
+	}
+}
+
+
+/*
+ * LockShardListResources takes locks on all shards in shardIntervalList to
+ * prevent concurrent DML statements on those shards.
+ */
+void
+LockShardListResources(List *shardIntervalList, LOCKMODE lockMode)
+{
+	ListCell *shardIntervalCell = NULL;
+
+	/* lock shards in order of shard id to prevent deadlock */
+	shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById);
+
+	foreach(shardIntervalCell, shardIntervalList)
+	{
+		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+		int64 shardId = shardInterval->shardId;

 		/* prevent concurrent update/delete statements */
 		LockShardResource(shardId, lockMode);
 	}
 }


@@ -58,7 +58,7 @@ typedef enum
 	XACT_MODIFICATION_INVALID = 0, /* placeholder initial value */
 	XACT_MODIFICATION_NONE,        /* no modifications have taken place */
 	XACT_MODIFICATION_DATA,        /* data modifications (DML) have occurred */
-	XACT_MODIFICATION_SCHEMA       /* schema modifications (DDL) have occurred */
+	XACT_MODIFICATION_MULTI_SHARD  /* multi-shard modifications have occurred */
 } XactModificationType;


@@ -39,4 +39,8 @@ extern void RouterExecutorFinish(QueryDesc *queryDesc);
 extern void RouterExecutorEnd(QueryDesc *queryDesc);
 extern void RegisterRouterExecutorXactCallbacks(void);

+extern int64 ExecuteModifyTasks(List *taskList, bool expectResults,
+								ParamListInfo paramListInfo, MaterialState *routerState,
+								TupleDesc tupleDescriptor);
+
 #endif /* MULTI_ROUTER_EXECUTOR_H_ */


@@ -75,7 +75,10 @@ extern void UnlockShardResource(uint64 shardId, LOCKMODE lockmode);
 extern void LockJobResource(uint64 jobId, LOCKMODE lockmode);
 extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);

-extern void LockShards(List *shardIntervalList, LOCKMODE lockMode);
+/* Lock multiple shards for safe modification */
+extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
+extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);
+
 extern void LockMetadataSnapshot(LOCKMODE lockMode);

 #endif /* RESOURCE_LOCK_H */


@@ -168,13 +168,13 @@ ABORT;
 BEGIN;
 INSERT INTO labs VALUES (6, 'Bell Labs');
 ALTER TABLE labs ADD COLUMN motto text;
-ERROR:  distributed DDL commands must not appear within transaction blocks containing data modifications
+ERROR:  distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
 COMMIT;
 -- whether it occurs first or second
 BEGIN;
 ALTER TABLE labs ADD COLUMN motto text;
 INSERT INTO labs VALUES (6, 'Bell Labs');
-ERROR:  distributed data modifications must not appear in transaction blocks which contain distributed DDL commands
+ERROR:  single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
 COMMIT;
 -- but the DDL should correctly roll back
 \d labs
@@ -244,7 +244,7 @@ SELECT * FROM labs WHERE id = 12;
 BEGIN;
 \copy labs from stdin delimiter ','
 ALTER TABLE labs ADD COLUMN motto text;
-ERROR:  distributed DDL commands must not appear within transaction blocks containing data modifications
+ERROR:  distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
 COMMIT;
 -- the DDL fails, but copy persists
 \d labs


@@ -22,11 +22,27 @@ SELECT master_create_worker_shards('multi_shard_modify_test', 4, 2);

 COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'csv');
 -- Testing master_modify_multiple_shards
--- Verify that master_modify_multiple_shards cannot be called in a transaction block
+-- Verify that master_modify_multiple_shards can be rolled back
 BEGIN;
 SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13');
-ERROR:  master_modify_multiple_shards cannot run inside a transaction block
+ master_modify_multiple_shards
+-------------------------------
+                             3
+(1 row)
+
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 202');
+ master_modify_multiple_shards
+-------------------------------
+                             1
+(1 row)
+
 ROLLBACK;
+SELECT count(*) FROM multi_shard_modify_test;
+ count
+-------
+    27
+(1 row)
+
 -- Check that master_modify_multiple_shards cannot be called with non-distributed tables
 CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text);
 INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder');


@@ -138,11 +138,15 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::re
  1210005
 (3 rows)

--- command can not be run inside transaction
-BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT;
-ERROR:  master_modify_multiple_shards cannot run inside a transaction block
-CONTEXT:  SQL statement "SELECT master_modify_multiple_shards(commandText)"
-PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM
+-- verify that truncate can be aborted
+INSERT INTO test_truncate_range VALUES (1);
+BEGIN; TRUNCATE TABLE test_truncate_range; ROLLBACK;
+SELECT count(*) FROM test_truncate_range;
+ count
+-------
+     1
+(1 row)
+
 DROP TABLE test_truncate_range;
 --
 -- truncate for hash distribution.
@@ -226,11 +230,15 @@ SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::reg
  1210009
 (4 rows)

--- command can not be run inside transaction
-BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT;
-ERROR:  master_modify_multiple_shards cannot run inside a transaction block
-CONTEXT:  SQL statement "SELECT master_modify_multiple_shards(commandText)"
-PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM
+-- verify that truncate can be aborted
+INSERT INTO test_truncate_hash VALUES (1);
+BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK;
+SELECT count(*) FROM test_truncate_hash;
+ count
+-------
+     1
+(1 row)
+
 DROP TABLE test_truncate_hash;
 -- test with table with spaces in it
 CREATE TABLE "a b hash" (a int, b int);


@@ -46,11 +46,15 @@ COPY multi_shard_modify_test (t_key, t_name, t_value) FROM STDIN WITH (FORMAT 'c
 \.

 -- Testing master_modify_multiple_shards
--- Verify that master_modify_multiple_shards cannot be called in a transaction block
+-- Verify that master_modify_multiple_shards can be rolled back
 BEGIN;
 SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key > 10 AND t_key <= 13');
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 202');
 ROLLBACK;
+SELECT count(*) FROM multi_shard_modify_test;

 -- Check that master_modify_multiple_shards cannot be called with non-distributed tables
 CREATE TEMPORARY TABLE temporary_nondistributed_table (col_1 integer,col_2 text);
 INSERT INTO temporary_nondistributed_table VALUES (37, 'eren'), (31, 'onder');


@@ -89,8 +89,10 @@ SELECT count(*) FROM test_truncate_range;
 -- verify 3 shards are still present
 SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;

--- command can not be run inside transaction
-BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT;
+-- verify that truncate can be aborted
+INSERT INTO test_truncate_range VALUES (1);
+BEGIN; TRUNCATE TABLE test_truncate_range; ROLLBACK;
+SELECT count(*) FROM test_truncate_range;

 DROP TABLE test_truncate_range;
@@ -136,8 +138,10 @@ SELECT count(*) FROM test_truncate_hash;
 -- verify 4 shards are still present
 SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;

--- command can not be run inside transaction
-BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT;
+-- verify that truncate can be aborted
+INSERT INTO test_truncate_hash VALUES (1);
+BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK;
+SELECT count(*) FROM test_truncate_hash;

 DROP TABLE test_truncate_hash;
@@ -173,4 +177,4 @@ TRUNCATE TABLE "a b append";
 -- verify all shards are dropped
 SELECT shardid FROM pg_dist_shard where logicalrelid = '"a b append"'::regclass;

 DROP TABLE "a b append";