From 336044f2a83ad33359c0e517c08dba4c478623bb Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 29 May 2018 15:47:37 +0300 Subject: [PATCH 1/3] master_modify_multiple_shards() and TRUNCATE honors multi_shard_modification_mode --- .../executor/multi_router_executor.c | 31 ++-- .../distributed/executor/multi_utility.c | 4 +- .../master/master_modify_multiple_shards.c | 18 ++- .../distributed/multi_router_executor.h | 3 +- .../expected/sequential_modifications.out | 149 +++++++++++++++++- .../regress/sql/sequential_modifications.sql | 65 ++++++++ 6 files changed, 253 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 64b188530..1689eddfa 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -82,7 +82,8 @@ static void AcquireMetadataLocks(List *taskList); static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType); static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, - bool failOnError, bool expectResults); + CmdType operation, bool failOnError, + bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static List * GetModifyConnections(Task *task, bool markCritical); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, @@ -479,6 +480,7 @@ RouterSequentialModifyExecScan(CustomScanState *node) EState *executorState = scanState->customScanState.ss.ps.state; bool taskListRequires2PC = TaskListRequires2PC(taskList); bool failOnError = false; + CmdType operation = scanState->distributedPlan->operation; /* * We could naturally handle function-based transactions (i.e. those using @@ -521,7 +523,8 @@ RouterSequentialModifyExecScan(CustomScanState *node) Task *task = (Task *) lfirst(taskCell); executorState->es_processed += - ExecuteSingleModifyTask(scanState, task, failOnError, hasReturning); + ExecuteSingleModifyTask(scanState, task, operation, failOnError, + hasReturning); } scanState->finishedRemoteScan = true; @@ -817,10 +820,9 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access * The function returns affectedTupleCount if applicable. */ static int64 -ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError, - bool expectResults) +ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation, + bool failOnError, bool expectResults) { - CmdType operation = CMD_UNKNOWN; EState *executorState = NULL; ParamListInfo paramListInfo = NULL; List *taskPlacementList = task->taskPlacementList; @@ -839,7 +841,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError, if (scanState) { - operation = scanState->distributedPlan->operation; executorState = scanState->customScanState.ss.ps.state; paramListInfo = executorState->es_param_list_info; } @@ -864,9 +865,15 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError, LockPartitionRelations(relationId, RowExclusiveLock); } - if (task->taskType == MODIFY_TASK) + /* + * Prevent replicas of the same shard from diverging. We don't + * need to acquire lock for TRUNCATE and DDLs since they already + * acquire the necessary locks on the relations, and blocks any + * unsafe concurrent operations. 
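+	 *
+	 * Note that TRUNCATE (arriving through master_modify_multiple_shards)
+	 * and DDL (arriving through ExecuteDistributedDDLJob) reach this
+	 * function with a CMD_UTILITY operation, so they are not covered by
+	 * the check below.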
+ */ + if (operation == CMD_INSERT || operation == CMD_UPDATE || + operation == CMD_DELETE || operation == CMD_SELECT) { - /* prevent replicas of the same shard from diverging */ AcquireExecutorShardLock(task, operation); } @@ -1106,7 +1113,7 @@ ExecuteModifyTasksWithoutResults(List *taskList) * returns 0. */ int64 -ExecuteModifyTasksSequentiallyWithoutResults(List *taskList) +ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) { ListCell *taskCell = NULL; bool multipleTasks = list_length(taskList) > 1; @@ -1142,7 +1149,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList) Task *task = (Task *) lfirst(taskCell); affectedTupleCount += - ExecuteSingleModifyTask(NULL, task, failOnError, expectResults); + ExecuteSingleModifyTask(NULL, task, operation, failOnError, expectResults); } return affectedTupleCount; @@ -1196,6 +1203,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn * Ensure that there are no concurrent modifications on the same * shards. For DDL commands, we already obtained the appropriate * locks in ProcessUtility. + * + * We don't need to acquire lock for TRUNCATE_TASK since it already + * acquires AccessExclusiveLock on the relation, and blocks any + * concurrent operation. */ if (firstTask->taskType == MODIFY_TASK) { diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index debe6d44e..072485f50 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -3133,7 +3133,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } else { - ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList); + ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY); } } else @@ -3145,7 +3145,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) PG_TRY(); { - ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList); + ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY); if (shouldSyncMetadata) { diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index f438c1d09..8b4409d7f 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -84,6 +84,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) List *prunedShardIntervalList = NIL; List *taskList = NIL; int32 affectedTupleCount = 0; + CmdType operation = CMD_UNKNOWN; #if (PG_VERSION_NUM >= 100000) RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); queryTreeNode = rawStmt->stmt; @@ -147,7 +148,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) #endif modifyQuery = (Query *) linitial(queryTreeList); - if (modifyQuery->commandType != CMD_UTILITY) + operation = modifyQuery->commandType; + if (operation != CMD_UTILITY) { bool multiShardQuery = true; DeferredErrorMessage *error = @@ -176,8 +178,18 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) CHECK_FOR_INTERRUPTS(); - taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList); - affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList); + taskList = + ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList); + + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) + { + affectedTupleCount = + ExecuteModifyTasksSequentiallyWithoutResults(taskList, operation); + } + else + { + affectedTupleCount = ExecuteModifyTasksWithoutResults(taskList); + } 
PG_RETURN_INT32(affectedTupleCount); } diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index 45e7cd553..94f5beba7 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -41,7 +41,8 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); -extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList); +extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, + CmdType operation); /* helper functions */ extern bool TaskListRequires2PC(List *taskList); diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out index 55db9818d..3c8d5a287 100644 --- a/src/test/regress/expected/sequential_modifications.out +++ b/src/test/regress/expected/sequential_modifications.out @@ -272,6 +272,150 @@ SELECT no_distributed_2PCs(); t (1 row) +-- test TRUNCATE on sequential and parallel modes +CREATE TABLE test_seq_truncate (a int); +INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i; +SELECT create_distributed_table('test_seq_truncate', 'a'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- with parallel modification mode, we should see #shards records +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +TRUNCATE test_seq_truncate; +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +TRUNCATE test_seq_truncate; +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +-- truncate with rep > 1 should work both in parallel and seq. modes +CREATE TABLE test_seq_truncate_rep_2 (a int); +INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('test_seq_truncate_rep_2', 'a'); +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +TRUNCATE test_seq_truncate_rep_2; +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +TRUNCATE test_seq_truncate_rep_2; +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +CREATE TABLE multi_shard_modify_test ( + t_key integer not null, + t_name varchar(25) not null, + t_value integer not null); +SELECT create_distributed_table('multi_shard_modify_test', 't_key'); + create_distributed_table +-------------------------- + +(1 row) + +-- with parallel modification mode, we should see #shards records +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test'); + master_modify_multiple_shards +------------------------------- + 0 +(1 row) + +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test'); + master_modify_multiple_shards +------------------------------- + 0 +(1 row) + +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +-- one more realistic test with sequential inserts and truncate in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + -- now switch to sequential mode to enable a successful TRUNCATE + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + TRUNCATE multi_shard_modify_test; +COMMIT; +-- see that all the data successfully removed +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 0 +(1 row) + ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; SET citus.shard_replication_factor TO DEFAULT; SELECT pg_reload_conf(); @@ -282,10 +426,13 @@ SELECT pg_reload_conf(); SET search_path TO 'public'; DROP SCHEMA test_seq_ddl CASCADE; -NOTICE: drop cascades to 6 other objects +NOTICE: drop cascades to 9 other objects DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count() drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count() drop cascades to function test_seq_ddl.no_distributed_2pcs() drop cascades to table test_seq_ddl.test_table drop cascades to table test_seq_ddl.ref_test drop cascades to table test_seq_ddl.test_table_rep_2 +drop cascades to table test_seq_ddl.test_seq_truncate +drop cascades to 
table test_seq_ddl.test_seq_truncate_rep_2 +drop cascades to table test_seq_ddl.multi_shard_modify_test diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql index 23088fd1d..4ca637355 100644 --- a/src/test/regress/sql/sequential_modifications.sql +++ b/src/test/regress/sql/sequential_modifications.sql @@ -155,6 +155,71 @@ CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a); -- we shouldn't see any distributed transactions SELECT no_distributed_2PCs(); +-- test TRUNCATE on sequential and parallel modes +CREATE TABLE test_seq_truncate (a int); +INSERT INTO test_seq_truncate SELECT i FROM generate_series(0, 100) i; +SELECT create_distributed_table('test_seq_truncate', 'a'); + +-- with parallel modification mode, we should see #shards records +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +TRUNCATE test_seq_truncate; +SELECT distributed_2PCs_are_equal_to_placement_count(); + +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +TRUNCATE test_seq_truncate; +SELECT distributed_2PCs_are_equal_to_worker_count(); + +-- truncate with rep > 1 should work both in parallel and seq. modes +CREATE TABLE test_seq_truncate_rep_2 (a int); +INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('test_seq_truncate_rep_2', 'a'); + +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +TRUNCATE test_seq_truncate_rep_2; +SELECT distributed_2PCs_are_equal_to_worker_count(); + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +TRUNCATE test_seq_truncate_rep_2; +SELECT distributed_2PCs_are_equal_to_placement_count(); + +CREATE TABLE multi_shard_modify_test ( + t_key integer not null, + t_name varchar(25) not null, + t_value integer not null); +SELECT create_distributed_table('multi_shard_modify_test', 't_key'); + +-- with parallel modification mode, we should see #shards records +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test'); +SELECT distributed_2PCs_are_equal_to_placement_count(); + +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test'); +SELECT distributed_2PCs_are_equal_to_worker_count(); + +-- one more realistic test with sequential inserts and truncate in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + + -- now switch to sequential mode to enable a successful TRUNCATE + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + TRUNCATE multi_shard_modify_test; +COMMIT; + +-- see that all the data successfully removed +SELECT count(*) FROM multi_shard_modify_test; + + ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; SET citus.shard_replication_factor TO DEFAULT; SELECT pg_reload_conf(); From d918556dca7dce13d06e651a8fecbc3a8abd1760 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 4 Jun 2018 17:16:02 +0300 Subject: [PATCH 2/3] 
INSERT .. SELECT pushdown honors multi_shard_modification_mode --- .../distributed/executor/citus_custom_scan.c | 5 +-- .../regress/expected/multi_insert_select.out | 9 +++- .../expected/sequential_modifications.out | 45 +++++++++++++++++++ src/test/regress/sql/multi_insert_select.sql | 10 ++++- .../regress/sql/sequential_modifications.sql | 24 ++++++++++ 5 files changed, 88 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 62379fc3a..25fb34e43 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -213,11 +213,10 @@ RouterCreateScan(CustomScan *scan) Assert(isModificationQuery); if (IsMultiRowInsert(workerJob->jobQuery) || - (IsUpdateOrDelete(distributedPlan) && - MultiShardConnectionType == SEQUENTIAL_CONNECTION)) + MultiShardConnectionType == SEQUENTIAL_CONNECTION) { /* - * Multi shard update deletes while multi_shard_modify_mode equals + * Multi shard modifications while multi_shard_modify_mode equals * to 'sequential' or Multi-row INSERT are executed sequentially * instead of using parallel connections. */ diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 7e1955dcb..af7d0577c 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1717,13 +1717,20 @@ ROLLBACK; -- Altering a reference table and then performing an INSERT ... SELECT which -- joins with the reference table is not allowed, since the INSERT ... SELECT -- would read from the reference table over others connections than the ones --- that performed the DDL. +-- that performed the parallel DDL. BEGIN; ALTER TABLE reference_table ADD COLUMN z int; INSERT INTO raw_events_first (user_id) SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); ERROR: cannot establish a new connection for placement 13300025, since DDL has been executed on a connection that is in use ROLLBACK; +-- the same test with sequential DDL should work fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE reference_table ADD COLUMN z int; +INSERT INTO raw_events_first (user_id) +SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); +ROLLBACK; -- Insert after copy is allowed BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out index 3c8d5a287..8483abf68 100644 --- a/src/test/regress/expected/sequential_modifications.out +++ b/src/test/regress/expected/sequential_modifications.out @@ -416,6 +416,51 @@ SELECT count(*) FROM multi_shard_modify_test; 0 (1 row) +-- test INSERT ... 
SELECT queries +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + -- now switch to sequential mode to enable a successful INSERT .. SELECT + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +COMMIT; +-- see that all the data successfully inserted +SELECT count(*) FROM multi_shard_modify_test; + count +------- + 210 +(1 row) + ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; SET citus.shard_replication_factor TO DEFAULT; SELECT pg_reload_conf(); diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index a05d79318..ecb6d34b5 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -1361,13 +1361,21 @@ ROLLBACK; -- Altering a reference table and then performing an INSERT ... SELECT which -- joins with the reference table is not allowed, since the INSERT ... SELECT -- would read from the reference table over others connections than the ones --- that performed the DDL. +-- that performed the parallel DDL. BEGIN; ALTER TABLE reference_table ADD COLUMN z int; INSERT INTO raw_events_first (user_id) SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); ROLLBACK; +-- the same test with sequential DDL should work fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE reference_table ADD COLUMN z int; +INSERT INTO raw_events_first (user_id) +SELECT user_id FROM raw_events_second JOIN reference_table USING (user_id); +ROLLBACK; + -- Insert after copy is allowed BEGIN; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql index 4ca637355..3f2d8af12 100644 --- a/src/test/regress/sql/sequential_modifications.sql +++ b/src/test/regress/sql/sequential_modifications.sql @@ -219,6 +219,30 @@ COMMIT; -- see that all the data successfully removed SELECT count(*) FROM multi_shard_modify_test; +-- test INSERT ... 
SELECT queries +-- with sequential modification mode, we should see #primary worker records +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_worker_count(); + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +SELECT distributed_2PCs_are_equal_to_placement_count(); + +-- one more realistic test with sequential inserts and INSERT .. SELECT in the same tx +INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,100) i; +BEGIN; + INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4); + + -- now switch to sequential mode to enable a successful INSERT .. SELECT + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test; +COMMIT; + +-- see that all the data successfully inserted +SELECT count(*) FROM multi_shard_modify_test; ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; SET citus.shard_replication_factor TO DEFAULT; From a5370f5bb0ea3f68fbb3982432941c0fdbd9f69c Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 5 Jun 2018 13:52:54 +0300 Subject: [PATCH 3/3] Realtime executor honours multi_shard_modify_mode We're relying on multi_shard_modify_mode GUC for real-time SELECTs. The name of the GUC is unfortunate, but, adding one more GUC (or renaming the GUC) would make the UX even worse. Given that this mode is mostly important for transaction blocks that involve modification /DDL queries along with real-time SELECTs, we can live with the confusion. --- .../executor/multi_client_executor.c | 13 +++++++++- .../distributed/executor/multi_executor.c | 5 +++- .../executor/multi_real_time_executor.c | 26 +++++++++++++------ .../expected/multi_real_time_transaction.out | 16 ++++++++++++ .../multi_real_time_transaction_0.out | 16 ++++++++++++ .../regress/expected/with_transactions.out | 17 ++++++++++++ .../sql/multi_real_time_transaction.sql | 7 +++++ src/test/regress/sql/with_transactions.sql | 10 +++++++ 8 files changed, 100 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index a28d1008c..118104299 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -20,6 +20,7 @@ #include "commands/dbcommands.h" #include "distributed/metadata_cache.h" #include "distributed/connection_management.h" +#include "distributed/multi_executor.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_server_executor.h" #include "distributed/placement_connection.h" @@ -188,7 +189,17 @@ MultiClientPlacementConnectStart(List *placementAccessList, const char *userName MultiConnection *connection = NULL; ConnStatusType connStatusType = CONNECTION_OK; int32 connectionId = AllocateConnectionId(); - int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */ + int connectionFlags = 0; + + /* + * Although we're opening connections for SELECT queries, we're relying + * on multi_shard_modify_mode GUC. The name of the GUC is unfortunate, but, + * adding one more GUC (or renaming the GUC) would make the UX even worse. 
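+	 *
+	 * In sequential mode we leave the flags empty (instead of setting
+	 * CONNECTION_PER_PLACEMENT), so an already established connection to
+	 * the worker can be reused and the tasks are sent over the same
+	 * connection.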
+	 */
+	if (MultiShardConnectionType == PARALLEL_CONNECTION)
+	{
+		connectionFlags = CONNECTION_PER_PLACEMENT;
+	}
 
 	if (connectionId == INVALID_CONNECTION_ID)
 	{
diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c
index 7a2f256ce..a89193656 100644
--- a/src/backend/distributed/executor/multi_executor.c
+++ b/src/backend/distributed/executor/multi_executor.c
@@ -40,7 +40,10 @@
 #include "utils/memutils.h"
 
 
-/* controls the connection type for multi shard update/delete queries */
+/*
+ * Controls the connection type for multi shard modifications, DDLs,
+ * TRUNCATE and real-time SELECT queries.
+ */
 int MultiShardConnectionType = PARALLEL_CONNECTION;
 
 
diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c
index 4d4f94993..26509b018 100644
--- a/src/backend/distributed/executor/multi_real_time_executor.c
+++ b/src/backend/distributed/executor/multi_real_time_executor.c
@@ -901,23 +901,33 @@ LookupWorkerForTask(HTAB *workerHash, Task *task, TaskExecution *taskExecution)
 /*
  * WorkerConnectionsExhausted determines if the current query has exhausted the
  * maximum number of open connections that can be made to a worker.
+ *
+ * Note that the function takes sequential execution of the queries into account
+ * as well. In other words, in sequential mode, the connections are considered
+ * to be exhausted when there is already a connection opened to the given worker.
  */
 static bool
 WorkerConnectionsExhausted(WorkerNodeState *workerNodeState)
 {
 	bool reachedLimit = false;
 
-	/*
-	 * A worker cannot accept more than max_connections connections. If we have a
-	 * small number of workers with many shards, then a single query could exhaust
-	 * max_connections unless we throttle here. We use the value of max_connections
-	 * on the master as a proxy for the worker configuration to avoid introducing a
-	 * new configuration value.
-	 */
-	if (workerNodeState->openConnectionCount >= MaxConnections)
+	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION &&
+		workerNodeState->openConnectionCount >= 1)
 	{
 		reachedLimit = true;
 	}
+	else if (MultiShardConnectionType == PARALLEL_CONNECTION &&
+			 workerNodeState->openConnectionCount >= MaxConnections)
+	{
+		/*
+		 * A worker cannot accept more than max_connections connections. If we have a
+		 * small number of workers with many shards, then a single query could exhaust
+		 * max_connections unless we throttle here. We use the value of max_connections
+		 * on the master as a proxy for the worker configuration to avoid introducing a
+		 * new configuration value.
+ */ + reachedLimit = true; + } return reachedLimit; } diff --git a/src/test/regress/expected/multi_real_time_transaction.out b/src/test/regress/expected/multi_real_time_transaction.out index 110fc51a3..a1f0ee882 100644 --- a/src/test/regress/expected/multi_real_time_transaction.out +++ b/src/test/regress/expected/multi_real_time_transaction.out @@ -340,6 +340,22 @@ ROLLBACK; BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ERROR: canceling the transaction since it was involved in a distributed deadlock +ROLLBACK; +-- sequential real-time queries should be successfully executed +-- since the queries are sent over the same connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC; + id | pg_advisory_lock +----+------------------ + 6 | + 5 | + 4 | + 3 | + 2 | + 1 | +(6 rows) + ROLLBACK; SET client_min_messages TO DEFAULT; alter system set deadlock_timeout TO DEFAULT; diff --git a/src/test/regress/expected/multi_real_time_transaction_0.out b/src/test/regress/expected/multi_real_time_transaction_0.out index 33147487a..27a3d7603 100644 --- a/src/test/regress/expected/multi_real_time_transaction_0.out +++ b/src/test/regress/expected/multi_real_time_transaction_0.out @@ -348,6 +348,22 @@ ROLLBACK; BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ERROR: canceling the transaction since it was involved in a distributed deadlock +ROLLBACK; +-- sequential real-time queries should be successfully executed +-- since the queries are sent over the same connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC; + id | pg_advisory_lock +----+------------------ + 6 | + 5 | + 4 | + 3 | + 2 | + 1 | +(6 rows) + ROLLBACK; SET client_min_messages TO DEFAULT; alter system set deadlock_timeout TO DEFAULT; diff --git a/src/test/regress/expected/with_transactions.out b/src/test/regress/expected/with_transactions.out index 5aebf2a83..1e0759b99 100644 --- a/src/test/regress/expected/with_transactions.out +++ b/src/test/regress/expected/with_transactions.out @@ -105,6 +105,23 @@ SELECT count(*) FROM second_raw_table; 0 (1 row) +-- sequential insert followed by a sequential real-time query should be fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +WITH ids_inserted AS +( + INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id +) +SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3; +DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries +DEBUG: generating subplan 17_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING raw_table.tenant_id +DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT income FROM with_transactions.second_raw_table WHERE (tenant_id OPERATOR(pg_catalog.=) ANY (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) ORDER BY income DESC LIMIT 3 +DEBUG: push down of limit count: 3 + income +-------- +(0 rows) + +ROLLBACK; RESET client_min_messages; RESET citus.shard_count; DROP SCHEMA with_transactions CASCADE; diff --git a/src/test/regress/sql/multi_real_time_transaction.sql b/src/test/regress/sql/multi_real_time_transaction.sql index 26e40b455..4c07be7df 100644 
--- a/src/test/regress/sql/multi_real_time_transaction.sql +++ b/src/test/regress/sql/multi_real_time_transaction.sql @@ -213,6 +213,13 @@ BEGIN; SELECT id, pg_advisory_lock(15) FROM test_table; ROLLBACK; +-- sequential real-time queries should be successfully executed +-- since the queries are sent over the same connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC; +ROLLBACK; + SET client_min_messages TO DEFAULT; alter system set deadlock_timeout TO DEFAULT; SELECT pg_reload_conf(); diff --git a/src/test/regress/sql/with_transactions.sql b/src/test/regress/sql/with_transactions.sql index d85edd6d0..bb23ce195 100644 --- a/src/test/regress/sql/with_transactions.sql +++ b/src/test/regress/sql/with_transactions.sql @@ -70,6 +70,16 @@ SELECT count(*) FROM raw_table; SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00'; SELECT count(*) FROM second_raw_table; +-- sequential insert followed by a sequential real-time query should be fine +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +WITH ids_inserted AS +( + INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id +) +SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3; +ROLLBACK; + RESET client_min_messages; RESET citus.shard_count; DROP SCHEMA with_transactions CASCADE;
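
-- Editorial footnote, not part of the patch series: a minimal sketch of the
-- user-facing behavior this series adds. The distributed table name "items"
-- is hypothetical; citus.multi_shard_modify_mode is the existing GUC used
-- throughout the tests above.
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
-- in sequential mode, DDL, INSERT .. SELECT, real-time SELECT and TRUNCATE
-- are executed sequentially instead of over parallel per-placement
-- connections, so they can be combined in one transaction block
ALTER TABLE items ADD COLUMN note text;
INSERT INTO items SELECT * FROM items;
SELECT count(*) FROM items;
TRUNCATE items;
COMMIT;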