create_distributed_table honors sequential mode

pull/2227/head
mehmet furkan şahin 2018-06-18 18:37:48 +03:00
parent 0c47d16e8e
commit 2b2ce036eb
3 changed files with 267 additions and 5 deletions

View File

@ -40,6 +40,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_utility.h"
@ -475,9 +476,34 @@ CreateHashDistributedTableShards(Oid relationId, Oid colocatedTableId,
*/
EnsureSchemaExistsOnAllNodes(relationId);
/*
* Decide whether to use exclusive connections per placement or not. Note that
* if the local table is not empty, we cannot use sequential mode since the COPY
* operation that'd load the data into shards currently requires exclusive
* connections.
*/
if (RegularTable(relationId))
{
useExclusiveConnection = IsTransactionBlock() || !localTableEmpty;
if (!localTableEmpty && MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode "
"because it is not empty", relationName),
errhint("If you have manually set "
"citus.multi_shard_modify_mode to 'sequential', "
"try with 'parallel' option. If that is not the "
"case, try distributing local tables when they "
"are empty.")));
}
else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
useExclusiveConnection = false;
}
else if (!localTableEmpty || IsTransactionBlock())
{
useExclusiveConnection = true;
}
}
if (colocatedTableId != InvalidOid)

View File

@ -6,6 +6,8 @@
--
CREATE SCHEMA test_seq_ddl;
SET search_path TO 'test_seq_ddl';
SET citus.next_shard_id TO 1600;
SET citus.next_placement_id TO 1600;
-- this function simply checks the equality of the number of transactions in the
-- pg_dist_transaction and number of primary worker nodes
-- The function is useful to ensure that a single connection is opened per worker
@ -326,15 +328,14 @@ SELECT distributed_2PCs_are_equal_to_worker_count();
-- truncate with rep > 1 should work both in parallel and seq. modes
CREATE TABLE test_seq_truncate_rep_2 (a int);
INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
recover_prepared_transactions
@ -491,9 +492,159 @@ SELECT pg_reload_conf();
t
(1 row)
-- The following tests are added to test if create_distributed_table honors sequential mode
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
-- Check if multi_shard_update works properly after create_distributed_table in sequential mode
CREATE TABLE test_seq_multi_shard_update(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_multi_shard_update', 'a');
create_distributed_table
--------------------------
(1 row)
INSERT INTO test_seq_multi_shard_update VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
SELECT master_modify_multiple_shards('DELETE FROM test_seq_multi_shard_update WHERE b < 2');
master_modify_multiple_shards
-------------------------------
5
(1 row)
COMMIT;
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
DROP TABLE test_seq_multi_shard_update;
-- Check if truncate works properly after create_distributed_table in sequential mode
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE TABLE test_seq_truncate_after_create(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_truncate_after_create', 'a');
create_distributed_table
--------------------------
(1 row)
INSERT INTO test_seq_truncate_after_create VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
TRUNCATE test_seq_truncate_after_create;
COMMIT;
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
DROP TABLE test_seq_truncate_after_create;
-- Check if drop table works properly after create_distributed_table in sequential mode
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE TABLE test_seq_drop_table(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_drop_table', 'a');
create_distributed_table
--------------------------
(1 row)
DROP TABLE test_seq_drop_table;
COMMIT;
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
-- Check if copy errors out properly after create_distributed_table in sequential mode
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE TABLE test_seq_copy(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_copy', 'a');
create_distributed_table
--------------------------
(1 row)
\COPY test_seq_copy FROM STDIN DELIMITER AS ',';
ERROR: cannot establish a new connection for placement 1673, since DDL has been executed on a connection that is in use
CONTEXT: COPY test_seq_copy, line 2: "2,2"
ROLLBACK;
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
f
(1 row)
DROP TABLE test_seq_copy;
-- Check if DDL + CREATE INDEX works properly after create_distributed_table in sequential mode
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
CREATE TABLE test_seq_ddl_index(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_ddl_index', 'a');
create_distributed_table
--------------------------
(1 row)
INSERT INTO test_seq_ddl_index VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
ALTER TABLE test_seq_ddl_index ADD COLUMN c int;
CREATE INDEX idx ON test_seq_ddl_index(c);
COMMIT;
SELECT distributed_2PCs_are_equal_to_worker_count();
distributed_2pcs_are_equal_to_worker_count
--------------------------------------------
t
(1 row)
DROP TABLE test_seq_ddl_index;
-- create_distributed_table should fail on relations with data in sequential mode in and out transaction block
CREATE TABLE test_create_seq_table (a int);
INSERT INTO test_create_seq_table VALUES (1);
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_create_seq_table' ,'a');
ERROR: cannot distribute "test_create_seq_table" in sequential mode because it is not empty
HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option. If that is not the case, try distributing local tables when they are empty.
RESET citus.multi_shard_modify_mode;
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
select create_distributed_table('test_create_seq_table' ,'a');
ERROR: cannot distribute "test_create_seq_table" in sequential mode because it is not empty
HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option. If that is not the case, try distributing local tables when they are empty.
ROLLBACK;
SET search_path TO 'public';
DROP SCHEMA test_seq_ddl CASCADE;
NOTICE: drop cascades to 10 other objects
NOTICE: drop cascades to 11 other objects
DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count()
drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count()
drop cascades to function test_seq_ddl.no_distributed_2pcs()
@ -504,3 +655,4 @@ drop cascades to table test_seq_ddl.test_table_rep_2
drop cascades to table test_seq_ddl.test_seq_truncate
drop cascades to table test_seq_ddl.test_seq_truncate_rep_2
drop cascades to table test_seq_ddl.multi_shard_modify_test
drop cascades to table test_seq_ddl.test_create_seq_table

View File

@ -6,6 +6,8 @@
--
CREATE SCHEMA test_seq_ddl;
SET search_path TO 'test_seq_ddl';
SET citus.next_shard_id TO 1600;
SET citus.next_placement_id TO 1600;
-- this function simply checks the equality of the number of transactions in the
-- pg_dist_transaction and number of primary worker nodes
@ -182,9 +184,9 @@ SELECT distributed_2PCs_are_equal_to_worker_count();
-- truncate with rep > 1 should work both in parallel and seq. modes
CREATE TABLE test_seq_truncate_rep_2 (a int);
INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('test_seq_truncate_rep_2', 'a');
INSERT INTO test_seq_truncate_rep_2 SELECT i FROM generate_series(0, 100) i;
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT recover_prepared_transactions();
@ -256,5 +258,87 @@ ALTER SYSTEM SET citus.recover_2pc_interval TO DEFAULT;
SET citus.shard_replication_factor TO DEFAULT;
SELECT pg_reload_conf();
-- The following tests are added to test if create_distributed_table honors sequential mode
SELECT recover_prepared_transactions();
-- Check if multi_shard_update works properly after create_distributed_table in sequential mode
CREATE TABLE test_seq_multi_shard_update(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_multi_shard_update', 'a');
INSERT INTO test_seq_multi_shard_update VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
SELECT master_modify_multiple_shards('DELETE FROM test_seq_multi_shard_update WHERE b < 2');
COMMIT;
SELECT distributed_2PCs_are_equal_to_worker_count();
DROP TABLE test_seq_multi_shard_update;
-- Check if truncate works properly after create_distributed_table in sequential mode
SELECT recover_prepared_transactions();
CREATE TABLE test_seq_truncate_after_create(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_truncate_after_create', 'a');
INSERT INTO test_seq_truncate_after_create VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
TRUNCATE test_seq_truncate_after_create;
COMMIT;
SELECT distributed_2PCs_are_equal_to_worker_count();
DROP TABLE test_seq_truncate_after_create;
-- Check if drop table works properly after create_distributed_table in sequential mode
SELECT recover_prepared_transactions();
CREATE TABLE test_seq_drop_table(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_drop_table', 'a');
DROP TABLE test_seq_drop_table;
COMMIT;
SELECT distributed_2PCs_are_equal_to_worker_count();
-- Check if copy errors out properly after create_distributed_table in sequential mode
SELECT recover_prepared_transactions();
CREATE TABLE test_seq_copy(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_copy', 'a');
\COPY test_seq_copy FROM STDIN DELIMITER AS ',';
1,1
2,2
3,3
\.
ROLLBACK;
SELECT distributed_2PCs_are_equal_to_worker_count();
DROP TABLE test_seq_copy;
-- Check if DDL + CREATE INDEX works properly after create_distributed_table in sequential mode
SELECT recover_prepared_transactions();
CREATE TABLE test_seq_ddl_index(a int, b int);
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_seq_ddl_index', 'a');
INSERT INTO test_seq_ddl_index VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0);
ALTER TABLE test_seq_ddl_index ADD COLUMN c int;
CREATE INDEX idx ON test_seq_ddl_index(c);
COMMIT;
SELECT distributed_2PCs_are_equal_to_worker_count();
DROP TABLE test_seq_ddl_index;
-- create_distributed_table should fail on relations with data in sequential mode in and out transaction block
CREATE TABLE test_create_seq_table (a int);
INSERT INTO test_create_seq_table VALUES (1);
SET citus.multi_shard_modify_mode TO 'sequential';
SELECT create_distributed_table('test_create_seq_table' ,'a');
RESET citus.multi_shard_modify_mode;
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
select create_distributed_table('test_create_seq_table' ,'a');
ROLLBACK;
SET search_path TO 'public';
DROP SCHEMA test_seq_ddl CASCADE;