Make sure that sequential DDL opens a single connection to each node

After this commit, DDL commands honour `citus.multi_shard_modify_mode`.

We preferred reusing the code path that executes single-task router
queries (i.e., ExecuteSingleModifyTask()) rather than inventing a new
executor that would only apply to DDL commands requiring sequential
execution.
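
The new behaviour can be exercised from SQL as in the regression tests
added by this commit, for example inside a transaction block:

BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE labs ADD COLUMN motto text;
INSERT INTO labs VALUES (6, 'Bell Labs');
ABORT;

Each DDL task then runs through ExecuteSingleModifyTask() over a single
connection per node; reference tables and the 2PC commit protocol still
force two-phase commit via TaskListRequires2PC().
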
pull/2189/head
Onder Kalaci 2018-05-28 14:43:16 +03:00
parent 98b99634f3
commit df44956dc3
11 changed files with 711 additions and 55 deletions

View File

@ -81,8 +81,8 @@ bool EnableDeadlockPrevention = true;
static void AcquireMetadataLocks(List *taskList);
static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType);
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
bool multipleTasks, bool expectResults);
static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
bool failOnError, bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * GetModifyConnections(Task *task, bool markCritical);
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
@ -476,6 +476,9 @@ RouterSequentialModifyExecScan(CustomScanState *node)
List *taskList = workerJob->taskList;
ListCell *taskCell = NULL;
bool multipleTasks = list_length(taskList) > 1;
EState *executorState = scanState->customScanState.ss.ps.state;
bool taskListRequires2PC = TaskListRequires2PC(taskList);
bool failOnError = false;
/*
* We could naturally handle function-based transactions (i.e. those using
@ -483,9 +486,32 @@ RouterSequentialModifyExecScan(CustomScanState *node)
* customers already use functions that touch multiple shards from within
* a function, so we'll ignore functions for now.
*/
if (IsTransactionBlock() || multipleTasks)
if (IsTransactionBlock() || multipleTasks || taskListRequires2PC)
{
BeginOrContinueCoordinatedTransaction();
/*
* Although using the two-phase commit protocol is an independent decision from
* failing on any error, we prefer to couple them. Our motivation is that
* failures are rare, and we prefer to avoid marking placements invalid
* in case of failures.
*
* For reference tables, we always failOnError since we absolutely want to avoid
* marking any placements invalid.
*
* We also cannot handle failures when there is RETURNING and there is more than
* one task to execute.
*/
if (taskListRequires2PC)
{
CoordinatedTransactionUse2PC();
failOnError = true;
}
else if (multipleTasks && hasReturning)
{
failOnError = true;
}
}
ExecuteSubPlans(distributedPlan);
@ -494,7 +520,8 @@ RouterSequentialModifyExecScan(CustomScanState *node)
{
Task *task = (Task *) lfirst(taskCell);
ExecuteSingleModifyTask(scanState, task, multipleTasks, hasReturning);
executorState->es_processed +=
ExecuteSingleModifyTask(scanState, task, failOnError, hasReturning);
}
scanState->finishedRemoteScan = true;
@ -506,6 +533,55 @@ RouterSequentialModifyExecScan(CustomScanState *node)
}
/*
* TaskListRequires2PC determines whether the given task list requires 2PC
* because the given tasks operate on a reference table or there are multiple
* tasks and the commit protocol is 2PC.
*
* Note that we currently do not generate task lists that involve multiple different
* tables, thus we only check the first task in the list for reference tables.
*/
bool
TaskListRequires2PC(List *taskList)
{
Task *task = NULL;
bool multipleTasks = false;
uint64 anchorShardId = INVALID_SHARD_ID;
if (taskList == NIL)
{
return false;
}
task = (Task *) linitial(taskList);
if (task->replicationModel == REPLICATION_MODEL_2PC)
{
return true;
}
/*
* Some tasks don't set replicationModel, thus we rely on
* the anchorShardId as well as the replicationModel.
*
* TODO: Do we ever need replicationModel in the Task structure?
* Can't we always rely on anchorShardId?
*/
anchorShardId = task->anchorShardId;
if (ReferenceTableShardId(anchorShardId))
{
return true;
}
multipleTasks = list_length(taskList) > 1;
if (multipleTasks && MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
return true;
}
return false;
}
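
As a concrete illustration of TaskListRequires2PC(), DDL on a reference
table ends up using two-phase commit even when the commit protocol is set
to 1PC; the new sequential_modifications test (which defines ref_test and
the helper function used below) checks exactly this:

SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.multi_shard_modify_mode TO 'sequential';
CREATE INDEX ref_test_seq_index ON ref_test(a);
-- one prepared transaction per primary worker, despite the 1PC setting
SELECT distributed_2PCs_are_equal_to_worker_count();
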
/*
* RouterMultiModifyExecScan executes a list of tasks on remote nodes, retrieves
* the results and, if RETURNING is used, stores them in custom scan's tuple store.
@ -729,20 +805,24 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
/*
* ExecuteSingleModifyTask executes the task on the remote node, retrieves the
* results and stores them, if RETURNING is used, in a tuple store.
* results and stores them, if RETURNING is used, in a tuple store. The function
* can execute both DDL and DML tasks. When a DDL task is passed, the function
* does not expect scanState to be present.
*
* If the task fails on one of the placements, the function reraises the
* remote error (constraint violation in DML), marks the affected placement as
* invalid (other error on some placements, via the placement connection
* framework), or errors out (failed on all placements).
*
* The function returns affectedTupleCount if applicable.
*/
static void
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTasks,
static int64
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool failOnError,
bool expectResults)
{
CmdType operation = scanState->distributedPlan->operation;
EState *executorState = scanState->customScanState.ss.ps.state;
ParamListInfo paramListInfo = executorState->es_param_list_info;
CmdType operation = CMD_UNKNOWN;
EState *executorState = NULL;
ParamListInfo paramListInfo = NULL;
List *taskPlacementList = task->taskPlacementList;
List *connectionList = NIL;
ListCell *taskPlacementCell = NULL;
@ -753,29 +833,24 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
bool gotResults = false;
char *queryString = task->queryString;
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId);
Oid relationId = shardInterval->relationId;
/*
* Modifications for reference tables are always done using 2PC. First
* ensure that distributed transaction is started. Then force the
* transaction manager to use 2PC while running the task on the
* placements.
*/
if (taskRequiresTwoPhaseCommit)
if (scanState)
{
BeginOrContinueCoordinatedTransaction();
CoordinatedTransactionUse2PC();
operation = scanState->distributedPlan->operation;
executorState = scanState->customScanState.ss.ps.state;
paramListInfo = executorState->es_param_list_info;
}
/*
* Get connections required to execute task. This will, if necessary,
* establish the connection, mark as critical (when modifying reference
* table) and start a transaction (when in a transaction).
* table or multi-shard command) and start a transaction (when in a
* transaction).
*/
connectionList = GetModifyConnections(task, taskRequiresTwoPhaseCommit);
connectionList = GetModifyConnections(task, failOnError);
/*
* If we are dealing with a partitioned table, we also need to lock its
@ -789,8 +864,11 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
LockPartitionRelations(relationId, RowExclusiveLock);
}
/* prevent replicas of the same shard from diverging */
AcquireExecutorShardLock(task, operation);
if (task->taskType == MODIFY_TASK)
{
/* prevent replicas of the same shard from diverging */
AcquireExecutorShardLock(task, operation);
}
/* try to execute modification on all placements */
forboth(taskPlacementCell, taskPlacementList, connectionCell, connectionList)
@ -798,7 +876,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
bool queryOK = false;
bool failOnError = false;
int64 currentAffectedTupleCount = 0;
if (connection->remoteTransaction.transactionFailed)
@ -821,18 +898,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
continue;
}
/* if we're running a 2PC, the query should fail on error */
failOnError = taskRequiresTwoPhaseCommit;
if (multipleTasks && expectResults)
{
/*
* If we have multiple tasks and one fails, we cannot clear
* the tuple store and start over. Error out instead.
*/
failOnError = true;
}
if (failureCount + 1 == list_length(taskPlacementList))
{
/*
@ -898,12 +963,12 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool multipleTask
/* if some placements failed, ensure future statements don't access them */
MarkFailedShardPlacements();
executorState->es_processed += affectedTupleCount;
if (IsTransactionBlock())
{
XactModificationLevel = XACT_MODIFICATION_DATA;
}
return affectedTupleCount;
}
@ -926,10 +991,22 @@ GetModifyConnections(Task *task, bool markCritical)
foreach(taskPlacementCell, taskPlacementList)
{
ShardPlacement *taskPlacement = (ShardPlacement *) lfirst(taskPlacementCell);
int connectionFlags = SESSION_LIFESPAN | FOR_DML;
int connectionFlags = SESSION_LIFESPAN;
MultiConnection *multiConnection = NULL;
List *placementAccessList = NIL;
ShardPlacementAccess *placementModification = NULL;
ShardPlacementAccessType accessType = PLACEMENT_ACCESS_DML;
if (task->taskType == DDL_TASK)
{
connectionFlags = connectionFlags | FOR_DDL;
accessType = PLACEMENT_ACCESS_DDL;
}
else
{
connectionFlags = connectionFlags | FOR_DML;
accessType = PLACEMENT_ACCESS_DML;
}
/* create placement accesses for placements that appear in a subselect */
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
@ -938,8 +1015,7 @@ GetModifyConnections(Task *task, bool markCritical)
Assert(list_length(placementAccessList) == list_length(relationShardList));
/* create placement access for the placement that we're modifying */
placementModification = CreatePlacementAccess(taskPlacement,
PLACEMENT_ACCESS_DML);
placementModification = CreatePlacementAccess(taskPlacement, accessType);
placementAccessList = lappend(placementAccessList, placementModification);
/* get an appropriate connection for the DML statement */
@ -1020,23 +1096,56 @@ ExecuteModifyTasksWithoutResults(List *taskList)
/*
* ExecuteTasksSequentiallyWithoutResults basically calls ExecuteModifyTasks in
* ExecuteModifyTasksSequentiallyWithoutResults basically calls ExecuteSingleModifyTask in
* a loop in order to simulate sequential execution of a list of tasks. Useful
* in cases where issuing commands in parallel before waiting for results could
* result in deadlocks (such as CREATE INDEX CONCURRENTLY).
* result in deadlocks (such as CREATE INDEX CONCURRENTLY or foreign key creation to
* reference tables).
*
* The function returns the affectedTupleCount if applicable. Otherwise, the function
* returns 0.
*/
void
ExecuteTasksSequentiallyWithoutResults(List *taskList)
int64
ExecuteModifyTasksSequentiallyWithoutResults(List *taskList)
{
ListCell *taskCell = NULL;
bool multipleTasks = list_length(taskList) > 1;
bool expectResults = false;
int64 affectedTupleCount = 0;
bool failOnError = true;
bool taskListRequires2PC = TaskListRequires2PC(taskList);
/* decide on whether to use coordinated transaction and 2PC */
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
{
/* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */
}
else if (IsTransactionBlock() || multipleTasks)
{
BeginOrContinueCoordinatedTransaction();
if (taskListRequires2PC)
{
CoordinatedTransactionUse2PC();
}
}
else if (!multipleTasks && taskListRequires2PC)
{
/* DDL on a reference table should also use 2PC */
BeginOrContinueCoordinatedTransaction();
CoordinatedTransactionUse2PC();
}
/* now that we've decided on the transaction status, execute the tasks */
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
List *singleTask = list_make1(task);
ExecuteModifyTasksWithoutResults(singleTask);
affectedTupleCount +=
ExecuteSingleModifyTask(NULL, task, failOnError, expectResults);
}
return affectedTupleCount;
}
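
The sequential path is also what CREATE INDEX CONCURRENTLY relies on, since
issuing it in parallel before waiting for results could deadlock; the new
sequential_modifications test (which defines test_table_rep_2 and the helper
function used below) exercises it as follows:

SET citus.multi_shard_commit_protocol TO '2pc';
SET citus.multi_shard_modify_mode TO 'sequential';
CREATE INDEX CONCURRENTLY test_table_rep_2_i_5 ON test_table_rep_2(a);
-- CONCURRENTLY does not run in a distributed transaction, so no 2PC records appear
SELECT no_distributed_2PCs();
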

View File

@ -3127,7 +3127,14 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
}
ExecuteModifyTasksWithoutResults(ddlJob->taskList);
if (MultiShardConnectionType == PARALLEL_CONNECTION)
{
ExecuteModifyTasksWithoutResults(ddlJob->taskList);
}
else
{
ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList);
}
}
else
{
@ -3138,7 +3145,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
PG_TRY();
{
ExecuteTasksSequentiallyWithoutResults(ddlJob->taskList);
ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList);
if (shouldSyncMetadata)
{

View File

@ -358,6 +358,20 @@ LoadShardInterval(uint64 shardId)
}
/*
* ReferenceTableShardId returns true if the given shardId belongs to
* a reference table.
*/
bool
ReferenceTableShardId(uint64 shardId)
{
ShardCacheEntry *shardEntry = LookupShardCacheEntry(shardId);
DistTableCacheEntry *tableEntry = shardEntry->tableEntry;
return (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE);
}
/*
* LoadGroupShardPlacement returns the cached shard placement metadata
*

View File

@ -78,6 +78,7 @@ typedef struct
extern bool IsDistributedTable(Oid relationId);
extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern bool ReferenceTableShardId(uint64 shardId);
extern ShardPlacement * FindShardPlacementOnGroup(uint32 groupId, uint64 shardId);
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);

View File

@ -41,8 +41,10 @@ extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node);
extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern void ExecuteTasksSequentiallyWithoutResults(List *taskList);
extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList);
/* helper functions */
extern bool TaskListRequires2PC(List *taskList);
extern List * BuildPlacementSelectList(uint32 groupId, List *relationShardList);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -192,6 +192,12 @@ BEGIN;
ALTER TABLE labs ADD COLUMN motto text;
INSERT INTO labs VALUES (6, 'Bell Labs');
ABORT;
-- this should work fine with sequential DDL as well
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE labs ADD COLUMN motto text;
INSERT INTO labs VALUES (6, 'Bell Labs');
ABORT;
-- but the DDL should correctly roll back
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
Column | Type | Modifiers
@ -418,7 +424,7 @@ BEGIN;
\copy labs from stdin delimiter ','
ALTER TABLE labs ADD COLUMN motto text;
ABORT;
-- cannot perform DDL once a connection is used for multiple shards
-- cannot perform parallel DDL once a connection is used for multiple shards
BEGIN;
SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0;
lab_id
@ -433,6 +439,21 @@ SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0;
ALTER TABLE researchers ADD COLUMN motto text;
ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection
ROLLBACK;
-- can perform sequential DDL once a connection is used for multiple shards
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0;
lab_id
--------
(0 rows)
SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0;
lab_id
--------
(0 rows)
ALTER TABLE researchers ADD COLUMN motto text;
ROLLBACK;
-- multi-shard operations can co-exist with DDL in a transactional way
BEGIN;
ALTER TABLE labs ADD COLUMN motto text;
@ -1422,7 +1443,7 @@ INSERT INTO users VALUES (3, 'burak');
\COPY items FROM STDIN WITH CSV
ERROR: cannot establish a new connection for placement 1200042, since DML has been executed on a connection that is in use
END;
-- cannot perform DDL after a co-located table has been read over 1 connection
-- cannot perform parallel DDL after a co-located table has been read over 1 connection
BEGIN;
SELECT id FROM users WHERE id = 1;
id
@ -1439,6 +1460,23 @@ SELECT id FROM users WHERE id = 6;
ALTER TABLE items ADD COLUMN last_update timestamptz;
ERROR: cannot perform a parallel DDL command because multiple placements have been accessed over the same connection
END;
-- can perform sequential DDL after a co-located table has been read over 1 connection
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT id FROM users WHERE id = 1;
id
----
1
(1 row)
SELECT id FROM users WHERE id = 6;
id
----
6
(1 row)
ALTER TABLE items ADD COLUMN last_update timestamptz;
ROLLBACK;
-- but the other way around is fine
BEGIN;
ALTER TABLE items ADD COLUMN last_update timestamptz;

View File

@ -0,0 +1,291 @@
--
-- Tests sequential and parallel DDL command execution
-- in combination with 1PC and 2PC
-- Note: this test should not be executed in parallel with
-- other tests since we're relying on disabling 2PC recovery
--
CREATE SCHEMA test_seq_ddl;
SET search_path TO 'test_seq_ddl';
-- this function simply checks that the number of transactions in
-- pg_dist_transaction equals the number of primary worker nodes
-- The function is useful to ensure that a single connection is opened per worker
-- in a distributed transaction
CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_worker_count()
RETURNS bool AS
$$
DECLARE
result bool;
BEGIN
SELECT tx_count = worker_count FROM (SELECT count(*) as tx_count FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%') as s1, (SELECT count(*) as worker_count FROM pg_dist_node WHERE noderole = 'primary') as s2 INTO result;
RETURN result;
END;
$$
LANGUAGE 'plpgsql' IMMUTABLE;
-- this function simply checks that the number of transactions in
-- pg_dist_transaction equals the number of shard placements of a distributed table
-- The function is useful to ensure that a single connection is opened per
-- shard placement in a distributed transaction
CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_placement_count()
RETURNS bool AS
$$
DECLARE
result bool;
BEGIN
SELECT count(*) = current_setting('citus.shard_count')::bigint * current_setting('citus.shard_replication_factor')::bigint FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%'
INTO result;
RETURN result;
END;
$$
LANGUAGE 'plpgsql' IMMUTABLE;
-- this function simply checks that there are no distributed transactions in
-- pg_dist_transaction
CREATE OR REPLACE FUNCTION no_distributed_2PCs()
RETURNS bool AS
$$
DECLARE
result bool;
BEGIN
SELECT tx_count = 0 FROM (SELECT count(*) as tx_count FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%') as s1
INTO result;
RETURN result;
END;
$$
LANGUAGE 'plpgsql' IMMUTABLE;
-- disable 2PC recovery since our tests will check pg_dist_transaction
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
CREATE TABLE test_table(a int, b int);
SELECT create_distributed_table('test_table', 'a');
create_distributed_table
--------------------------
(1 row)
-- we should see one distributed transaction per worker
-- when sequential mode is used
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
ALTER TABLE test_table ADD CONSTRAINT a_check CHECK(a > 0);
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
-- we should see one distributed transaction per shard placement
-- when parallel mode is used
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
ALTER TABLE test_table ADD CONSTRAINT b_check CHECK(b > 0);
SELECT distributed_2PCs_are_equal_to_placement_count();
distributed_2pcs_are_equal_to_placement_count
-----------------------------------------------
t
(1 row)
-- with 1PC, we should not see any distributed transactions in pg_dist_transaction
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
ALTER TABLE test_table ADD CONSTRAINT c_check CHECK(a > 0);
SELECT no_distributed_2PCs();
no_distributed_2pcs
---------------------
t
(1 row)
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
ALTER TABLE test_table ADD CONSTRAINT d_check CHECK(a > 0);
SELECT no_distributed_2PCs();
no_distributed_2pcs
---------------------
t
(1 row)
CREATE TABLE ref_test(a int);
SELECT create_reference_table('ref_test');
create_reference_table
------------------------
(1 row)
SET citus.multi_shard_commit_protocol TO '1pc';
-- reference tables should always use 2PC
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE INDEX ref_test_seq_index ON ref_test(a);
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
-- reference tables should always use 2PC
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE INDEX ref_test_seq_index_2 ON ref_test(a);
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
-- tables with replication factor > 1 should also obey
-- both multi_shard_commit_protocol and multi_shard_modify_mode
SET citus.shard_replication_factor TO 2;
CREATE TABLE test_table_rep_2 (a int);
SELECT create_distributed_table('test_table_rep_2', 'a');
create_distributed_table
--------------------------
(1 row)
-- 1PC should never use 2PC with rep > 1
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE INDEX test_table_rep_2_i_1 ON test_table_rep_2(a);
SELECT no_distributed_2PCs();
no_distributed_2pcs
---------------------
t
(1 row)
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE INDEX test_table_rep_2_i_2 ON test_table_rep_2(a);
SELECT no_distributed_2PCs();
no_distributed_2pcs
---------------------
t
(1 row)
-- 2PC should always use 2PC with rep > 1
SET citus.multi_shard_commit_protocol TO '2pc';
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE INDEX test_table_rep_2_i_3 ON test_table_rep_2(a);
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE INDEX test_table_rep_2_i_4 ON test_table_rep_2(a);
SELECT distributed_2PCs_are_equal_to_placement_count();
distributed_2pcs_are_equal_to_placement_count
-----------------------------------------------
t
(1 row)
-- CREATE INDEX CONCURRENTLY should work fine with rep > 1
-- with both 2PC and different parallel modes
SET citus.multi_shard_commit_protocol TO '2pc';
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE INDEX CONCURRENTLY test_table_rep_2_i_5 ON test_table_rep_2(a);
-- we shouldn't see any distributed transactions
SELECT no_distributed_2PCs();
no_distributed_2pcs
---------------------
t
(1 row)
SET citus.multi_shard_commit_protocol TO '2pc';
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a);
-- we shouldn't see any distributed transactions
SELECT no_distributed_2PCs();
no_distributed_2pcs
---------------------
t
(1 row)
ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
SET citus.shard_replication_factor TO DEFAULT;
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
SET search_path TO 'public';
DROP SCHEMA test_seq_ddl CASCADE;
NOTICE: drop cascades to 6 other objects
DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count()
drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count()
drop cascades to function test_seq_ddl.no_distributed_2pcs()
drop cascades to table test_seq_ddl.test_table
drop cascades to table test_seq_ddl.ref_test
drop cascades to table test_seq_ddl.test_table_rep_2

View File

@ -133,6 +133,13 @@ test: multi_create_schema
# ----------
test: multi_utility_warnings
# ----------
# Tests to check the sequential and parallel executions of DDL and modification
# commands
# Should not be executed in parallel with other tests
# ----------
test: sequential_modifications
# ---------
# multi_append_table_to_shard loads data to create shards in a way that forces
# shard caching.

View File

@ -505,6 +505,7 @@ SELECT (run_command_on_workers($$
$$)).*
ORDER BY
1,2,3,4;
SET search_path TO 'public';
DROP SCHEMA sc1 CASCADE;

View File

@ -158,6 +158,13 @@ ALTER TABLE labs ADD COLUMN motto text;
INSERT INTO labs VALUES (6, 'Bell Labs');
ABORT;
-- this should work fine with sequential DDL as well
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
ALTER TABLE labs ADD COLUMN motto text;
INSERT INTO labs VALUES (6, 'Bell Labs');
ABORT;
-- but the DDL should correctly roll back
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::regclass;
SELECT * FROM labs WHERE id = 6;
@ -331,13 +338,21 @@ BEGIN;
ALTER TABLE labs ADD COLUMN motto text;
ABORT;
-- cannot perform DDL once a connection is used for multiple shards
-- cannot perform parallel DDL once a connection is used for multiple shards
BEGIN;
SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0;
SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0;
ALTER TABLE researchers ADD COLUMN motto text;
ROLLBACK;
-- can perform sequential DDL once a connection is used for multiple shards
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT lab_id FROM researchers WHERE lab_id = 1 AND id = 0;
SELECT lab_id FROM researchers WHERE lab_id = 2 AND id = 0;
ALTER TABLE researchers ADD COLUMN motto text;
ROLLBACK;
-- multi-shard operations can co-exist with DDL in a transactional way
BEGIN;
ALTER TABLE labs ADD COLUMN motto text;
@ -1057,13 +1072,21 @@ INSERT INTO users VALUES (3, 'burak');
\.
END;
-- cannot perform DDL after a co-located table has been read over 1 connection
-- cannot perform parallel DDL after a co-located table has been read over 1 connection
BEGIN;
SELECT id FROM users WHERE id = 1;
SELECT id FROM users WHERE id = 6;
ALTER TABLE items ADD COLUMN last_update timestamptz;
END;
-- can perform sequential DDL after a co-located table has been read over 1 connection
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT id FROM users WHERE id = 1;
SELECT id FROM users WHERE id = 6;
ALTER TABLE items ADD COLUMN last_update timestamptz;
ROLLBACK;
-- but the other way around is fine
BEGIN;
ALTER TABLE items ADD COLUMN last_update timestamptz;

View File

@ -0,0 +1,163 @@
--
-- Tests sequential and parallel DDL command execution
-- in combination with 1PC and 2PC
-- Note: this test should not be executed in parallel with
-- other tests since we're relying on disabling 2PC recovery
--
CREATE SCHEMA test_seq_ddl;
SET search_path TO 'test_seq_ddl';
-- this function simply checks that the number of transactions in
-- pg_dist_transaction equals the number of primary worker nodes
-- The function is useful to ensure that a single connection is opened per worker
-- in a distributed transaction
CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_worker_count()
RETURNS bool AS
$$
DECLARE
result bool;
BEGIN
SELECT tx_count = worker_count FROM (SELECT count(*) as tx_count FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%') as s1, (SELECT count(*) as worker_count FROM pg_dist_node WHERE noderole = 'primary') as s2 INTO result;
RETURN result;
END;
$$
LANGUAGE 'plpgsql' IMMUTABLE;
-- this function simply checks that the number of transactions in
-- pg_dist_transaction equals the number of shard placements of a distributed table
-- The function is useful to ensure that a single connection is opened per
-- shard placement in a distributed transaction
CREATE OR REPLACE FUNCTION distributed_2PCs_are_equal_to_placement_count()
RETURNS bool AS
$$
DECLARE
result bool;
BEGIN
SELECT count(*) = current_setting('citus.shard_count')::bigint * current_setting('citus.shard_replication_factor')::bigint FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%'
INTO result;
RETURN result;
END;
$$
LANGUAGE 'plpgsql' IMMUTABLE;
-- this function simply checks that there are no distributed transactions in
-- pg_dist_transaction
CREATE OR REPLACE FUNCTION no_distributed_2PCs()
RETURNS bool AS
$$
DECLARE
result bool;
BEGIN
SELECT tx_count = 0 FROM (SELECT count(*) as tx_count FROM pg_dist_transaction WHERE gid LIKE 'citus_%_' || pg_backend_pid() || '%_%') as s1
INTO result;
RETURN result;
END;
$$
LANGUAGE 'plpgsql' IMMUTABLE;
-- disable 2PC recovery since our tests will check pg_dist_transaction
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
SELECT pg_reload_conf();
CREATE TABLE test_table(a int, b int);
SELECT create_distributed_table('test_table', 'a');
-- we should see one distributed transaction per worker
-- when sequential mode is used
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
ALTER TABLE test_table ADD CONSTRAINT a_check CHECK(a > 0);
SELECT distributed_2PCs_are_equal_to_worker_count();
-- we should see one distributed transaction per shard placement
-- when parallel mode is used
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
ALTER TABLE test_table ADD CONSTRAINT b_check CHECK(b > 0);
SELECT distributed_2PCs_are_equal_to_placement_count();
-- with 1PC, we should not see any distributed transactions in pg_dist_transaction
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
ALTER TABLE test_table ADD CONSTRAINT c_check CHECK(a > 0);
SELECT no_distributed_2PCs();
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
ALTER TABLE test_table ADD CONSTRAINT d_check CHECK(a > 0);
SELECT no_distributed_2PCs();
CREATE TABLE ref_test(a int);
SELECT create_reference_table('ref_test');
SET citus.multi_shard_commit_protocol TO '1pc';
-- reference tables should always use 2PC
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
CREATE INDEX ref_test_seq_index ON ref_test(a);
SELECT distributed_2PCs_are_equal_to_worker_count();
-- reference tables should always use 2PC
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
CREATE INDEX ref_test_seq_index_2 ON ref_test(a);
SELECT distributed_2PCs_are_equal_to_worker_count();
-- tables with replication factor > 1 should also obey
-- both multi_shard_commit_protocol and multi_shard_modify_mode
SET citus.shard_replication_factor TO 2;
CREATE TABLE test_table_rep_2 (a int);
SELECT create_distributed_table('test_table_rep_2', 'a');
-- 1PC should never use 2PC with rep > 1
SET citus.multi_shard_commit_protocol TO '1pc';
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
CREATE INDEX test_table_rep_2_i_1 ON test_table_rep_2(a);
SELECT no_distributed_2PCs();
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
CREATE INDEX test_table_rep_2_i_2 ON test_table_rep_2(a);
SELECT no_distributed_2PCs();
-- 2PC should always use 2PC with rep > 1
SET citus.multi_shard_commit_protocol TO '2pc';
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
CREATE INDEX test_table_rep_2_i_3 ON test_table_rep_2(a);
SELECT distributed_2PCs_are_equal_to_worker_count();
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
CREATE INDEX test_table_rep_2_i_4 ON test_table_rep_2(a);
SELECT distributed_2PCs_are_equal_to_placement_count();
-- CREATE INDEX CONCURRENTLY should work fine with rep > 1
-- with both 2PC and different parallel modes
SET citus.multi_shard_commit_protocol TO '2pc';
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
CREATE INDEX CONCURRENTLY test_table_rep_2_i_5 ON test_table_rep_2(a);
-- we shouldn't see any distributed transactions
SELECT no_distributed_2PCs();
SET citus.multi_shard_commit_protocol TO '2pc';
SET citus.multi_shard_modify_mode TO 'parallel';
SELECT recover_prepared_transactions();
CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a);
-- we shouldn't see any distributed transactions
SELECT no_distributed_2PCs();
ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
SET citus.shard_replication_factor TO DEFAULT;
SELECT pg_reload_conf();
SET search_path TO 'public';
DROP SCHEMA test_seq_ddl CASCADE;