diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 64b188530..1689eddfa 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -82,7 +82,8 @@ static void AcquireMetadataLocks(List *taskList);
 static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
                                                     ShardPlacementAccessType accessType);
 static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
-                                     bool failOnError, bool expectResults);
+                                     CmdType operation, bool failOnError,
+                                     bool expectResults);
 static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
 static List * GetModifyConnections(Task *task, bool markCritical);
 static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
@@ -479,6 +480,7 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 		EState *executorState = scanState->customScanState.ss.ps.state;
 		bool taskListRequires2PC = TaskListRequires2PC(taskList);
 		bool failOnError = false;
+		CmdType operation = scanState->distributedPlan->operation;
 
 		/*
 		 * We could naturally handle function-based transactions (i.e. those using
@@ -521,7 +523,8 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 			Task *task = (Task *) lfirst(taskCell);
 
 			executorState->es_processed +=
-				ExecuteSingleModifyTask(scanState, task, failOnError, hasReturning);
+				ExecuteSingleModifyTask(scanState, task, operation, failOnError,
+										hasReturning);
 		}
 
 		scanState->finishedRemoteScan = true;
@@ -817,10 +820,9 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
  * The function returns affectedTupleCount if applicable.
  */
 static int64
-ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
-                        bool expectResults)
+ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation,
+                        bool failOnError, bool expectResults)
 {
-	CmdType operation = CMD_UNKNOWN;
 	EState *executorState = NULL;
 	ParamListInfo paramListInfo = NULL;
 	List *taskPlacementList = task->taskPlacementList;
@@ -839,7 +841,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
 
 	if (scanState)
 	{
-		operation = scanState->distributedPlan->operation;
 		executorState = scanState->customScanState.ss.ps.state;
 		paramListInfo = executorState->es_param_list_info;
 	}
@@ -864,9 +865,15 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
 		LockPartitionRelations(relationId, RowExclusiveLock);
 	}
 
-	if (task->taskType == MODIFY_TASK)
+	/*
+	 * Prevent replicas of the same shard from diverging. We don't
+	 * need to acquire a lock for TRUNCATE and DDL commands since they
+	 * already acquire the necessary locks on the relations and block
+	 * any unsafe concurrent operations.
+	 */
+	if (operation == CMD_INSERT || operation == CMD_UPDATE ||
+		operation == CMD_DELETE || operation == CMD_SELECT)
 	{
-		/* prevent replicas of the same shard from diverging */
 		AcquireExecutorShardLock(task, operation);
 	}
 
@@ -1106,7 +1113,7 @@ ExecuteModifyTasksWithoutResults(List *taskList)
  * returns 0.
  */
 int64
-ExecuteModifyTasksSequentiallyWithoutResults(List *taskList)
+ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
 {
 	ListCell *taskCell = NULL;
 	bool multipleTasks = list_length(taskList) > 1;
@@ -1142,7 +1149,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList)
 		Task *task = (Task *) lfirst(taskCell);
 
 		affectedTupleCount +=
-			ExecuteSingleModifyTask(NULL, task, failOnError, expectResults);
+			ExecuteSingleModifyTask(NULL, task, operation, failOnError, expectResults);
 	}
 
 	return affectedTupleCount;
@@ -1196,6 +1203,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 	 * Ensure that there are no concurrent modifications on the same
 	 * shards. For DDL commands, we already obtained the appropriate
 	 * locks in ProcessUtility.
+	 *
+	 * We don't need to acquire a lock for TRUNCATE_TASK since it
+	 * already acquires AccessExclusiveLock on the relation and blocks
+	 * any concurrent operation.
 	 */
 	if (firstTask->taskType == MODIFY_TASK)
 	{
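The hunks above move the locking decision from the task type to the command type. As a hedged illustration — a standalone sketch, not Citus source; SketchCmdType and RequiresExecutorShardLock are made-up stand-ins for PostgreSQL's CmdType and the new condition guarding AcquireExecutorShardLock() — the rule reads:

/*
 * Standalone sketch of the lock-gating rule introduced above: INSERT,
 * UPDATE, DELETE, and (router) SELECT still take the executor shard lock
 * to keep replicas of the same shard from diverging, while TRUNCATE and
 * DDL (CMD_UTILITY) rely on the relation locks they already acquired.
 */
#include <stdbool.h>
#include <stdio.h>

typedef enum SketchCmdType
{
	SKETCH_CMD_UNKNOWN,
	SKETCH_CMD_SELECT,
	SKETCH_CMD_UPDATE,
	SKETCH_CMD_INSERT,
	SKETCH_CMD_DELETE,
	SKETCH_CMD_UTILITY
} SketchCmdType;

/* true when the executor itself must serialize access to the shard */
static bool
RequiresExecutorShardLock(SketchCmdType operation)
{
	return operation == SKETCH_CMD_INSERT || operation == SKETCH_CMD_UPDATE ||
		   operation == SKETCH_CMD_DELETE || operation == SKETCH_CMD_SELECT;
}

int
main(void)
{
	/* prints 1 (lock taken) for UPDATE, 0 (no executor lock) for UTILITY */
	printf("UPDATE: %d\n", RequiresExecutorShardLock(SKETCH_CMD_UPDATE));
	printf("UTILITY: %d\n", RequiresExecutorShardLock(SKETCH_CMD_UTILITY));
	return 0;
}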
diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c
index debe6d44e..072485f50 100644
--- a/src/backend/distributed/executor/multi_utility.c
+++ b/src/backend/distributed/executor/multi_utility.c
@@ -3133,7 +3133,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 		}
 		else
 		{
-			ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList);
+			ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY);
 		}
 	}
 	else
@@ -3145,7 +3145,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 
 		PG_TRY();
 		{
-			ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList);
+			ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY);
 
 			if (shouldSyncMetadata)
 			{
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index f438c1d09..8b4409d7f 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -84,6 +84,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	List *prunedShardIntervalList = NIL;
 	List *taskList = NIL;
 	int32 affectedTupleCount = 0;
+	CmdType operation = CMD_UNKNOWN;
 #if (PG_VERSION_NUM >= 100000)
 	RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
 	queryTreeNode = rawStmt->stmt;
@@ -147,7 +148,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 #endif
 
 	modifyQuery = (Query *) linitial(queryTreeList);
-	if (modifyQuery->commandType != CMD_UTILITY)
+	operation = modifyQuery->commandType;
+	if (operation != CMD_UTILITY)
 	{
 		bool multiShardQuery = true;
 		DeferredErrorMessage *error =
@@ -176,8 +178,18 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 
 	CHECK_FOR_INTERRUPTS();
 
-	taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList);
-	affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList);
+	taskList =
+		ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList);
+
+	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
+	{
+		affectedTupleCount =
+			ExecuteModifyTasksSequentiallyWithoutResults(taskList, operation);
+	}
+	else
+	{
+		affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList);
+	}
 
 	PG_RETURN_INT32(affectedTupleCount);
 }
diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h
index 45e7cd553..94f5beba7 100644
--- a/src/include/distributed/multi_router_executor.h
+++ b/src/include/distributed/multi_router_executor.h
@@ -41,7 +41,8 @@
 extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
 extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
 extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
-extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList);
+extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
+                                                          CmdType operation);
 
 /* helper functions */
 extern bool TaskListRequires2PC(List *taskList);
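master_modify_multiple_shards() above now honors citus.multi_shard_modify_mode by choosing between the sequential and the parallel executor, passing the command type along on the sequential path because there is no CitusScanState to read it from. Below is a hedged, self-contained sketch of that dispatch — not Citus source; the enum and function names are illustrative stand-ins for MultiShardConnectionType and the two Execute* entry points:

#include <stdint.h>
#include <stdio.h>

typedef enum
{
	SKETCH_PARALLEL_CONNECTION,
	SKETCH_SEQUENTIAL_CONNECTION
} SketchConnectionType;

/* stand-in for ExecuteModifyTasksSequentiallyWithoutResults() */
static int64_t
ExecuteTasksSequentially(int taskCount)
{
	printf("running %d tasks one by one over a single connection\n", taskCount);
	return taskCount;
}

/* stand-in for ExecuteModifyTasksWithoutResults() */
static int64_t
ExecuteTasksInParallel(int taskCount)
{
	printf("running %d tasks over parallel connections\n", taskCount);
	return taskCount;
}

/* mirrors the new if/else added to master_modify_multiple_shards() */
static int64_t
DispatchModifyTasks(SketchConnectionType connectionType, int taskCount)
{
	if (connectionType == SKETCH_SEQUENTIAL_CONNECTION)
	{
		return ExecuteTasksSequentially(taskCount);
	}

	return ExecuteTasksInParallel(taskCount);
}

int
main(void)
{
	DispatchModifyTasks(SKETCH_SEQUENTIAL_CONNECTION, 4);
	DispatchModifyTasks(SKETCH_PARALLEL_CONNECTION, 4);
	return 0;
}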
diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out
index 55db9818d..3c8d5a287 100644
--- a/src/test/regress/expected/sequential_modifications.out
+++ b/src/test/regress/expected/sequential_modifications.out
@@ -272,6 +272,150 @@ SELECT no_distributed_2PCs();
  t
 (1 row)
 
+-- test TRUNCATE in sequential and parallel modes
+CREATE TABLE test_seq_truncate (a int);
+INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i;
+SELECT create_distributed_table('test_seq_truncate', 'a');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+-- with parallel modification mode, we should see #shards records
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions 
+-------------------------------
+                             0
+(1 row)
+
+TRUNCATE test_seq_truncate;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+ distributed_2pcs_are_equal_to_placement_count 
+-----------------------------------------------
+ t
+(1 row)
+
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions 
+-------------------------------
+                             0
+(1 row)
+
+TRUNCATE test_seq_truncate;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count 
+--------------------------------------------
+ t
+(1 row)
+
+-- truncate with rep > 1 should work both in parallel and seq. modes
+CREATE TABLE test_seq_truncate_rep_2 (a int);
+INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
+SET citus.shard_replication_factor TO 2;
+SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions 
+-------------------------------
+                             0
+(1 row)
+
+TRUNCATE test_seq_truncate_rep_2;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count 
+--------------------------------------------
+ t
+(1 row)
+
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions 
+-------------------------------
+                             0
+(1 row)
+
+TRUNCATE test_seq_truncate_rep_2;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+ distributed_2pcs_are_equal_to_placement_count 
+-----------------------------------------------
+ t
+(1 row)
+
+CREATE TABLE multi_shard_modify_test (
+    t_key integer not null,
+    t_name varchar(25) not null,
+    t_value integer not null);
+SELECT create_distributed_table('multi_shard_modify_test', 't_key');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+-- with parallel modification mode, we should see #shards records
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions 
+-------------------------------
+                             0
+(1 row)
+
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
+ master_modify_multiple_shards 
+-------------------------------
+                             0
+(1 row)
+
+SELECT distributed_2PCs_are_equal_to_placement_count();
+ distributed_2pcs_are_equal_to_placement_count 
+-----------------------------------------------
+ t
+(1 row)
+
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions 
+-------------------------------
+                             0
+(1 row)
+
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
+ master_modify_multiple_shards 
+-------------------------------
+                             0
+(1 row)
+
+SELECT distributed_2PCs_are_equal_to_worker_count();
+ distributed_2pcs_are_equal_to_worker_count 
+--------------------------------------------
+ t
+(1 row)
+
+-- one more realistic test with sequential inserts and truncate in the same tx
+INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
+BEGIN;
+    INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
+    -- now switch to sequential mode to enable a successful TRUNCATE
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    TRUNCATE multi_shard_modify_test;
+COMMIT;
+-- see that all the data is successfully removed
+SELECT count(*) FROM multi_shard_modify_test;
+ count 
+-------
+     0
+(1 row)
+
 ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
 SET citus.shard_replication_factor TO DEFAULT;
 SELECT pg_reload_conf();
@@ -282,10 +426,13 @@ SELECT pg_reload_conf();
 
 SET search_path TO 'public';
 DROP SCHEMA test_seq_ddl CASCADE;
-NOTICE:  drop cascades to 6 other objects
+NOTICE:  drop cascades to 9 other objects
 DETAIL:  drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count()
 drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count()
 drop cascades to function test_seq_ddl.no_distributed_2pcs()
 drop cascades to table test_seq_ddl.test_table
 drop cascades to table test_seq_ddl.ref_test
 drop cascades to table test_seq_ddl.test_table_rep_2
+drop cascades to table test_seq_ddl.test_seq_truncate
+drop cascades to table test_seq_ddl.test_seq_truncate_rep_2
+drop cascades to table test_seq_ddl.multi_shard_modify_test
diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql
index 23088fd1d..4ca637355 100644
--- a/src/test/regress/sql/sequential_modifications.sql
+++ b/src/test/regress/sql/sequential_modifications.sql
@@ -155,6 +155,71 @@ CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a);
 
 -- we shouldn't see any distributed transactions
 SELECT no_distributed_2PCs();
 
+-- test TRUNCATE in sequential and parallel modes
+CREATE TABLE test_seq_truncate (a int);
+INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i;
+SELECT create_distributed_table('test_seq_truncate', 'a');
+
+-- with parallel modification mode, we should see #shards records
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+TRUNCATE test_seq_truncate;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+TRUNCATE test_seq_truncate;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+
+-- truncate with rep > 1 should work both in parallel and seq. modes
+CREATE TABLE test_seq_truncate_rep_2 (a int);
+INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
+SET citus.shard_replication_factor TO 2;
+SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
+
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+TRUNCATE test_seq_truncate_rep_2;
+SELECT distributed_2PCs_are_equal_to_worker_count();
+
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+TRUNCATE test_seq_truncate_rep_2;
+SELECT distributed_2PCs_are_equal_to_placement_count();
+
+CREATE TABLE multi_shard_modify_test (
+    t_key integer not null,
+    t_name varchar(25) not null,
+    t_value integer not null);
+SELECT create_distributed_table('multi_shard_modify_test', 't_key');
+
+-- with parallel modification mode, we should see #shards records
+SET citus.multi_shard_modify_mode TO 'parallel';
+SELECT recover_prepared_transactions();
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
+SELECT distributed_2PCs_are_equal_to_placement_count();
+
+-- with sequential modification mode, we should see #primary worker records
+SET citus.multi_shard_modify_mode TO 'sequential';
+SELECT recover_prepared_transactions();
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test');
+SELECT distributed_2PCs_are_equal_to_worker_count();
+
+-- one more realistic test with sequential inserts and truncate in the same tx
+INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i;
+BEGIN;
+    INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
+
+    -- now switch to sequential mode to enable a successful TRUNCATE
+    SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+    TRUNCATE multi_shard_modify_test;
+COMMIT;
+
+-- see that all the data is successfully removed
+SELECT count(*) FROM multi_shard_modify_test;
+
+
 ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
 SET citus.shard_replication_factor TO DEFAULT;
 SELECT pg_reload_conf();