diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c
index 62379fc3a..25fb34e43 100644
--- a/src/backend/distributed/executor/citus_custom_scan.c
+++ b/src/backend/distributed/executor/citus_custom_scan.c
@@ -213,11 +213,10 @@ RouterCreateScan(CustomScan *scan)
 		Assert(isModificationQuery);
 
 		if (IsMultiRowInsert(workerJob->jobQuery) ||
-			(IsUpdateOrDelete(distributedPlan) &&
-			 MultiShardConnectionType == SEQUENTIAL_CONNECTION))
+			MultiShardConnectionType == SEQUENTIAL_CONNECTION)
 		{
 			/*
-			 * Multi shard update deletes while multi_shard_modify_mode equals
+			 * Multi shard modifications while multi_shard_modify_mode equals
 			 * to 'sequential' or Multi-row INSERT are executed sequentially
 			 * instead of using parallel connections.
 			 */
diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c
index a28d1008c..118104299 100644
--- a/src/backend/distributed/executor/multi_client_executor.c
+++ b/src/backend/distributed/executor/multi_client_executor.c
@@ -20,6 +20,7 @@
 #include "commands/dbcommands.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/connection_management.h"
+#include "distributed/multi_executor.h"
 #include "distributed/multi_client_executor.h"
 #include "distributed/multi_server_executor.h"
 #include "distributed/placement_connection.h"
@@ -188,7 +189,17 @@ MultiClientPlacementConnectStart(List *placementAccessList, const char *userName
 	MultiConnection *connection = NULL;
 	ConnStatusType connStatusType = CONNECTION_OK;
 	int32 connectionId = AllocateConnectionId();
-	int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */
+	int connectionFlags = 0;
+
+	/*
+	 * Although we're opening connections for SELECT queries, we're relying
+	 * on the multi_shard_modify_mode GUC. The name of the GUC is unfortunate, but
+	 * adding one more GUC (or renaming the GUC) would make the UX even worse.
+	 */
+	if (MultiShardConnectionType == PARALLEL_CONNECTION)
+	{
+		connectionFlags = CONNECTION_PER_PLACEMENT;
+	}
 
 	if (connectionId == INVALID_CONNECTION_ID)
 	{
diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c
index 7a2f256ce..a89193656 100644
--- a/src/backend/distributed/executor/multi_executor.c
+++ b/src/backend/distributed/executor/multi_executor.c
@@ -40,7 +40,10 @@
 #include "utils/memutils.h"
 
 
-/* controls the connection type for multi shard update/delete queries */
+/*
+ * Controls the connection type for multi shard modifications, DDLs,
+ * TRUNCATE and real-time SELECT queries.
+ */
 int MultiShardConnectionType = PARALLEL_CONNECTION;
 
 
diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c
index 4d4f94993..26509b018 100644
--- a/src/backend/distributed/executor/multi_real_time_executor.c
+++ b/src/backend/distributed/executor/multi_real_time_executor.c
@@ -901,23 +901,33 @@ LookupWorkerForTask(HTAB *workerHash, Task *task, TaskExecution *taskExecution)
 /*
  * WorkerConnectionsExhausted determines if the current query has exhausted the
  * maximum number of open connections that can be made to a worker.
+ *
+ * Note that the function takes sequential execution of the queries into account
+ * as well. In other words, in sequential mode, the connections are considered
+ * to be exhausted when there is already a connection opened to the given worker.
*/ static bool WorkerConnectionsExhausted(WorkerNodeState *workerNodeState) { bool reachedLimit = false; - /* - * A worker cannot accept more than max_connections connections. If we have a - * small number of workers with many shards, then a single query could exhaust - * max_connections unless we throttle here. We use the value of max_connections - * on the master as a proxy for the worker configuration to avoid introducing a - * new configuration value. - */ - if (workerNodeState->openConnectionCount >= MaxConnections) + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION && + workerNodeState->openConnectionCount >= 1) { reachedLimit = true; } + else if (MultiShardConnectionType == PARALLEL_CONNECTION && + workerNodeState->openConnectionCount >= MaxConnections) + { + /* + * A worker cannot accept more than max_connections connections. If we have a + * small number of workers with many shards, then a single query could exhaust + * max_connections unless we throttle here. We use the value of max_connections + * on the master as a proxy for the worker configuration to avoid introducing a + * new configuration value. + */ + reachedLimit = true; + } return reachedLimit; } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 64b188530..1689eddfa 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -82,7 +82,8 @@ static void AcquireMetadataLocks(List *taskList); static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType); static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, - bool failOnError, bool expectResults); + CmdType operation, bool failOnError, + bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static List * GetModifyConnections(Task *task, bool markCritical); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, @@ -479,6 +480,7 @@ RouterSequentialModifyExecScan(CustomScanState *node) EState *executorState = scanState->customScanState.ss.ps.state; bool taskListRequires2PC = TaskListRequires2PC(taskList); bool failOnError = false; + CmdType operation = scanState->distributedPlan->operation; /* * We could naturally handle function-based transactions (i.e. those using @@ -521,7 +523,8 @@ RouterSequentialModifyExecScan(CustomScanState *node) Task *task = (Task *) lfirst(taskCell); executorState->es_processed += - ExecuteSingleModifyTask(scanState, task, failOnError, hasReturning); + ExecuteSingleModifyTask(scanState, task, operation, failOnError, + hasReturning); } scanState->finishedRemoteScan = true; @@ -817,10 +820,9 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access * The function returns affectedTupleCount if applicable. 
 */
static int64
-ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
-						bool expectResults)
+ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation,
+						bool failOnError, bool expectResults)
 {
-	CmdType operation = CMD_UNKNOWN;
 	EState *executorState = NULL;
 	ParamListInfo paramListInfo = NULL;
 	List *taskPlacementList = task->taskPlacementList;
@@ -839,7 +841,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
 
 	if (scanState)
 	{
-		operation = scanState->distributedPlan->operation;
 		executorState = scanState->customScanState.ss.ps.state;
 		paramListInfo = executorState->es_param_list_info;
 	}
@@ -864,9 +865,15 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
 		LockPartitionRelations(relationId, RowExclusiveLock);
 	}
 
-	if (task->taskType == MODIFY_TASK)
+	/*
+	 * Prevent replicas of the same shard from diverging. We don't
+	 * need to acquire a lock for TRUNCATE and DDLs since they already
+	 * acquire the necessary locks on the relations and block any
+	 * unsafe concurrent operations.
+	 */
+	if (operation == CMD_INSERT || operation == CMD_UPDATE ||
+		operation == CMD_DELETE || operation == CMD_SELECT)
 	{
-		/* prevent replicas of the same shard from diverging */
 		AcquireExecutorShardLock(task, operation);
 	}
 
@@ -1106,7 +1113,7 @@ ExecuteModifyTasksWithoutResults(List *taskList)
  * returns 0.
  */
 int64
-ExecuteModifyTasksSequentiallyWithoutResults(List *taskList)
+ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
 {
 	ListCell *taskCell = NULL;
 	bool multipleTasks = list_length(taskList) > 1;
@@ -1142,7 +1149,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList)
 		Task *task = (Task *) lfirst(taskCell);
 
 		affectedTupleCount +=
-			ExecuteSingleModifyTask(NULL, task, failOnError, expectResults);
+			ExecuteSingleModifyTask(NULL, task, operation, failOnError, expectResults);
 	}
 
 	return affectedTupleCount;
@@ -1196,6 +1203,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
 	 * Ensure that there are no concurrent modifications on the same
 	 * shards. For DDL commands, we already obtained the appropriate
 	 * locks in ProcessUtility.
+	 *
+	 * We don't need to acquire a lock for TRUNCATE_TASK since it already
+	 * acquires AccessExclusiveLock on the relation and blocks any
+	 * concurrent operation.
*/ if (firstTask->taskType == MODIFY_TASK) { diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index debe6d44e..072485f50 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -3133,7 +3133,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } else { - ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList); + ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY); } } else @@ -3145,7 +3145,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) PG_TRY(); { - ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList); + ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY); if (shouldSyncMetadata) { diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index f438c1d09..8b4409d7f 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -84,6 +84,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) List *prunedShardIntervalList = NIL; List *taskList = NIL; int32 affectedTupleCount = 0; + CmdType operation = CMD_UNKNOWN; #if (PG_VERSION_NUM >= 100000) RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); queryTreeNode = rawStmt->stmt; @@ -147,7 +148,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) #endif modifyQuery = (Query *) linitial(queryTreeList); - if (modifyQuery->commandType != CMD_UTILITY) + operation = modifyQuery->commandType; + if (operation != CMD_UTILITY) { bool multiShardQuery = true; DeferredErrorMessage *error = @@ -176,8 +178,18 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) CHECK_FOR_INTERRUPTS(); - taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList); - affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList); + taskList = + ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList); + + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) + { + affectedTupleCount = + ExecuteModifyTasksSequentiallyWithoutResults(taskList, operation); + } + else + { + affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList); + } PG_RETURN_INT32(affectedTupleCount); } diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 45e7cd553..94f5beba7 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -41,7 +41,8 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); -extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList); +extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, + CmdType operation); /* helper functions */ extern bool TaskListRequires2PC(List *taskList); diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 7e1955dcb..af7d0577c 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1717,13 +1717,20 @@ ROLLBACK; -- Altering a reference table and then performing an INSERT ... SELECT which -- joins with the reference table is not allowed, since the INSERT ... 
SELECT -- would read from the reference table over others connections than the ones --- that performed the DDL. +-- that performed the parallel DDL. BEGIN; ALTER TABLE reference_table ADD COLUMN z int; INSERT INTO raw_events_first (user_id) SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); ERROR: cannot establish a new connection for placement 13300025, since DDL has been executed on a connection that is in use ROLLBACK; +-- the same test with sequential DDL should work fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE reference_table ADD COLUMN z int; +INSERT INTO raw_events_first (user_id) +SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); +ROLLBACK; -- Insert after copy is allowed BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; diff --git a/src/test/regress/expected/multi_real_time_transaction.out b/src/test/regress/expected/multi_real_time_transaction.out index 110fc51a3..a1f0ee882 100644 --- a/src/test/regress/expected/multi_real_time_transaction.out +++ b/src/test/regress/expected/multi_real_time_transaction.out @@ -340,6 +340,22 @@ ROLLBACK; BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ERROR: canceling the transaction since it was involved in a distributed deadlock +ROLLBACK; +-- sequential real-time queries should be successfully executed +-- since the queries are sent over the same connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC; + id | pg_advisory_lock +----+------------------ + 6 | + 5 | + 4 | + 3 | + 2 | + 1 | +(6 rows) + ROLLBACK; SET client_min_messages TO DEFAULT; alter system set deadlock_timeout TO DEFAULT; diff --git a/src/test/regress/expected/multi_real_time_transaction_0.out b/src/test/regress/expected/multi_real_time_transaction_0.out index 33147487a..27a3d7603 100644 --- a/src/test/regress/expected/multi_real_time_transaction_0.out +++ b/src/test/regress/expected/multi_real_time_transaction_0.out @@ -348,6 +348,22 @@ ROLLBACK; BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ERROR: canceling the transaction since it was involved in a distributed deadlock +ROLLBACK; +-- sequential real-time queries should be successfully executed +-- since the queries are sent over the same connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC; + id | pg_advisory_lock +----+------------------ + 6 | + 5 | + 4 | + 3 | + 2 | + 1 | +(6 rows) + ROLLBACK; SET client_min_messages TO DEFAULT; alter system set deadlock_timeout TO DEFAULT; diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out index 55db9818d..8483abf68 100644 --- a/src/test/regress/expected/sequential_modifications.out +++ b/src/test/regress/expected/sequential_modifications.out @@ -272,6 +272,195 @@ SELECT no_distributed_2PCs(); t (1 row) +-- test TRUNCATE on sequential and parallel modes +CREATE TABLE test_seq_truncate (a int); +INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i; +SELECT create_distributed_table('test_seq_truncate', 'a'); +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +-- with parallel modification mode, we should see #shards records +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +TRUNCATE test_seq_truncate; +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +TRUNCATE test_seq_truncate; +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +-- truncate with rep > 1 should work both in parallel and seq. modes +CREATE TABLE test_seq_truncate_rep_2 (a int); +INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('test_seq_truncate_rep_2', 'a'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +TRUNCATE test_seq_truncate_rep_2; +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +TRUNCATE test_seq_truncate_rep_2; +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +CREATE TABLE multi_shard_modify_test ( + t_key integer not null, + t_name varchar(25) not null, + t_value integer not null); +SELECT create_distributed_table('multi_shard_modify_test', 't_key'); + create_distributed_table +-------------------------- + +(1 row) + +-- with parallel modification mode, we should see #shards records +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test'); + master_modify_multiple_shards +------------------------------- + 0 +(1 row) + +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test'); + master_modify_multiple_shards +------------------------------- + 0 +(1 row) + +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +-- one more realistic test with sequential inserts and 
truncate in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + -- now switch to sequential mode to enable a successful TRUNCATE + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + TRUNCATE multi_shard_modify_test; +COMMIT; +-- see that all the data successfully removed +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 0 +(1 row) + +-- test INSERT ... SELECT queries +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + -- now switch to sequential mode to enable a successful INSERT .. 
SELECT + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +COMMIT; +-- see that all the data successfully inserted +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 210 +(1 row) + ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; SET citus.shard_replication_factor TO DEFAULT; SELECT pg_reload_conf(); @@ -282,10 +471,13 @@ SELECT pg_reload_conf(); SET search_path TO 'public'; DROP SCHEMA test_seq_ddl CASCADE; -NOTICE: drop cascades to 6 other objects +NOTICE: drop cascades to 9 other objects DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count() drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count() drop cascades to function test_seq_ddl.no_distributed_2pcs() drop cascades to table test_seq_ddl.test_table drop cascades to table test_seq_ddl.ref_test drop cascades to table test_seq_ddl.test_table_rep_2 +drop cascades to table test_seq_ddl.test_seq_truncate +drop cascades to table test_seq_ddl.test_seq_truncate_rep_2 +drop cascades to table test_seq_ddl.multi_shard_modify_test diff --git a/src/test/regress/expected/with_transactions.out b/src/test/regress/expected/with_transactions.out index 5aebf2a83..1e0759b99 100644 --- a/src/test/regress/expected/with_transactions.out +++ b/src/test/regress/expected/with_transactions.out @@ -105,6 +105,23 @@ SELECT count(*) FROM second_raw_table; 0 (1 row) +-- sequential insert followed by a sequential real-time query should be fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +WITH ids_inserted AS +( + INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id +) +SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3; +DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: generating subplan 17_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING raw_table.tenant_id +DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT income FROM with_transactions.second_raw_table WHERE (tenant_id OPERATOR(pg_catalog.=) ANY (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) ORDER BY income DESC LIMIT 3 +DEBUG: push down of limit count: 3 + income +-------- +(0 rows) + +ROLLBACK; RESET client_min_messages; RESET citus.shard_count; DROP SCHEMA with_transactions CASCADE; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index a05d79318..ecb6d34b5 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -1361,13 +1361,21 @@ ROLLBACK; -- Altering a reference table and then performing an INSERT ... SELECT which -- joins with the reference table is not allowed, since the INSERT ... SELECT -- would read from the reference table over others connections than the ones --- that performed the DDL. +-- that performed the parallel DDL. 
BEGIN; ALTER TABLE reference_table ADD COLUMN z int; INSERT INTO raw_events_first (user_id) SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); ROLLBACK; +-- the same test with sequential DDL should work fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE reference_table ADD COLUMN z int; +INSERT INTO raw_events_first (user_id) +SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); +ROLLBACK; + -- Insert after copy is allowed BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; diff --git a/src/test/regress/sql/multi_real_time_transaction.sql b/src/test/regress/sql/multi_real_time_transaction.sql index 26e40b455..4c07be7df 100644 --- a/src/test/regress/sql/multi_real_time_transaction.sql +++ b/src/test/regress/sql/multi_real_time_transaction.sql @@ -213,6 +213,13 @@ BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ROLLBACK; +-- sequential real-time queries should be successfully executed +-- since the queries are sent over the same connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC; +ROLLBACK; + SET client_min_messages TO DEFAULT; alter system set deadlock_timeout TO DEFAULT; SELECT pg_reload_conf(); diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql index 23088fd1d..3f2d8af12 100644 --- a/src/test/regress/sql/sequential_modifications.sql +++ b/src/test/regress/sql/sequential_modifications.sql @@ -155,6 +155,95 @@ CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a); -- we shouldn't see any distributed transactions SELECT no_distributed_2PCs(); +-- test TRUNCATE on sequential and parallel modes +CREATE TABLE test_seq_truncate (a int); +INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i; +SELECT create_distributed_table('test_seq_truncate', 'a'); + +-- with parallel modification mode, we should see #shards records +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +TRUNCATE test_seq_truncate; +SELECT distributed_2PCs_are_equal_to_placement_count(); + +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +TRUNCATE test_seq_truncate; +SELECT distributed_2PCs_are_equal_to_worker_count(); + +-- truncate with rep > 1 should work both in parallel and seq. 
modes +CREATE TABLE test_seq_truncate_rep_2 (a int); +INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('test_seq_truncate_rep_2', 'a'); + +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +TRUNCATE test_seq_truncate_rep_2; +SELECT distributed_2PCs_are_equal_to_worker_count(); + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +TRUNCATE test_seq_truncate_rep_2; +SELECT distributed_2PCs_are_equal_to_placement_count(); + +CREATE TABLE multi_shard_modify_test ( + t_key integer not null, + t_name varchar(25) not null, + t_value integer not null); +SELECT create_distributed_table('multi_shard_modify_test', 't_key'); + +-- with parallel modification mode, we should see #shards records +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test'); +SELECT distributed_2PCs_are_equal_to_placement_count(); + +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test'); +SELECT distributed_2PCs_are_equal_to_worker_count(); + +-- one more realistic test with sequential inserts and truncate in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + + -- now switch to sequential mode to enable a successful TRUNCATE + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + TRUNCATE multi_shard_modify_test; +COMMIT; + +-- see that all the data successfully removed +SELECT count(*) FROM multi_shard_modify_test; + +-- test INSERT ... SELECT queries +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_worker_count(); + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_placement_count(); + +-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + + -- now switch to sequential mode to enable a successful INSERT .. 
SELECT + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +COMMIT; + +-- see that all the data successfully inserted +SELECT count(*) FROM multi_shard_modify_test; + ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; SET citus.shard_replication_factor TO DEFAULT; SELECT pg_reload_conf(); diff --git a/src/test/regress/sql/with_transactions.sql b/src/test/regress/sql/with_transactions.sql index d85edd6d0..bb23ce195 100644 --- a/src/test/regress/sql/with_transactions.sql +++ b/src/test/regress/sql/with_transactions.sql @@ -70,6 +70,16 @@ SELECT count(*) FROM raw_table; SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00'; SELECT count(*) FROM second_raw_table; +-- sequential insert followed by a sequential real-time query should be fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +WITH ids_inserted AS +( + INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id +) +SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3; +ROLLBACK; + RESET client_min_messages; RESET citus.shard_count; DROP SCHEMA with_transactions CASCADE;
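
Usage sketch (editor's note, not part of the patch): the snippet below simply mirrors the regression test added in multi_insert_select.sql above, using the same table names from that test. It illustrates the behaviour this change enables: under citus.multi_shard_modify_mode = 'sequential', a DDL on a reference table and a subsequent INSERT ... SELECT joining with it are sent over the same worker connections, so they can run inside one transaction instead of erroring out as in the parallel case.

-- assumes reference_table, raw_events_first and raw_events_second from the tests above
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE reference_table ADD COLUMN z int;
INSERT INTO raw_events_first (user_id)
SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id);
COMMIT;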