mirror of https://github.com/citusdata/citus.git
Merge pull request #2194 from citusdata/seq_m_m_m_shards
master_modify_multiple_shards() and TRUNCATE honour `citus.multi_shard_modify_mode`

commit 39e681a0f5
@@ -213,11 +213,10 @@ RouterCreateScan(CustomScan *scan)
 		Assert(isModificationQuery);
 
 		if (IsMultiRowInsert(workerJob->jobQuery) ||
-			(IsUpdateOrDelete(distributedPlan) &&
-			 MultiShardConnectionType == SEQUENTIAL_CONNECTION))
+			MultiShardConnectionType == SEQUENTIAL_CONNECTION)
 		{
 			/*
-			 * Multi shard update deletes while multi_shard_modify_mode equals
+			 * Multi shard modifications while multi_shard_modify_mode equals
 			 * to 'sequential' or Multi-row INSERT are executed sequentially
 			 * instead of using parallel connections.
 			 */
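For illustration, a minimal session that would take the new sequential branch (the table name `items` is hypothetical; assumes a hash-distributed table):

SET citus.multi_shard_modify_mode TO 'sequential';
-- a multi-shard modification now goes through the sequential executor,
-- one task at a time, instead of opening parallel per-placement connections
UPDATE items SET value = value + 1;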
@@ -20,6 +20,7 @@
 #include "commands/dbcommands.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/connection_management.h"
+#include "distributed/multi_executor.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_server_executor.h"
 #include "distributed/placement_connection.h"
@@ -188,7 +189,17 @@ MultiClientPlacementConnectStart(List *placementAccessList, const char *userName
 	MultiConnection *connection = NULL;
 	ConnStatusType connStatusType = CONNECTION_OK;
 	int32 connectionId = AllocateConnectionId();
-	int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */
+	int connectionFlags = 0;
 
+	/*
+	 * Although we're opening connections for SELECT queries, we're relying
+	 * on the multi_shard_modify_mode GUC. The name of the GUC is unfortunate,
+	 * but adding one more GUC (or renaming the GUC) would make the UX even worse.
+	 */
+	if (MultiShardConnectionType == PARALLEL_CONNECTION)
+	{
+		connectionFlags = CONNECTION_PER_PLACEMENT;
+	}
+
 	if (connectionId == INVALID_CONNECTION_ID)
 	{
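The effect of the flag change is easiest to see from SQL. A sketch, assuming a distributed table `items`: in 'sequential' mode the real-time SELECT below reuses the transaction's existing worker connections instead of forcing a fresh connection per shard placement.

BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
-- reads run over the connections already opened in this transaction,
-- so they can see the effects of earlier commands sent on them
SELECT count(*) FROM items;
COMMIT;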
@@ -40,7 +40,10 @@
 #include "utils/memutils.h"
 
 
-/* controls the connection type for multi shard update/delete queries */
+/*
+ * Controls the connection type for multi shard modifications, DDLs,
+ * TRUNCATE and real-time SELECT queries.
+ */
 int MultiShardConnectionType = PARALLEL_CONNECTION;
 
 
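A quick sketch of toggling the session-level GUC; both values are exercised by the regression tests further down:

SHOW citus.multi_shard_modify_mode;   -- 'parallel' is the default
SET citus.multi_shard_modify_mode TO 'sequential';
RESET citus.multi_shard_modify_mode;  -- back to 'parallel'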
@@ -901,23 +901,33 @@ LookupWorkerForTask(HTAB *workerHash, Task *task, TaskExecution *taskExecution)
 /*
  * WorkerConnectionsExhausted determines if the current query has exhausted the
  * maximum number of open connections that can be made to a worker.
+ *
+ * Note that the function takes sequential execution of the queries into account
+ * as well. In other words, in the sequential mode, the connections are considered
+ * to be exhausted when there is already a connection opened to the given worker.
  */
 static bool
 WorkerConnectionsExhausted(WorkerNodeState *workerNodeState)
 {
 	bool reachedLimit = false;
 
-	/*
-	 * A worker cannot accept more than max_connections connections. If we have a
-	 * small number of workers with many shards, then a single query could exhaust
-	 * max_connections unless we throttle here. We use the value of max_connections
-	 * on the master as a proxy for the worker configuration to avoid introducing a
-	 * new configuration value.
-	 */
-	if (workerNodeState->openConnectionCount >= MaxConnections)
+	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION &&
+		workerNodeState->openConnectionCount >= 1)
 	{
 		reachedLimit = true;
 	}
+	else if (MultiShardConnectionType == PARALLEL_CONNECTION &&
+			 workerNodeState->openConnectionCount >= MaxConnections)
+	{
+		/*
+		 * A worker cannot accept more than max_connections connections. If we have a
+		 * small number of workers with many shards, then a single query could exhaust
+		 * max_connections unless we throttle here. We use the value of max_connections
+		 * on the master as a proxy for the worker configuration to avoid introducing a
+		 * new configuration value.
+		 */
+		reachedLimit = true;
+	}
 
 	return reachedLimit;
 }
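A rough way to observe the throttle from a worker while a query is in flight on the coordinator (assumes Citus-initiated backends report application_name 'citus'; that name is an assumption here, not verified):

-- on a worker: in 'sequential' mode at most one such backend per coordinator
-- session should appear, versus up to one per shard placement in 'parallel' mode
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'citus';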
@@ -82,7 +82,8 @@ static void AcquireMetadataLocks(List *taskList);
 static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
                                                     ShardPlacementAccessType accessType);
 static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
-                                     bool failOnError, bool expectResults);
+                                     CmdType operation, bool failOnError,
+                                     bool expectResults);
 static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
 static List * GetModifyConnections(Task *task, bool markCritical);
 static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
@@ -479,6 +480,7 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 	EState *executorState = scanState->customScanState.ss.ps.state;
 	bool taskListRequires2PC = TaskListRequires2PC(taskList);
 	bool failOnError = false;
+	CmdType operation = scanState->distributedPlan->operation;
 
 	/*
 	 * We could naturally handle function-based transactions (i.e. those using
@@ -521,7 +523,8 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 		Task *task = (Task *) lfirst(taskCell);
 
 		executorState->es_processed +=
-			ExecuteSingleModifyTask(scanState, task, failOnError, hasReturning);
+			ExecuteSingleModifyTask(scanState, task, operation, failOnError,
+									hasReturning);
 	}
 
 	scanState->finishedRemoteScan = true;
@@ -817,10 +820,9 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
  * The function returns affectedTupleCount if applicable.
  */
 static int64
-ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
-						bool expectResults)
+ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation,
+						bool failOnError, bool expectResults)
 {
-	CmdType operation = CMD_UNKNOWN;
 	EState *executorState = NULL;
 	ParamListInfo paramListInfo = NULL;
 	List *taskPlacementList = task->taskPlacementList;
@@ -839,7 +841,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
 
 	if (scanState)
 	{
-		operation = scanState->distributedPlan->operation;
 		executorState = scanState->customScanState.ss.ps.state;
 		paramListInfo = executorState->es_param_list_info;
 	}
@@ -864,9 +865,15 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
 		LockPartitionRelations(relationId, RowExclusiveLock);
 	}
 
-	if (task->taskType == MODIFY_TASK)
+	/*
+	 * Prevent replicas of the same shard from diverging. We don't
+	 * need to acquire lock for TRUNCATE and DDLs since they already
+	 * acquire the necessary locks on the relations, and block any
+	 * unsafe concurrent operations.
+	 */
+	if (operation == CMD_INSERT || operation == CMD_UPDATE ||
+		operation == CMD_DELETE || operation == CMD_SELECT)
 	{
-		/* prevent replicas of the same shard from diverging */
 		AcquireExecutorShardLock(task, operation);
 	}
 
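To make the comment concrete, a sketch with a hypothetical distributed table `items`:

BEGIN;
UPDATE items SET value = 0;  -- takes the executor shard lock so replicas stay in sync
ROLLBACK;

TRUNCATE items;  -- no executor shard lock needed; its relation lock already blocks writers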
@@ -1106,7 +1113,7 @@ ExecuteModifyTasksWithoutResults(List *taskList)
  * returns 0.
  */
 int64
-ExecuteModifyTasksSequentiallyWithoutResults(List *taskList)
+ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
 {
 	ListCell *taskCell = NULL;
 	bool multipleTasks = list_length(taskList) > 1;
@@ -1142,7 +1149,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList)
 		Task *task = (Task *) lfirst(taskCell);
 
 		affectedTupleCount +=
-			ExecuteSingleModifyTask(NULL, task, failOnError, expectResults);
+			ExecuteSingleModifyTask(NULL, task, operation, failOnError, expectResults);
 	}
 
 	return affectedTupleCount;
@@ -1196,6 +1203,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 	 * Ensure that there are no concurrent modifications on the same
 	 * shards. For DDL commands, we already obtained the appropriate
 	 * locks in ProcessUtility.
+	 *
+	 * We don't need to acquire lock for TRUNCATE_TASK since it already
+	 * acquires AccessExclusiveLock on the relation, and blocks any
+	 * concurrent operation.
 	 */
 	if (firstTask->taskType == MODIFY_TASK)
 	{
@@ -3133,7 +3133,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 		}
 		else
 		{
-			ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList);
+			ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY);
 		}
 	}
 	else
@@ -3145,7 +3145,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 
 		PG_TRY();
 		{
-			ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList);
+			ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY);
 
 			if (shouldSyncMetadata)
 			{
@@ -84,6 +84,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	List *prunedShardIntervalList = NIL;
 	List *taskList = NIL;
 	int32 affectedTupleCount = 0;
+	CmdType operation = CMD_UNKNOWN;
 #if (PG_VERSION_NUM >= 100000)
 	RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
 	queryTreeNode = rawStmt->stmt;
@@ -147,7 +148,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 #endif
 	modifyQuery = (Query *) linitial(queryTreeList);
 
-	if (modifyQuery->commandType != CMD_UTILITY)
+	operation = modifyQuery->commandType;
+	if (operation != CMD_UTILITY)
 	{
 		bool multiShardQuery = true;
 		DeferredErrorMessage *error =
@@ -176,8 +178,18 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 
 	CHECK_FOR_INTERRUPTS();
 
-	taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList);
-	affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList);
+	taskList =
+		ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList);
+
+	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
+	{
+		affectedTupleCount =
+			ExecuteModifyTasksSequentiallyWithoutResults(taskList, operation);
+	}
+	else
+	{
+		affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList);
+	}
 
 	PG_RETURN_INT32(affectedTupleCount);
 }
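Taken together, the UDF now honours the GUC end to end. A usage sketch (the table name is hypothetical; the precise 2PC behaviour is asserted by the regression tests below):

SET citus.multi_shard_modify_mode TO 'sequential';
SELECT master_modify_multiple_shards('DELETE FROM items');  -- one task at a time

SET citus.multi_shard_modify_mode TO 'parallel';
SELECT master_modify_multiple_shards('DELETE FROM items');  -- parallel connections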
@@ -41,7 +41,8 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
 extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
 
 extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
-extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList);
+extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
+                                                          CmdType operation);
 
 /* helper functions */
 extern bool TaskListRequires2PC(List *taskList);
@@ -1717,13 +1717,20 @@ ROLLBACK;
 -- Altering a reference table and then performing an INSERT ... SELECT which
 -- joins with the reference table is not allowed, since the INSERT ... SELECT
 -- would read from the reference table over other connections than the ones
--- that performed the DDL.
+-- that performed the parallel DDL.
 BEGIN;
 ALTER TABLE reference_table ADD COLUMN z int;
 INSERT INTO raw_events_first (user_id)
 SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
 ERROR:  cannot establish a new connection for placement 13300025, since DDL has been executed on a connection that is in use
 ROLLBACK;
+-- the same test with sequential DDL should work fine
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+ALTER TABLE reference_table ADD COLUMN z int;
+INSERT INTO raw_events_first (user_id)
+SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
+ROLLBACK;
 -- Insert after copy is allowed
 BEGIN;
 COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
@@ -340,6 +340,22 @@ ROLLBACK;
 BEGIN;
 SELECT id, pg_advisory_lock(15) FROM test_table;
 ERROR:  canceling the transaction since it was involved in a distributed deadlock
+ROLLBACK;
+-- sequential real-time queries should be successfully executed
+-- since the queries are sent over the same connection
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
+ id | pg_advisory_lock
+----+------------------
+  6 |
+  5 |
+  4 |
+  3 |
+  2 |
+  1 |
+(6 rows)
+
 ROLLBACK;
 SET client_min_messages TO DEFAULT;
 alter system set deadlock_timeout TO DEFAULT;
@@ -348,6 +348,22 @@ ROLLBACK;
 BEGIN;
 SELECT id, pg_advisory_lock(15) FROM test_table;
 ERROR:  canceling the transaction since it was involved in a distributed deadlock
+ROLLBACK;
+-- sequential real-time queries should be successfully executed
+-- since the queries are sent over the same connection
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
+ id | pg_advisory_lock
+----+------------------
+  6 |
+  5 |
+  4 |
+  3 |
+  2 |
+  1 |
+(6 rows)
+
 ROLLBACK;
 SET client_min_messages TO DEFAULT;
 alter system set deadlock_timeout TO DEFAULT;
@@ -272,6 +272,195 @@ SELECT no_distributed_2PCs();
  t
 (1 row)
 
+-- test TRUNCATE on sequential and parallel modes
+CREATE TABLE test_seq_truncate (a int);
+INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i;
+SELECT create_distributed_table('test_seq_truncate', 'a');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+-- with parallel modification mode, we should see #shards records
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
+(1 row)
+
+TRUNCATE test_seq_truncate;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+ distributed_2pcs_are_equal_to_placement_count
+-----------------------------------------------
+ t
+(1 row)
+
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
+(1 row)
+
+TRUNCATE test_seq_truncate;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count
+--------------------------------------------
+ t
+(1 row)
+
+-- truncate with rep > 1 should work both in parallel and seq. modes
+CREATE TABLE test_seq_truncate_rep_2 (a int);
+INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
+SET citus.shard_replication_factor TO 2;
+SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
+(1 row)
+
+TRUNCATE test_seq_truncate_rep_2;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count
+--------------------------------------------
+ t
+(1 row)
+
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
+(1 row)
+
+TRUNCATE test_seq_truncate_rep_2;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+ distributed_2pcs_are_equal_to_placement_count
+-----------------------------------------------
+ t
+(1 row)
+
+CREATE TABLE multi_shard_modify_test (
+        t_key integer not null,
+        t_name varchar(25) not null,
+        t_value integer not null);
+SELECT create_distributed_table('multi_shard_modify_test', 't_key');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+-- with parallel modification mode, we should see #shards records
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
+(1 row)
+
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
+ master_modify_multiple_shards
+-------------------------------
+ 0
+(1 row)
+
+SELECT distributed_2PCs_are_equal_to_placement_count();
+ distributed_2pcs_are_equal_to_placement_count
+-----------------------------------------------
+ t
+(1 row)
+
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
+(1 row)
+
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
+ master_modify_multiple_shards
+-------------------------------
+ 0
+(1 row)
+
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count
+--------------------------------------------
+ t
+(1 row)
+
+-- one more realistic test with sequential inserts and truncate in the same tx
+INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
+BEGIN;
+INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
+-- now switch to sequential mode to enable a successful TRUNCATE
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+TRUNCATE multi_shard_modify_test;
+COMMIT;
+-- see that all the data successfully removed
+SELECT count(*) FROM multi_shard_modify_test;
+ count
+-------
+     0
+(1 row)
+
+-- test INSERT ... SELECT queries
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
+(1 row)
+
+INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count
+--------------------------------------------
+ t
+(1 row)
+
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
+(1 row)
+
+INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+ distributed_2pcs_are_equal_to_placement_count
+-----------------------------------------------
+ t
+(1 row)
+
+-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx
+INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
+BEGIN;
+INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
+-- now switch to sequential mode to enable a successful INSERT .. SELECT
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
+COMMIT;
+-- see that all the data successfully inserted
+SELECT count(*) FROM multi_shard_modify_test;
+ count
+-------
+   210
+(1 row)
+
 ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
 SET citus.shard_replication_factor TO DEFAULT;
 SELECT pg_reload_conf();
@@ -282,10 +471,13 @@ SELECT pg_reload_conf();
 
 SET search_path TO 'public';
 DROP SCHEMA test_seq_ddl CASCADE;
-NOTICE:  drop cascades to 6 other objects
+NOTICE:  drop cascades to 9 other objects
 DETAIL:  drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count()
 drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count()
 drop cascades to function test_seq_ddl.no_distributed_2pcs()
 drop cascades to table test_seq_ddl.test_table
 drop cascades to table test_seq_ddl.ref_test
 drop cascades to table test_seq_ddl.test_table_rep_2
+drop cascades to table test_seq_ddl.test_seq_truncate
+drop cascades to table test_seq_ddl.test_seq_truncate_rep_2
+drop cascades to table test_seq_ddl.multi_shard_modify_test
@@ -105,6 +105,23 @@ SELECT count(*) FROM second_raw_table;
  0
 (1 row)
 
+-- sequential insert followed by a sequential real-time query should be fine
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+WITH ids_inserted AS
+(
+  INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
+)
+SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
+DEBUG:  data-modifying statements are not supported in the WITH clauses of distributed queries
+DEBUG:  generating subplan 17_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING raw_table.tenant_id
+DEBUG:  Plan 17 query after replacing subqueries and CTEs: SELECT income FROM with_transactions.second_raw_table WHERE (tenant_id OPERATOR(pg_catalog.=) ANY (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) ORDER BY income DESC LIMIT 3
+DEBUG:  push down of limit count: 3
+ income
+--------
+(0 rows)
+
+ROLLBACK;
 RESET client_min_messages;
 RESET citus.shard_count;
 DROP SCHEMA with_transactions CASCADE;
@@ -1361,13 +1361,21 @@ ROLLBACK;
 -- Altering a reference table and then performing an INSERT ... SELECT which
 -- joins with the reference table is not allowed, since the INSERT ... SELECT
 -- would read from the reference table over other connections than the ones
--- that performed the DDL.
+-- that performed the parallel DDL.
 BEGIN;
 ALTER TABLE reference_table ADD COLUMN z int;
 INSERT INTO raw_events_first (user_id)
 SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
 ROLLBACK;
 
+-- the same test with sequential DDL should work fine
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+ALTER TABLE reference_table ADD COLUMN z int;
+INSERT INTO raw_events_first (user_id)
+SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
+ROLLBACK;
+
 -- Insert after copy is allowed
 BEGIN;
 COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
@@ -213,6 +213,13 @@ BEGIN;
 SELECT id, pg_advisory_lock(15) FROM test_table;
 ROLLBACK;
 
+-- sequential real-time queries should be successfully executed
+-- since the queries are sent over the same connection
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
+ROLLBACK;
+
 SET client_min_messages TO DEFAULT;
 alter system set deadlock_timeout TO DEFAULT;
 SELECT pg_reload_conf();
@@ -155,6 +155,95 @@ CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a);
 -- we shouldn't see any distributed transactions
 SELECT no_distributed_2PCs();
 
+-- test TRUNCATE on sequential and parallel modes
+CREATE TABLE test_seq_truncate (a int);
+INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i;
+SELECT create_distributed_table('test_seq_truncate', 'a');
+
+-- with parallel modification mode, we should see #shards records
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+TRUNCATE test_seq_truncate;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+TRUNCATE test_seq_truncate;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+
+-- truncate with rep > 1 should work both in parallel and seq. modes
+CREATE TABLE test_seq_truncate_rep_2 (a int);
+INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
+SET citus.shard_replication_factor TO 2;
+SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
+
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+TRUNCATE test_seq_truncate_rep_2;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+TRUNCATE test_seq_truncate_rep_2;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+
+CREATE TABLE multi_shard_modify_test (
+        t_key integer not null,
+        t_name varchar(25) not null,
+        t_value integer not null);
+SELECT create_distributed_table('multi_shard_modify_test', 't_key');
+
+-- with parallel modification mode, we should see #shards records
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
+SELECT distributed_2PCs_are_equal_to_placement_count();
+
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
+SELECT distributed_2PCs_are_equal_to_worker_count();
+
+-- one more realistic test with sequential inserts and truncate in the same tx
+INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
+BEGIN;
+INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
+
+-- now switch to sequential mode to enable a successful TRUNCATE
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+TRUNCATE multi_shard_modify_test;
+COMMIT;
+
+-- see that all the data successfully removed
+SELECT count(*) FROM multi_shard_modify_test;
+
+-- test INSERT ... SELECT queries
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+
+-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx
+INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
+BEGIN;
+INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
+
+-- now switch to sequential mode to enable a successful INSERT .. SELECT
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
+COMMIT;
+
+-- see that all the data successfully inserted
+SELECT count(*) FROM multi_shard_modify_test;
+
 ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
 SET citus.shard_replication_factor TO DEFAULT;
 SELECT pg_reload_conf();
@@ -70,6 +70,16 @@ SELECT count(*) FROM raw_table;
 SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00';
 SELECT count(*) FROM second_raw_table;
 
+-- sequential insert followed by a sequential real-time query should be fine
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+WITH ids_inserted AS
+(
+  INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
+)
+SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
+ROLLBACK;
+
 RESET client_min_messages;
 RESET citus.shard_count;
 DROP SCHEMA with_transactions CASCADE;