Avoid blocking writes in create_distributed_table_concurrently (#6324)

Co-authored-by: Marco Slot <marco.slot@gmail.com>
pull/6308/head
Marco Slot 2022-09-12 21:09:37 +02:00 committed by GitHub
parent cda3686d86
commit b79111527e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 7 deletions

View File

@ -564,8 +564,10 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
* such that we can create foreign keys and joins work immediately after creation.
* We do this after applying all essential checks to error out early in case of
* user error.
*
* Use force_logical since this function is meant to not block writes.
*/
EnsureReferenceTablesExistOnAllNodes();
EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_FORCE_LOGICAL);
/*
* At this point, the table is a Citus local table, which means it does

View File

@ -356,6 +356,63 @@ WHERE colocationid IN
(1 row)
DROP TABLE replicate_reference_table_commit;
-- exercise reference table replication in create_distributed_table_concurrently
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
CREATE TABLE replicate_reference_table_cdtc(column1 int);
SELECT create_reference_table('replicate_reference_table_cdtc');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
1370004
(1 row)
-- required for create_distributed_table_concurrently
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SET citus.shard_replication_factor TO 1;
CREATE TABLE distributed_table_cdtc(column1 int primary key);
SELECT create_distributed_table_concurrently('distributed_table_cdtc', 'column1');
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
RESET citus.shard_replication_factor;
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement_view
WHERE
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370004 | 1 | 0 | localhost | 57638
1370005 | 1 | 0 | localhost | 57638
1370007 | 1 | 0 | localhost | 57638
(3 rows)
DROP TABLE replicate_reference_table_cdtc, distributed_table_cdtc;
-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
@ -431,8 +488,8 @@ WHERE
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370004 | 1 | 0 | localhost | 57638
1370005 | 1 | 0 | localhost | 57638
1370009 | 1 | 0 | localhost | 57638
1370010 | 1 | 0 | localhost | 57638
(2 rows)
SELECT shardcount, replicationfactor, distributioncolumntype, distributioncolumncollation FROM pg_dist_colocation
@ -739,7 +796,7 @@ WHERE
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370014 | 1 | 0 | localhost | 57637
1370019 | 1 | 0 | localhost | 57637
(1 row)
-- we should see the two shard placements after activation
@ -764,7 +821,7 @@ WHERE
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370014 | 1 | 0 | localhost | 57637
1370019 | 1 | 0 | localhost | 57637
(1 row)
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
@ -794,7 +851,7 @@ HINT: Add the target node via SELECT citus_add_node('localhost', 57638);
SELECT citus_add_secondary_node('localhost', :worker_2_port, 'localhost', :worker_1_port);
citus_add_secondary_node
---------------------------------------------------------------------
1370012
1370014
(1 row)
SELECT citus_copy_shard_placement(
@ -1162,7 +1219,7 @@ SELECT create_distributed_table('test','x');
SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
1370022
1370024
(1 row)
SELECT

View File

@ -233,6 +233,30 @@ WHERE colocationid IN
DROP TABLE replicate_reference_table_commit;
-- exercise reference table replication in create_distributed_table_concurrently
SELECT citus_remove_node('localhost', :worker_2_port);
CREATE TABLE replicate_reference_table_cdtc(column1 int);
SELECT create_reference_table('replicate_reference_table_cdtc');
SELECT citus_add_node('localhost', :worker_2_port);
-- required for create_distributed_table_concurrently
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
SET citus.shard_replication_factor TO 1;
CREATE TABLE distributed_table_cdtc(column1 int primary key);
SELECT create_distributed_table_concurrently('distributed_table_cdtc', 'column1');
RESET citus.shard_replication_factor;
SELECT citus_remove_node('localhost', :master_port);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement_view
WHERE
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
DROP TABLE replicate_reference_table_cdtc, distributed_table_cdtc;
-- test adding new node + upgrading another hash distributed table to reference table + creating new reference table in TRANSACTION
SELECT master_remove_node('localhost', :worker_2_port);