master_modify_multiple_shards() and TRUNCATE honor citus.multi_shard_modify_mode

pull/2198/head
Onder Kalaci 2018-05-29 15:47:37 +03:00
parent 87270911d3
commit 336044f2a8
6 changed files with 253 additions and 17 deletions
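In short, distributed TRUNCATE and master_modify_multiple_shards() now honor citus.multi_shard_modify_mode: under 'sequential' they run over a single connection per worker node (so the regression test below expects one 2PC per worker), while under 'parallel' they open one connection per shard placement (one 2PC per placement). A minimal usage sketch, distilled from the regression test added in this commit; the table name items is illustrative and not part of the change:

CREATE TABLE items (a int);
SELECT create_distributed_table('items', 'a');

-- parallel mode: one connection (and one 2PC) per shard placement
SET citus.multi_shard_modify_mode TO 'parallel';
TRUNCATE items;

-- sequential mode: one connection (and one 2PC) per worker node
SET citus.multi_shard_modify_mode TO 'sequential';
TRUNCATE items;
SELECT master_modify_multiple_shards('DELETE FROM items');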

View File

@@ -82,7 +82,8 @@ static void AcquireMetadataLocks(List *taskList);
 static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
                                                     ShardPlacementAccessType accessType);
 static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
-                                     bool failOnError, bool expectResults);
+                                     CmdType operation, bool failOnError,
+                                     bool expectResults);
 static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
 static List * GetModifyConnections(Task *task, bool markCritical);
 static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
@@ -479,6 +480,7 @@ RouterSequentialModifyExecScan(CustomScanState *node)
     EState *executorState = scanState->customScanState.ss.ps.state;
     bool taskListRequires2PC = TaskListRequires2PC(taskList);
     bool failOnError = false;
+    CmdType operation = scanState->distributedPlan->operation;

     /*
      * We could naturally handle function-based transactions (i.e. those using
@@ -521,7 +523,8 @@ RouterSequentialModifyExecScan(CustomScanState *node)
         Task *task = (Task *) lfirst(taskCell);

         executorState->es_processed +=
-            ExecuteSingleModifyTask(scanState, task, failOnError, hasReturning);
+            ExecuteSingleModifyTask(scanState, task, operation, failOnError,
+                                    hasReturning);
     }

     scanState->finishedRemoteScan = true;
@@ -817,10 +820,9 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
  * The function returns affectedTupleCount if applicable.
  */
 static int64
-ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
-                        bool expectResults)
+ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation,
+                        bool failOnError, bool expectResults)
 {
-    CmdType operation = CMD_UNKNOWN;
     EState *executorState = NULL;
     ParamListInfo paramListInfo = NULL;
     List *taskPlacementList = task->taskPlacementList;
@@ -839,7 +841,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
     if (scanState)
     {
-        operation = scanState->distributedPlan->operation;
         executorState = scanState->customScanState.ss.ps.state;
         paramListInfo = executorState->es_param_list_info;
     }
@@ -864,9 +865,15 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
         LockPartitionRelations(relationId, RowExclusiveLock);
     }

-    if (task->taskType == MODIFY_TASK)
+    /*
+     * Prevent replicas of the same shard from diverging. We don't
+     * need to acquire lock for TRUNCATE and DDLs since they already
+     * acquire the necessary locks on the relations, and blocks any
+     * unsafe concurrent operations.
+     */
+    if (operation == CMD_INSERT || operation == CMD_UPDATE ||
+        operation == CMD_DELETE || operation == CMD_SELECT)
     {
-        /* prevent replicas of the same shard from diverging */
         AcquireExecutorShardLock(task, operation);
     }
@@ -1106,7 +1113,7 @@ ExecuteModifyTasksWithoutResults(List *taskList)
  * returns 0.
  */
 int64
-ExecuteModifyTasksSequentiallyWithoutResults(List *taskList)
+ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
 {
     ListCell *taskCell = NULL;
     bool multipleTasks = list_length(taskList) > 1;
@@ -1142,7 +1149,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList)
         Task *task = (Task *) lfirst(taskCell);

         affectedTupleCount +=
-            ExecuteSingleModifyTask(NULL, task, failOnError, expectResults);
+            ExecuteSingleModifyTask(NULL, task, operation, failOnError, expectResults);
     }

     return affectedTupleCount;
@@ -1196,6 +1203,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
      * Ensure that there are no concurrent modifications on the same
      * shards. For DDL commands, we already obtained the appropriate
      * locks in ProcessUtility.
+     *
+     * We don't need to acquire lock for TRUNCATE_TASK since it already
+     * acquires AccessExclusiveLock on the relation, and blocks any
+     * concurrent operation.
      */
     if (firstTask->taskType == MODIFY_TASK)
     {

View File

@@ -3133,7 +3133,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
         }
         else
         {
-            ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList);
+            ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY);
         }
     }
     else
@@ -3145,7 +3145,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
         PG_TRY();
         {
-            ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList);
+            ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY);

             if (shouldSyncMetadata)
             {

View File

@@ -84,6 +84,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
     List *prunedShardIntervalList = NIL;
     List *taskList = NIL;
     int32 affectedTupleCount = 0;
+    CmdType operation = CMD_UNKNOWN;
 #if (PG_VERSION_NUM >= 100000)
     RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
     queryTreeNode = rawStmt->stmt;
@@ -147,7 +148,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 #endif
     modifyQuery = (Query *) linitial(queryTreeList);

-    if (modifyQuery->commandType != CMD_UTILITY)
+    operation = modifyQuery->commandType;
+    if (operation != CMD_UTILITY)
     {
         bool multiShardQuery = true;
         DeferredErrorMessage *error =
@@ -176,8 +178,18 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
     CHECK_FOR_INTERRUPTS();

-    taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList);
-    affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList);
+    taskList =
+        ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList);
+    if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
+    {
+        affectedTupleCount =
+            ExecuteModifyTasksSequentiallyWithoutResults(taskList, operation);
+    }
+    else
+    {
+        affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList);
+    }

     PG_RETURN_INT32(affectedTupleCount);
 }
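With the branch above, master_modify_multiple_shards() dispatches to the sequential executor whenever the connection mode is SEQUENTIAL_CONNECTION. A sketch of how the two modes are expected to behave, reusing the table and helper names from the regression test added later in this commit (illustration only, not part of the diff):

SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
SELECT distributed_2PCs_are_equal_to_placement_count();   -- expect t

SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
SELECT distributed_2PCs_are_equal_to_worker_count();      -- expect t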

View File

@@ -41,7 +41,8 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
 extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
 extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
-extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList);
+extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
+                                                          CmdType operation);

 /* helper functions */
 extern bool TaskListRequires2PC(List *taskList);

View File

@@ -272,6 +272,150 @@ SELECT no_distributed_2PCs();
 t
(1 row)
-- test TRUNCATE on sequential and parallel modes
CREATE TABLE test_seq_truncate (a int);
INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i;
SELECT create_distributed_table('test_seq_truncate', 'a');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
-- with parallel modification mode, we should see #shards records
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
TRUNCATE test_seq_truncate;
SELECT distributed_2PCs_are_equal_to_placement_count();
distributed_2pcs_are_equal_to_placement_count
-----------------------------------------------
t
(1 row)
-- with sequential modification mode, we should see #primary worker records
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
TRUNCATE test_seq_truncate;
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
-- truncate with rep > 1 should work both in parallel and seq. modes
CREATE TABLE test_seq_truncate_rep_2 (a int);
INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
TRUNCATE test_seq_truncate_rep_2;
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
TRUNCATE test_seq_truncate_rep_2;
SELECT distributed_2PCs_are_equal_to_placement_count();
distributed_2pcs_are_equal_to_placement_count
-----------------------------------------------
t
(1 row)
CREATE TABLE multi_shard_modify_test (
t_key integer not null,
t_name varchar(25) not null,
t_value integer not null);
SELECT create_distributed_table('multi_shard_modify_test', 't_key');
create_distributed_table
--------------------------
(1 row)
-- with parallel modification mode, we should see #shards records
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
master_modify_multiple_shards
-------------------------------
0
(1 row)
SELECT distributed_2PCs_are_equal_to_placement_count();
distributed_2pcs_are_equal_to_placement_count
-----------------------------------------------
t
(1 row)
-- with sequential modification mode, we should see #primary worker records
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
master_modify_multiple_shards
-------------------------------
0
(1 row)
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
-- one more realistic test with sequential inserts and truncate in the same tx
INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
BEGIN;
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
-- now switch to sequential mode to enable a successful TRUNCATE
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
TRUNCATE multi_shard_modify_test;
COMMIT;
-- see that all the data is successfully removed
SELECT count(*) FROM multi_shard_modify_test;
count
-------
0
(1 row)
ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
SET citus.shard_replication_factor TO DEFAULT;
SELECT pg_reload_conf();
@@ -282,10 +426,13 @@ SELECT pg_reload_conf();
SET search_path TO 'public';
DROP SCHEMA test_seq_ddl CASCADE;
-NOTICE: drop cascades to 6 other objects
+NOTICE: drop cascades to 9 other objects
DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count()
drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count()
drop cascades to function test_seq_ddl.no_distributed_2pcs()
drop cascades to table test_seq_ddl.test_table
drop cascades to table test_seq_ddl.ref_test
drop cascades to table test_seq_ddl.test_table_rep_2
+drop cascades to table test_seq_ddl.test_seq_truncate
+drop cascades to table test_seq_ddl.test_seq_truncate_rep_2
+drop cascades to table test_seq_ddl.multi_shard_modify_test

View File

@@ -155,6 +155,71 @@ CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a);
-- we shouldn't see any distributed transactions
SELECT no_distributed_2PCs();
-- test TRUNCATE on sequential and parallel modes
CREATE TABLE test_seq_truncate (a int);
INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i;
SELECT create_distributed_table('test_seq_truncate', 'a');
-- with parallel modification mode, we should see #shards records
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
TRUNCATE test_seq_truncate;
SELECT distributed_2PCs_are_equal_to_placement_count();
-- with sequential modification mode, we should see #primary worker records
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
TRUNCATE test_seq_truncate;
SELECT distributed_2PCs_are_equal_to_worker_count();
-- truncate with rep > 1 should work both in parallel and seq. modes
CREATE TABLE test_seq_truncate_rep_2 (a int);
INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
TRUNCATE test_seq_truncate_rep_2;
SELECT distributed_2PCs_are_equal_to_worker_count();
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
TRUNCATE test_seq_truncate_rep_2;
SELECT distributed_2PCs_are_equal_to_placement_count();
CREATE TABLE multi_shard_modify_test (
t_key integer not null,
t_name varchar(25) not null,
t_value integer not null);
SELECT create_distributed_table('multi_shard_modify_test', 't_key');
-- with parallel modification mode, we should see #shards records
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
SELECT distributed_2PCs_are_equal_to_placement_count();
-- with sequential modification mode, we should see #primary worker records
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
SELECT distributed_2PCs_are_equal_to_worker_count();
-- one more realistic test with sequential inserts and truncate in the same tx
INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
BEGIN;
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
-- now switch to sequential mode to enable a successful TRUNCATE
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
TRUNCATE multi_shard_modify_test;
COMMIT;
-- see that all the data is successfully removed
SELECT count(*) FROM multi_shard_modify_test;
ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
SET citus.shard_replication_factor TO DEFAULT;
SELECT pg_reload_conf();