From df44956dc3e60e2587f3862a91d12aa2607c5ed3 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Mon, 28 May 2018 14:43:16 +0300
Subject: [PATCH] Make sure that sequential DDL opens a single connection to
 each node

After this commit, DDL commands honour `citus.multi_shard_modify_mode`.
We preferred reusing the code path that executes single-task router queries
(e.g., ExecuteSingleModifyTask()) rather than inventing a new executor that
is only applicable to DDL commands that require sequential execution.
---
 .../executor/multi_router_executor.c          | 205 +++++++++---
 .../distributed/executor/multi_utility.c      |  11 +-
 .../distributed/utils/metadata_cache.c        |  14 +
 src/include/distributed/metadata_cache.h      |   1 +
 .../distributed/multi_router_executor.h       |   4 +-
 .../expected/multi_modifying_xacts.out        |  42 ++-
 .../expected/sequential_modifications.out     | 291 ++++++++++++++++++
 src/test/regress/multi_schedule               |   7 +
 .../sql/multi_alter_table_add_constraints.sql |   1 +
 .../regress/sql/multi_modifying_xacts.sql     |  27 +-
 .../regress/sql/sequential_modifications.sql  | 163 ++++++++++
 11 files changed, 711 insertions(+), 55 deletions(-)
 create mode 100644 src/test/regress/expected/sequential_modifications.out
 create mode 100644 src/test/regress/sql/sequential_modifications.sql

diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index a6ed07d3c..64b188530 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -81,8 +81,8 @@ bool EnableDeadlockPrevention = true;
 static void AcquireMetadataLocks(List *taskList);
 static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
 													ShardPlacementAccessType accessType);
-static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
-									bool multipleTasks, bool expectResults);
+static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
+									 bool failOnError, bool expectResults);
 static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
 static List * GetModifyConnections(Task *task, bool markCritical);
 static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
@@ -476,6 +476,9 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 	List *taskList = workerJob->taskList;
 	ListCell *taskCell = NULL;
 	bool multipleTasks = list_length(taskList) > 1;
+	EState *executorState = scanState->customScanState.ss.ps.state;
+	bool taskListRequires2PC = TaskListRequires2PC(taskList);
+	bool failOnError = false;
 
 	/*
 	 * We could naturally handle function-based transactions (i.e. those using
 	 * customers already use functions that touch multiple shards from within
 	 * a function, so we'll ignore functions for now.
 	 */
-	if (IsTransactionBlock() || multipleTasks)
+	if (IsTransactionBlock() || multipleTasks || taskListRequires2PC)
 	{
 		BeginOrContinueCoordinatedTransaction();
+
+		/*
+		 * Although using the two-phase commit protocol is an independent decision from
+		 * failing on any error, we prefer to couple them. Our motivation is that
+		 * the failures are rare, and we prefer to avoid marking placements invalid
+		 * in case of failures.
+		 *
+		 * For reference tables, we always failOnError since we absolutely want to avoid
+		 * marking any placements invalid.
+		 *
+		 * We also cannot handle failures when there is RETURNING and there is more than
+		 * one task to execute.
+		 */
+		if (taskListRequires2PC)
+		{
+			CoordinatedTransactionUse2PC();
+
+			failOnError = true;
+		}
+		else if (multipleTasks && hasReturning)
+		{
+			failOnError = true;
+		}
 	}
 
 	ExecuteSubPlans(distributedPlan);
@@ -494,7 +520,8 @@
 	{
 		Task *task = (Task *) lfirst(taskCell);
 
-		ExecuteSingleModifyTask(scanState, task, multipleTasks, hasReturning);
+		executorState->es_processed +=
+			ExecuteSingleModifyTask(scanState, task, failOnError, hasReturning);
 	}
 
 	scanState->finishedRemoteScan = true;
@@ -506,6 +533,55 @@
 }
 
 
+/*
+ * TaskListRequires2PC determines whether the given task list requires 2PC,
+ * either because the provided tasks operate on a reference table, or because
+ * there are multiple tasks and the commit protocol is 2PC.
+ *
+ * Note that we currently do not generate task lists that involve multiple different
+ * tables, thus we only check the first task in the list for reference tables.
+ */
+bool
+TaskListRequires2PC(List *taskList)
+{
+	Task *task = NULL;
+	bool multipleTasks = false;
+	uint64 anchorShardId = INVALID_SHARD_ID;
+
+	if (taskList == NIL)
+	{
+		return false;
+	}
+
+	task = (Task *) linitial(taskList);
+	if (task->replicationModel == REPLICATION_MODEL_2PC)
+	{
+		return true;
+	}
+
+	/*
+	 * Some tasks don't set replicationModel, thus we rely on
+	 * the anchorShardId as well as the replicationModel.
+	 *
+	 * TODO: Do we ever need replicationModel in the Task structure?
+	 * Can't we always rely on anchorShardId?
+	 */
+	anchorShardId = task->anchorShardId;
+	if (ReferenceTableShardId(anchorShardId))
+	{
+		return true;
+	}
+
+	multipleTasks = list_length(taskList) > 1;
+	if (multipleTasks && MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
+	{
+		return true;
+	}
+
+	return false;
+}
+
+
 /*
  * RouterMultiModifyExecScan executes a list of tasks on remote nodes, retrieves
  * the results and, if RETURNING is used, stores them in custom scan's tuple store.
@@ -729,20 +805,24 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
 
 /*
  * ExecuteSingleModifyTask executes the task on the remote node, retrieves the
- * results and stores them, if RETURNING is used, in a tuple store.
+ * results and stores them, if RETURNING is used, in a tuple store. The function
+ * can execute both DDL and DML tasks. When a DDL task is passed, the function
+ * does not expect scanState to be present.
 *
 * If the task fails on one of the placements, the function reraises the
 * remote error (constraint violation in DML), marks the affected placement as
 * invalid (other error on some placements, via the placement connection
 * framework), or errors out (failed on all placements).
+ *
+ * The function returns the affectedTupleCount if applicable.
*/ -static void -ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTasks, +static int64 +ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError, bool expectResults) { - CmdType operation = scanState->distributedPlan->operation; - EState *executorState = scanState->customScanState.ss.ps.state; - ParamListInfo paramListInfo = executorState->es_param_list_info; + CmdType operation = CMD_UNKNOWN; + EState *executorState = NULL; + ParamListInfo paramListInfo = NULL; List *taskPlacementList = task->taskPlacementList; List *connectionList = NIL; ListCell *taskPlacementCell = NULL; @@ -753,29 +833,24 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask bool gotResults = false; char *queryString = task->queryString; - bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId); Oid relationId = shardInterval->relationId; - /* - * Modifications for reference tables are always done using 2PC. First - * ensure that distributed transaction is started. Then force the - * transaction manager to use 2PC while running the task on the - * placements. - */ - if (taskRequiresTwoPhaseCommit) + if (scanState) { - BeginOrContinueCoordinatedTransaction(); - CoordinatedTransactionUse2PC(); + operation = scanState->distributedPlan->operation; + executorState = scanState->customScanState.ss.ps.state; + paramListInfo = executorState->es_param_list_info; } /* * Get connections required to execute task. This will, if necessary, * establish the connection, mark as critical (when modifying reference - * table) and start a transaction (when in a transaction). + * table or multi-shard command) and start a transaction (when in a + * transaction). */ - connectionList = GetModifyConnections(task, taskRequiresTwoPhaseCommit); + connectionList = GetModifyConnections(task, failOnError); /* * If we are dealing with a partitioned table, we also need to lock its @@ -789,8 +864,11 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask LockPartitionRelations(relationId, RowExclusiveLock); } - /* prevent replicas of the same shard from diverging */ - AcquireExecutorShardLock(task, operation); + if (task->taskType == MODIFY_TASK) + { + /* prevent replicas of the same shard from diverging */ + AcquireExecutorShardLock(task, operation); + } /* try to execute modification on all placements */ forboth(taskPlacementCell, taskPlacementList, connectionCell, connectionList) @@ -798,7 +876,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); bool queryOK = false; - bool failOnError = false; int64 currentAffectedTupleCount = 0; if (connection->remoteTransaction.transactionFailed) @@ -821,18 +898,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask continue; } - /* if we're running a 2PC, the query should fail on error */ - failOnError = taskRequiresTwoPhaseCommit; - - if (multipleTasks && expectResults) - { - /* - * If we have multiple tasks and one fails, we cannot clear - * the tuple store and start over. Error out instead. 
- */ - failOnError = true; - } - if (failureCount + 1 == list_length(taskPlacementList)) { /* @@ -898,12 +963,12 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask /* if some placements failed, ensure future statements don't access them */ MarkFailedShardPlacements(); - executorState->es_processed += affectedTupleCount; - if (IsTransactionBlock()) { XactModificationLevel = XACT_MODIFICATION_DATA; } + + return affectedTupleCount; } @@ -926,10 +991,22 @@ GetModifyConnections(Task *task, bool markCritical) foreach(taskPlacementCell, taskPlacementList) { ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell); - int connectionFlags = SESSION_LIFESPAN | FOR_DML; + int connectionFlags = SESSION_LIFESPAN; MultiConnection *multiConnection = NULL; List *placementAccessList = NIL; ShardPlacementAccess *placementModification = NULL; + ShardPlacementAccessType accessType = PLACEMENT_ACCESS_DML; + + if (task->taskType == DDL_TASK) + { + connectionFlags = connectionFlags | FOR_DDL; + accessType = PLACEMENT_ACCESS_DDL; + } + else + { + connectionFlags = connectionFlags | FOR_DML; + accessType = PLACEMENT_ACCESS_DML; + } /* create placement accesses for placements that appear in a subselect */ placementAccessList = BuildPlacementSelectList(taskPlacement->groupId, @@ -938,8 +1015,7 @@ GetModifyConnections(Task *task, bool markCritical) Assert(list_length(placementAccessList) == list_length(relationShardList)); /* create placement access for the placement that we're modifying */ - placementModification = CreatePlacementAccess(taskPlacement, - PLACEMENT_ACCESS_DML); + placementModification = CreatePlacementAccess(taskPlacement, accessType); placementAccessList = lappend(placementAccessList, placementModification); /* get an appropriate connection for the DML statement */ @@ -1020,23 +1096,56 @@ ExecuteModifyTasksWithoutResults(List *taskList) /* - * ExecuteTasksSequentiallyWithoutResults basically calls ExecuteModifyTasks in + * ExecuteModifyTasksSequentiallyWithoutResults basically calls ExecuteSingleModifyTask in * a loop in order to simulate sequential execution of a list of tasks. Useful * in cases where issuing commands in parallel before waiting for results could - * result in deadlocks (such as CREATE INDEX CONCURRENTLY). + * result in deadlocks (such as CREATE INDEX CONCURRENTLY or foreign key creation to + * reference tables). + * + * The function returns the affectedTupleCount if applicable. Otherwise, the function + * returns 0. 
*/ -void -ExecuteTasksSequentiallyWithoutResults(List *taskList) +int64 +ExecuteModifyTasksSequentiallyWithoutResults(List *taskList) { ListCell *taskCell = NULL; + bool multipleTasks = list_length(taskList) > 1; + bool expectResults = false; + int64 affectedTupleCount = 0; + bool failOnError = true; + bool taskListRequires2PC = TaskListRequires2PC(taskList); + /* decide on whether to use coordinated transaction and 2PC */ + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE) + { + /* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */ + } + else if (IsTransactionBlock() || multipleTasks) + { + BeginOrContinueCoordinatedTransaction(); + + if (taskListRequires2PC) + { + CoordinatedTransactionUse2PC(); + } + } + else if (!multipleTasks && taskListRequires2PC) + { + /* DDL on a reference table should also use 2PC */ + BeginOrContinueCoordinatedTransaction(); + CoordinatedTransactionUse2PC(); + } + + /* now that we've decided on the transaction status, execute the tasks */ foreach(taskCell, taskList) { Task *task = (Task *) lfirst(taskCell); - List *singleTask = list_make1(task); - ExecuteModifyTasksWithoutResults(singleTask); + affectedTupleCount += + ExecuteSingleModifyTask(NULL, task, failOnError, expectResults); } + + return affectedTupleCount; } diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index ccbd7f2a3..debe6d44e 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -3127,7 +3127,14 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); } - ExecuteModifyTasksWithoutResults(ddlJob->taskList); + if (MultiShardConnectionType == PARALLEL_CONNECTION) + { + ExecuteModifyTasksWithoutResults(ddlJob->taskList); + } + else + { + ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList); + } } else { @@ -3138,7 +3145,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) PG_TRY(); { - ExecuteTasksSequentiallyWithoutResults(ddlJob->taskList); + ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList); if (shouldSyncMetadata) { diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index b3ee1ca97..5be9a64f1 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -358,6 +358,20 @@ LoadShardInterval(uint64 shardId) } +/* + * ReferenceTableShardId returns true if the given shardId belongs to + * a reference table. 
+ */ +bool +ReferenceTableShardId(uint64 shardId) +{ + ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId); + DistTableCacheEntry *tableEntry = shardEntry->tableEntry; + + return (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE); +} + + /* * LoadGroupShardPlacement returns the cached shard placement metadata * diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index e8bf0dd83..c9e84a0a9 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -78,6 +78,7 @@ typedef struct extern bool IsDistributedTable(Oid relationId); extern List * DistributedTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); +extern bool ReferenceTableShardId(uint64 shardId); extern ShardPlacement * FindShardPlacementOnGroup(uint32 groupId, uint64 shardId); extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index fd1fcb658..45e7cd553 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -41,8 +41,10 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); -extern void ExecuteTasksSequentiallyWithoutResults(List *taskList); +extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList); +/* helper functions */ +extern bool TaskListRequires2PC(List *taskList); extern List * BuildPlacementSelectList(uint32 groupId, List *relationShardList); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 0e8b421b7..88c090619 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -192,6 +192,12 @@ BEGIN; ALTER TABLE labs ADD COLUMN motto text; INSERT INTO labs VALUES (6, 'Bell Labs'); ABORT; +-- this should work find with sequential DDL as well +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE labs ADD COLUMN motto text; +INSERT INTO labs VALUES (6, 'Bell Labs'); +ABORT; -- but the DDL should correctly roll back SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass; Column | Type | Modifiers @@ -418,7 +424,7 @@ BEGIN; \copy labs from stdin delimiter ',' ALTER TABLE labs ADD COLUMN motto text; ABORT; --- cannot perform DDL once a connection is used for multiple shards +-- cannot perform parallel DDL once a connection is used for multiple shards BEGIN; SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0; lab_id @@ -433,6 +439,21 @@ SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0; ALTER TABLE researchers ADD COLUMN motto text; ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection ROLLBACK; +-- can perform sequential DDL once a connection is used for multiple shards +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0; + lab_id +-------- +(0 rows) + +SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0; + lab_id +-------- +(0 rows) + +ALTER TABLE researchers ADD COLUMN 
motto text; +ROLLBACK; -- multi-shard operations can co-exist with DDL in a transactional way BEGIN; ALTER TABLE labs ADD COLUMN motto text; @@ -1422,7 +1443,7 @@ INSERT INTO users VALUES (3, 'burak'); \COPY items FROM STDIN WITH CSV ERROR: cannot establish a new connection for placement 1200042, since DML has been executed on a connection that is in use END; --- cannot perform DDL after a co-located table has been read over 1 connection +-- cannot perform parallel DDL after a co-located table has been read over 1 connection BEGIN; SELECT id FROM users WHERE id = 1; id @@ -1439,6 +1460,23 @@ SELECT id FROM users WHERE id = 6; ALTER TABLE items ADD COLUMN last_update timestamptz; ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection END; +-- can perform sequential DDL after a co-located table has been read over 1 connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id FROM users WHERE id = 1; + id +---- + 1 +(1 row) + +SELECT id FROM users WHERE id = 6; + id +---- + 6 +(1 row) + +ALTER TABLE items ADD COLUMN last_update timestamptz; +ROLLBACK; -- but the other way around is fine BEGIN; ALTER TABLE items ADD COLUMN last_update timestamptz; diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out new file mode 100644 index 000000000..55db9818d --- /dev/null +++ b/src/test/regress/expected/sequential_modifications.out @@ -0,0 +1,291 @@ +-- +-- Tests sequential and parallel DDL command execution +-- in combination with 1PC and 2PC +-- Note: this test should not be executed in parallel with +-- other tests since we're relying on disabling 2PC recovery +-- +CREATE SCHEMA test_seq_ddl; +SET search_path TO 'test_seq_ddl'; +-- this function simply checks the equality of the number of transactions in the +-- pg_dist_transaction and number of primary worker nodes +-- The function is useful to ensure that a single connection is opened per worker +-- in a distributed transaction +CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_worker_count() + RETURNS bool AS +$$ +DECLARE + result bool; +BEGIN + SELECT tx_count = worker_count FROM (SELECT count(*) as tx_count FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%') as s1, (SELECT count(*) as worker_count FROM pg_dist_node WHERE noderole = 'primary') as s2 INTO result; + RETURN result; +END; +$$ +LANGUAGE 'plpgsql' IMMUTABLE; +-- this function simply checks the equality of the number of transactions in the +-- pg_dist_transaction and number of shard placements for a distributed table +-- The function is useful to ensure that a single connection is opened per +-- shard placement in a distributed transaction +CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_placement_count() + RETURNS bool AS +$$ +DECLARE + result bool; +BEGIN + SELECT count(*) = current_setting('citus.shard_count')::bigint * current_setting('citus.shard_replication_factor')::bigint FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%' + INTO result; + RETURN result; +END; +$$ +LANGUAGE 'plpgsql' IMMUTABLE; +-- this function simply checks existence of distributed transcations in +-- pg_dist_transaction +CREATE OR REPLACE FUNCTION no_distributed_2PCs() + RETURNS bool AS +$$ +DECLARE + result bool; +BEGIN + SELECT tx_count = 0 FROM (SELECT count(*) as tx_count FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%') as s1 + INTO result; + 
RETURN result; +END; +$$ +LANGUAGE 'plpgsql' IMMUTABLE; +-- disbable 2PC recovery since our tests will check that +ALTER SYSTEM SET citus.recover_2pc_interval TO -1; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +CREATE TABLE test_table(a int, b int); +SELECT create_distributed_table('test_table', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +-- we should see #worker transactions +-- when sequential mode is used +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +ALTER TABLE test_table ADD CONSTRAINT a_check CHECK(a > 0); +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +-- we should see placement count # transactions +-- when parallel mode is used +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +ALTER TABLE test_table ADD CONSTRAINT b_check CHECK(b > 0); +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +-- with 1PC, we should not see and distributed TXs in the pg_dist_transaction +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +ALTER TABLE test_table ADD CONSTRAINT c_check CHECK(a > 0); +SELECT no_distributed_2PCs(); + no_distributed_2pcs +--------------------- + t +(1 row) + +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +ALTER TABLE test_table ADD CONSTRAINT d_check CHECK(a > 0); +SELECT no_distributed_2PCs(); + no_distributed_2pcs +--------------------- + t +(1 row) + +CREATE TABLE ref_test(a int); +SELECT create_reference_table('ref_test'); + create_reference_table +------------------------ + +(1 row) + +SET citus.multi_shard_commit_protocol TO '1pc'; +-- reference tables should always use 2PC +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +CREATE INDEX ref_test_seq_index ON ref_test(a); +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +-- reference tables should always use 2PC +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +CREATE INDEX ref_test_seq_index_2 ON ref_test(a); +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +-- tables with replication factor > 1 should also obey +-- both multi_shard_commit_protocol and multi_shard_modify_mode +SET citus.shard_replication_factor TO 2; +CREATE TABLE test_table_rep_2 (a int); +SELECT create_distributed_table('test_table_rep_2', 'a'); + create_distributed_table +-------------------------- + +(1 row) + + +-- 1PC 
should never use 2PC with rep > 1 +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +CREATE INDEX test_table_rep_2_i_1 ON test_table_rep_2(a); +SELECT no_distributed_2PCs(); + no_distributed_2pcs +--------------------- + t +(1 row) + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +CREATE INDEX test_table_rep_2_i_2 ON test_table_rep_2(a); +SELECT no_distributed_2PCs(); + no_distributed_2pcs +--------------------- + t +(1 row) + +-- 2PC should always use 2PC with rep > 1 +SET citus.multi_shard_commit_protocol TO '2pc'; +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +CREATE INDEX test_table_rep_2_i_3 ON test_table_rep_2(a); +SELECT distributed_2PCs_are_equal_to_worker_count(); + distributed_2pcs_are_equal_to_worker_count +-------------------------------------------- + t +(1 row) + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +CREATE INDEX test_table_rep_2_i_4 ON test_table_rep_2(a); +SELECT distributed_2PCs_are_equal_to_placement_count(); + distributed_2pcs_are_equal_to_placement_count +----------------------------------------------- + t +(1 row) + +-- CREATE INDEX CONCURRENTLY should work fine with rep > 1 +-- with both 2PC and different parallel modes +SET citus.multi_shard_commit_protocol TO '2pc'; +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +CREATE INDEX CONCURRENTLY test_table_rep_2_i_5 ON test_table_rep_2(a); +-- we shouldn't see any distributed transactions +SELECT no_distributed_2PCs(); + no_distributed_2pcs +--------------------- + t +(1 row) + +SET citus.multi_shard_commit_protocol TO '2pc'; +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a); +-- we shouldn't see any distributed transactions +SELECT no_distributed_2PCs(); + no_distributed_2pcs +--------------------- + t +(1 row) + +ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; +SET citus.shard_replication_factor TO DEFAULT; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SET search_path TO 'public'; +DROP SCHEMA test_seq_ddl CASCADE; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count() +drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count() +drop cascades to function test_seq_ddl.no_distributed_2pcs() +drop cascades to table test_seq_ddl.test_table +drop cascades to table test_seq_ddl.ref_test +drop cascades to table test_seq_ddl.test_table_rep_2 diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 224dd6863..a2b3a4428 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -133,6 +133,13 @@ test: multi_create_schema # ---------- test: multi_utility_warnings +# 
---------- +# Tests to check the sequential and parallel executions of DDL and modification +# commands +# Should not be executed in parallel with other tests +# ---------- +test: sequential_modifications + # --------- # multi_append_table_to_shard loads data to create shards in a way that forces # shard caching. diff --git a/src/test/regress/sql/multi_alter_table_add_constraints.sql b/src/test/regress/sql/multi_alter_table_add_constraints.sql index 87483cfdb..f44f99699 100644 --- a/src/test/regress/sql/multi_alter_table_add_constraints.sql +++ b/src/test/regress/sql/multi_alter_table_add_constraints.sql @@ -505,6 +505,7 @@ SELECT (run_command_on_workers($$ $$)).* ORDER BY 1,2,3,4; + SET search_path TO 'public'; DROP SCHEMA sc1 CASCADE; diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index f138d616c..2302872e3 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -158,6 +158,13 @@ ALTER TABLE labs ADD COLUMN motto text; INSERT INTO labs VALUES (6, 'Bell Labs'); ABORT; +-- this should work find with sequential DDL as well +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +ALTER TABLE labs ADD COLUMN motto text; +INSERT INTO labs VALUES (6, 'Bell Labs'); +ABORT; + -- but the DDL should correctly roll back SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass; SELECT * FROM labs WHERE id = 6; @@ -331,13 +338,21 @@ BEGIN; ALTER TABLE labs ADD COLUMN motto text; ABORT; --- cannot perform DDL once a connection is used for multiple shards +-- cannot perform parallel DDL once a connection is used for multiple shards BEGIN; SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0; SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0; ALTER TABLE researchers ADD COLUMN motto text; ROLLBACK; +-- can perform sequential DDL once a connection is used for multiple shards +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0; +SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0; +ALTER TABLE researchers ADD COLUMN motto text; +ROLLBACK; + -- multi-shard operations can co-exist with DDL in a transactional way BEGIN; ALTER TABLE labs ADD COLUMN motto text; @@ -1057,13 +1072,21 @@ INSERT INTO users VALUES (3, 'burak'); \. 
END; --- cannot perform DDL after a co-located table has been read over 1 connection +-- cannot perform parallel DDL after a co-located table has been read over 1 connection BEGIN; SELECT id FROM users WHERE id = 1; SELECT id FROM users WHERE id = 6; ALTER TABLE items ADD COLUMN last_update timestamptz; END; +-- can perform sequential DDL after a co-located table has been read over 1 connection +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; +SELECT id FROM users WHERE id = 1; +SELECT id FROM users WHERE id = 6; +ALTER TABLE items ADD COLUMN last_update timestamptz; +ROLLBACK; + -- but the other way around is fine BEGIN; ALTER TABLE items ADD COLUMN last_update timestamptz; diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql new file mode 100644 index 000000000..23088fd1d --- /dev/null +++ b/src/test/regress/sql/sequential_modifications.sql @@ -0,0 +1,163 @@ +-- +-- Tests sequential and parallel DDL command execution +-- in combination with 1PC and 2PC +-- Note: this test should not be executed in parallel with +-- other tests since we're relying on disabling 2PC recovery +-- +CREATE SCHEMA test_seq_ddl; +SET search_path TO 'test_seq_ddl'; + +-- this function simply checks the equality of the number of transactions in the +-- pg_dist_transaction and number of primary worker nodes +-- The function is useful to ensure that a single connection is opened per worker +-- in a distributed transaction +CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_worker_count() + RETURNS bool AS +$$ +DECLARE + result bool; +BEGIN + SELECT tx_count = worker_count FROM (SELECT count(*) as tx_count FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%') as s1, (SELECT count(*) as worker_count FROM pg_dist_node WHERE noderole = 'primary') as s2 INTO result; + RETURN result; +END; +$$ +LANGUAGE 'plpgsql' IMMUTABLE; + + +-- this function simply checks the equality of the number of transactions in the +-- pg_dist_transaction and number of shard placements for a distributed table +-- The function is useful to ensure that a single connection is opened per +-- shard placement in a distributed transaction +CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_placement_count() + RETURNS bool AS +$$ +DECLARE + result bool; +BEGIN + SELECT count(*) = current_setting('citus.shard_count')::bigint * current_setting('citus.shard_replication_factor')::bigint FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%' + INTO result; + RETURN result; +END; +$$ +LANGUAGE 'plpgsql' IMMUTABLE; + + +-- this function simply checks existence of distributed transcations in +-- pg_dist_transaction +CREATE OR REPLACE FUNCTION no_distributed_2PCs() + RETURNS bool AS +$$ +DECLARE + result bool; +BEGIN + SELECT tx_count = 0 FROM (SELECT count(*) as tx_count FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%') as s1 + INTO result; + RETURN result; +END; +$$ +LANGUAGE 'plpgsql' IMMUTABLE; + +-- disbable 2PC recovery since our tests will check that +ALTER SYSTEM SET citus.recover_2pc_interval TO -1; +SELECT pg_reload_conf(); + +CREATE TABLE test_table(a int, b int); +SELECT create_distributed_table('test_table', 'a'); + +-- we should see #worker transactions +-- when sequential mode is used +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +ALTER TABLE test_table ADD CONSTRAINT a_check CHECK(a > 0); +SELECT 
distributed_2PCs_are_equal_to_worker_count(); + +-- we should see placement count # transactions +-- when parallel mode is used +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +ALTER TABLE test_table ADD CONSTRAINT b_check CHECK(b > 0); +SELECT distributed_2PCs_are_equal_to_placement_count(); + +-- with 1PC, we should not see and distributed TXs in the pg_dist_transaction +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +ALTER TABLE test_table ADD CONSTRAINT c_check CHECK(a > 0); +SELECT no_distributed_2PCs(); + +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +ALTER TABLE test_table ADD CONSTRAINT d_check CHECK(a > 0); +SELECT no_distributed_2PCs(); + +CREATE TABLE ref_test(a int); +SELECT create_reference_table('ref_test'); +SET citus.multi_shard_commit_protocol TO '1pc'; + +-- reference tables should always use 2PC +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +CREATE INDEX ref_test_seq_index ON ref_test(a); +SELECT distributed_2PCs_are_equal_to_worker_count(); + +-- reference tables should always use 2PC +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +CREATE INDEX ref_test_seq_index_2 ON ref_test(a); +SELECT distributed_2PCs_are_equal_to_worker_count(); + +-- tables with replication factor > 1 should also obey +-- both multi_shard_commit_protocol and multi_shard_modify_mode +SET citus.shard_replication_factor TO 2; +CREATE TABLE test_table_rep_2 (a int); +SELECT create_distributed_table('test_table_rep_2', 'a'); + +-- 1PC should never use 2PC with rep > 1 +SET citus.multi_shard_commit_protocol TO '1pc'; + +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +CREATE INDEX test_table_rep_2_i_1 ON test_table_rep_2(a); +SELECT no_distributed_2PCs(); + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +CREATE INDEX test_table_rep_2_i_2 ON test_table_rep_2(a); +SELECT no_distributed_2PCs(); + +-- 2PC should always use 2PC with rep > 1 +SET citus.multi_shard_commit_protocol TO '2pc'; +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +CREATE INDEX test_table_rep_2_i_3 ON test_table_rep_2(a); +SELECT distributed_2PCs_are_equal_to_worker_count(); + +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +CREATE INDEX test_table_rep_2_i_4 ON test_table_rep_2(a); +SELECT distributed_2PCs_are_equal_to_placement_count(); + +-- CREATE INDEX CONCURRENTLY should work fine with rep > 1 +-- with both 2PC and different parallel modes +SET citus.multi_shard_commit_protocol TO '2pc'; +SET citus.multi_shard_modify_mode TO 'sequential'; +SELECT recover_prepared_transactions(); +CREATE INDEX CONCURRENTLY test_table_rep_2_i_5 ON test_table_rep_2(a); + +-- we shouldn't see any distributed transactions +SELECT no_distributed_2PCs(); + +SET citus.multi_shard_commit_protocol TO '2pc'; +SET citus.multi_shard_modify_mode TO 'parallel'; +SELECT recover_prepared_transactions(); +CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a); +-- we shouldn't see any distributed transactions +SELECT no_distributed_2PCs(); + +ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT; +SET citus.shard_replication_factor TO DEFAULT; +SELECT 
pg_reload_conf(); + +SET search_path TO 'public'; +DROP SCHEMA test_seq_ddl CASCADE;
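
For reference, a minimal usage sketch of the behaviour this patch enables (not part of the patch itself; the table and constraint names below are hypothetical). In sequential mode, the ALTER TABLE is sent over a single connection per worker node, so a 2PC commit leaves one prepared transaction per worker rather than one per shard placement; in parallel mode (the default), DDL opens one connection per shard placement.

-- hypothetical distributed table; any existing distributed table would do
CREATE TABLE my_dist_table (a int, b int);
SELECT create_distributed_table('my_dist_table', 'a');

-- send multi-shard DDL sequentially, over one connection per worker
SET citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE my_dist_table ADD CONSTRAINT a_positive CHECK (a > 0);

-- restore the connection-per-placement behaviour
RESET citus.multi_shard_modify_mode;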