Merge pull request #685 from citusdata/make_1pc_default_for_ddl

Set 1PC as the Default Commit Protocol for DDL Commands
pull/677/head
Eren Başak 2016-07-29 16:54:56 +03:00 committed by GitHub
commit 2f8211a787
12 changed files with 79 additions and 97 deletions

View File

@ -118,7 +118,7 @@ static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
bool isTopLevel);
static void SetLocalCommitProtocolTo2PC(void);
static void ShowNoticeIfNotUsing2PC(void);
static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString);
static void ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId,
ShardConnections *shardConnections);
@ -128,6 +128,9 @@ static void CheckCopyPermissions(CopyStmt *copyStatement);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static bool warnedUserAbout2PC = false;
/*
* Utility for handling citus specific concerns around utility statements.
*
@ -1240,11 +1243,11 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt)
/*
* ExecuteDistributedDDLCommand applies a given DDL command to the given
* distributed table in a distributed transaction. For the transaction, the value of
* citus.multi_shard_commit_protocol is set to '2pc' so that two phase commit mechanism
* is used, regardless of the actual value of citus.multi_shard_commit_protocol. In
* the commit protocol, a BEGIN is sent after connection to each shard placement
* and COMMIT/ROLLBACK is handled by CompleteShardPlacementTransactions function.
* distributed table in a distributed transaction. If the multi shard commit protocol is
* in its default value of '1pc', then a notice message indicating that '2pc' might be
* used for extra safety. In the commit protocol, a BEGIN is sent after connection to
* each shard placement and COMMIT/ROLLBACK is handled by
* CompleteShardPlacementTransactions function.
*/
static void
ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
@ -1253,7 +1256,7 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
bool executionOK = false;
PreventTransactionChain(isTopLevel, "distributed DDL commands");
SetLocalCommitProtocolTo2PC();
ShowNoticeIfNotUsing2PC();
executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString);
@ -1266,23 +1269,20 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
/*
* SwitchTo2PCForTransaction sets the citus.multi_shard_commit_protocol value
* to '2pc' for the current transaction, in order to force 2PC even the value
* of citus.multi_shard_commit_protocol is '1pc';
* ShowNoticeIfNotUsing2PC shows a notice message about using 2PC by setting
* citus.multi_shard_commit_protocol to 2PC. The notice message is shown only once in a
* session
*/
static void
SetLocalCommitProtocolTo2PC(void)
ShowNoticeIfNotUsing2PC(void)
{
if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC)
if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC && !warnedUserAbout2PC)
{
ereport(DEBUG2, (errmsg("switching to 2PC for the transaction")));
ereport(NOTICE, (errmsg("using one-phase commit for distributed DDL commands"),
errhint("You can enable two-phase commit for extra safety with: "
"SET citus.multi_shard_commit_protocol TO '2pc'")));
set_config_option("citus.multi_shard_commit_protocol",
"2pc",
PGC_USERSET,
PGC_S_SESSION,
GUC_ACTION_LOCAL,
true, 0, false);
warnedUserAbout2PC = true;
}
}

View File

@ -33,6 +33,8 @@ HINT: Consider using hash partitioning.
(1 row)
CREATE INDEX lineitem_time_index ON lineitem (l_shipdate);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
CREATE TABLE orders (
o_orderkey bigint not null,
o_custkey integer not null,

View File

@ -16,6 +16,8 @@ SELECT * FROM customer LIMIT 2;
(2 rows)
ALTER TABLE customer ADD COLUMN new_column1 INTEGER;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
ALTER TABLE customer ADD COLUMN new_column2 INTEGER;
SELECT count(*) FROM customer;
count

View File

@ -39,6 +39,8 @@ SELECT * FROM example;
-- non-immutable functions inside CASE/COALESCE aren't allowed
ALTER TABLE example DROP value;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
ALTER TABLE example ADD value timestamp;
-- this is allowed because there are no mutable funcs in the CASE
UPDATE example SET value = (CASE WHEN value > timestamp '12-12-1991' THEN timestamp '12-12-1991' ELSE value + interval '1 hour' END) WHERE key = 1;

View File

@ -65,6 +65,8 @@ SELECT master_create_empty_shard('index_test_append');
--
-- Verify that we can create different types of indexes
CREATE INDEX lineitem_orderkey_index ON lineitem (l_orderkey);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
CREATE INDEX lineitem_partkey_desc_index ON lineitem (l_partkey DESC);
CREATE INDEX lineitem_partial_index ON lineitem (l_shipdate)
WHERE l_shipdate < '1995-01-01';
@ -139,6 +141,8 @@ ERROR: creating unique indexes on append-partitioned tables is currently unsupp
CREATE INDEX lineitem_orderkey_index ON lineitem (l_orderkey);
ERROR: relation "lineitem_orderkey_index" already exists
CREATE INDEX try_index ON lineitem USING gist (l_orderkey);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
WARNING: data type bigint has no default operator class for access method "gist"
HINT: You must specify an operator class for the index or define a default operator class for the data type.
CONTEXT: while executing command on localhost:57638

View File

@ -42,14 +42,13 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1);
(1 row)
CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate);
DEBUG: switching to 2PC for the transaction
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
DEBUG: applied command on shard 650000 on node localhost:57637
DEBUG: applied command on shard 650001 on node localhost:57638
DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash"
DEBUG: sent PREPARE TRANSACTION over connection 650000
DEBUG: sent PREPARE TRANSACTION over connection 650001
DEBUG: sent COMMIT PREPARED over connection 650000
DEBUG: sent COMMIT PREPARED over connection 650001
DEBUG: sent COMMIT over connection 650000
DEBUG: sent COMMIT over connection 650001
CREATE TABLE orders_hash (
o_orderkey bigint not null,
o_custkey integer not null,

View File

@ -433,6 +433,8 @@ UPDATE limit_orders SET symbol = UPPER(symbol) WHERE id = 246 RETURNING id, LOWE
(1 row)
ALTER TABLE limit_orders ADD COLUMN array_of_values integer[];
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
-- updates referencing STABLE functions are allowed
UPDATE limit_orders SET placed_at = LEAST(placed_at, now()::timestamp) WHERE id = 246;
-- so are binary operators
@ -476,6 +478,8 @@ SELECT array_of_values FROM limit_orders WHERE id = 246;
(1 row)
ALTER TABLE limit_orders DROP array_of_values;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
-- even in RETURNING
UPDATE limit_orders SET placed_at = placed_at WHERE id = 246 RETURNING NOW();
ERROR: non-IMMUTABLE functions are not allowed in the RETURNING clause

View File

@ -560,6 +560,8 @@ SELECT * FROM nation_hash_composite_types WHERE test_col = '(a,a)'::new_composit
-- test ALTER TABLE ADD/DROP queries with schemas
SET search_path TO public;
ALTER TABLE test_schema_support.nation_hash ADD COLUMN new_col INT;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
-- verify column is added
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
@ -584,6 +586,8 @@ ALTER TABLE test_schema_support.nation_hash ADD COLUMN new_col INT;
\c - - - :master_port
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS non_existent_column;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
@ -610,6 +614,8 @@ ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col;
--test with search_path is set
SET search_path TO test_schema_support;
ALTER TABLE nation_hash ADD COLUMN new_col INT;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
-- verify column is added
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
@ -635,6 +641,8 @@ ALTER TABLE nation_hash ADD COLUMN new_col INT;
\c - - - :master_port
SET search_path TO test_schema_support;
ALTER TABLE nation_hash DROP COLUMN IF EXISTS non_existent_column;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
@ -662,6 +670,8 @@ ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col;
SET search_path TO public;
-- CREATE index
CREATE INDEX index1 ON test_schema_support.nation_hash(n_name);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
--verify INDEX is created
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
@ -689,6 +699,8 @@ Indexes:
\c - - - :master_port
-- DROP index
DROP INDEX test_schema_support.index1;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
--verify INDEX is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
@ -714,6 +726,8 @@ DROP INDEX test_schema_support.index1;
SET search_path TO test_schema_support;
-- CREATE index
CREATE INDEX index1 ON nation_hash(n_name);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
--verify INDEX is created
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
@ -742,6 +756,8 @@ Indexes:
-- DROP index
SET search_path TO test_schema_support;
DROP INDEX index1;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
--verify INDEX is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"

View File

@ -225,6 +225,8 @@ SELECT master_create_worker_shards('dropcol_distributed', 4, 1);
INSERT INTO dropcol_distributed AS dropcol (key, keep1, keep2) VALUES (1, '5', 5) ON CONFLICT(key)
DO UPDATE SET keep1 = dropcol.keep1;
ALTER TABLE dropcol_distributed DROP COLUMN drop2;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
INSERT INTO dropcol_distributed (key, keep1, keep2) VALUES (1, '5', 5) ON CONFLICT(key)
DO UPDATE SET keep1 = dropcol_distributed.keep1;
ALTER TABLE dropcol_distributed DROP COLUMN keep2;

View File

@ -172,15 +172,13 @@ COMMIT;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
DROP INDEX temp_index_2;
-- verify that distributed ddl commands use 2PC, even the value of citus.multi_shard_commit_protocol is '1pc'
SET citus.multi_shard_commit_protocol TO '1pc';
SET client_min_messages TO DEBUG2;
--- verify that distributed ddl commands can be used with 2pc
SET citus.multi_shard_commit_protocol TO '2pc';
CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey);
RESET client_min_messages;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
DROP INDEX temp_index_3;
-- verify that citus.multi_shard_commit_protocol value is not changed
SHOW citus.multi_shard_commit_protocol;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
RESET citus.multi_shard_commit_protocol;
-- verify that not any of shard placements are marked as failed when a query failure occurs
CREATE TABLE test_ab (a int, b int);

View File

@ -33,6 +33,8 @@ SELECT master_create_distributed_table('lineitem_alter', 'l_orderkey', 'append')
\STAGE lineitem_alter FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
-- Verify that we can add columns
ALTER TABLE lineitem_alter ADD COLUMN float_column FLOAT;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
ALTER TABLE lineitem_alter ADD COLUMN date_column DATE;
ALTER TABLE lineitem_alter ADD COLUMN int_column1 INTEGER DEFAULT 1;
ALTER TABLE lineitem_alter ADD COLUMN int_column2 INTEGER DEFAULT 2;
@ -116,6 +118,8 @@ SELECT int_column1, count(*) FROM lineitem_alter GROUP BY int_column1;
-- Verify that SET|DROP DEFAULT works
ALTER TABLE lineitem_alter ALTER COLUMN float_column SET DEFAULT 1;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
ALTER TABLE lineitem_alter ALTER COLUMN int_column1 DROP DEFAULT;
-- \stage to verify that default values take effect
\STAGE lineitem_alter (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
@ -437,75 +441,22 @@ SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
DROP INDEX temp_index_2;
ERROR: index "temp_index_2" does not exist
-- verify that distributed ddl commands use 2PC, even the value of citus.multi_shard_commit_protocol is '1pc'
SET citus.multi_shard_commit_protocol TO '1pc';
SET client_min_messages TO DEBUG2;
--- verify that distributed ddl commands can be used with 2pc
SET citus.multi_shard_commit_protocol TO '2pc';
CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey);
DEBUG: switching to 2PC for the transaction
DEBUG: applied command on shard 220000 on node localhost:57638
DEBUG: applied command on shard 220000 on node localhost:57637
DEBUG: applied command on shard 220003 on node localhost:57637
DEBUG: applied command on shard 220003 on node localhost:57638
DEBUG: applied command on shard 220007 on node localhost:57637
DEBUG: applied command on shard 220007 on node localhost:57638
DEBUG: applied command on shard 220004 on node localhost:57638
DEBUG: applied command on shard 220004 on node localhost:57637
DEBUG: applied command on shard 220008 on node localhost:57638
DEBUG: applied command on shard 220008 on node localhost:57637
DEBUG: applied command on shard 220001 on node localhost:57637
DEBUG: applied command on shard 220001 on node localhost:57638
DEBUG: applied command on shard 220002 on node localhost:57638
DEBUG: applied command on shard 220002 on node localhost:57637
DEBUG: applied command on shard 220005 on node localhost:57637
DEBUG: applied command on shard 220005 on node localhost:57638
DEBUG: applied command on shard 220009 on node localhost:57637
DEBUG: applied command on shard 220009 on node localhost:57638
DEBUG: building index "temp_index_3" on table "lineitem_alter"
DEBUG: sent PREPARE TRANSACTION over connection 220007
DEBUG: sent PREPARE TRANSACTION over connection 220007
DEBUG: sent PREPARE TRANSACTION over connection 220002
DEBUG: sent PREPARE TRANSACTION over connection 220002
DEBUG: sent PREPARE TRANSACTION over connection 220004
DEBUG: sent PREPARE TRANSACTION over connection 220004
DEBUG: sent PREPARE TRANSACTION over connection 220000
DEBUG: sent PREPARE TRANSACTION over connection 220000
DEBUG: sent PREPARE TRANSACTION over connection 220008
DEBUG: sent PREPARE TRANSACTION over connection 220008
DEBUG: sent PREPARE TRANSACTION over connection 220009
DEBUG: sent PREPARE TRANSACTION over connection 220009
DEBUG: sent PREPARE TRANSACTION over connection 220001
DEBUG: sent PREPARE TRANSACTION over connection 220001
DEBUG: sent PREPARE TRANSACTION over connection 220005
DEBUG: sent PREPARE TRANSACTION over connection 220005
DEBUG: sent PREPARE TRANSACTION over connection 220003
DEBUG: sent PREPARE TRANSACTION over connection 220003
DEBUG: sent COMMIT PREPARED over connection 220007
DEBUG: sent COMMIT PREPARED over connection 220007
DEBUG: sent COMMIT PREPARED over connection 220002
DEBUG: sent COMMIT PREPARED over connection 220002
DEBUG: sent COMMIT PREPARED over connection 220004
DEBUG: sent COMMIT PREPARED over connection 220004
DEBUG: sent COMMIT PREPARED over connection 220000
DEBUG: sent COMMIT PREPARED over connection 220000
DEBUG: sent COMMIT PREPARED over connection 220008
DEBUG: sent COMMIT PREPARED over connection 220008
DEBUG: sent COMMIT PREPARED over connection 220009
DEBUG: sent COMMIT PREPARED over connection 220009
DEBUG: sent COMMIT PREPARED over connection 220001
DEBUG: sent COMMIT PREPARED over connection 220001
DEBUG: sent COMMIT PREPARED over connection 220005
DEBUG: sent COMMIT PREPARED over connection 220005
DEBUG: sent COMMIT PREPARED over connection 220003
DEBUG: sent COMMIT PREPARED over connection 220003
RESET client_min_messages;
DROP INDEX temp_index_3;
-- verify that citus.multi_shard_commit_protocol value is not changed
SHOW citus.multi_shard_commit_protocol;
citus.multi_shard_commit_protocol
-----------------------------------
1pc
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
indexname | tablename
--------------+----------------
temp_index_3 | lineitem_alter
(1 row)
DROP INDEX temp_index_3;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
indexname | tablename
-----------+-----------
(0 rows)
RESET citus.multi_shard_commit_protocol;
-- verify that not any of shard placements are marked as failed when a query failure occurs
CREATE TABLE test_ab (a int, b int);
SELECT master_create_distributed_table('test_ab', 'a', 'hash');

View File

@ -175,6 +175,8 @@ SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
-- Add columns to the table and perform a COPY
ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE extra1 = 1;