mirror of https://github.com/citusdata/citus.git
Add Support for Single Shard Tables in update_distributed_table_colocation (#6924)
Adds Support for Single Shard Tables in `update_distributed_table_colocation`. This PR changes checks that make sure tables should be hash distributed table to hash or single shard distributed tables.pull/6941/head
parent
1ca80813f6
commit
321fcfcdb5
|
@ -3708,12 +3708,14 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)
|
|||
"entry in pg_dist_partition.",
|
||||
get_rel_name(relationId))));
|
||||
}
|
||||
else if (partitionMethod != DISTRIBUTE_BY_HASH)
|
||||
else if (!IsCitusTableType(relationId, HASH_DISTRIBUTED) &&
|
||||
!IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED))
|
||||
{
|
||||
/* connection from the coordinator operating on a shard */
|
||||
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("Updating colocation ids are only allowed for hash "
|
||||
"distributed tables: %c", partitionMethod)));
|
||||
"and single shard distributed tables: %c",
|
||||
partitionMethod)));
|
||||
}
|
||||
|
||||
int count = 1;
|
||||
|
|
|
@ -2288,6 +2288,24 @@ EnsureHashDistributedTable(Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureHashOrSingleShardDistributedTable error out if the given relation is not a
|
||||
* hash or single shard distributed table with the given message.
|
||||
*/
|
||||
void
|
||||
EnsureHashOrSingleShardDistributedTable(Oid relationId)
|
||||
{
|
||||
if (!IsCitusTableType(relationId, HASH_DISTRIBUTED) &&
|
||||
!IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("relation %s should be a "
|
||||
"hash or single shard distributed table",
|
||||
get_rel_name(relationId))));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureSuperUser check that the current user is a superuser and errors out if not.
|
||||
*/
|
||||
|
|
|
@ -120,7 +120,7 @@ update_distributed_table_colocation(PG_FUNCTION_ARGS)
|
|||
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
|
||||
if (IsColocateWithNone(colocateWithTableName))
|
||||
{
|
||||
EnsureHashDistributedTable(targetRelationId);
|
||||
EnsureHashOrSingleShardDistributedTable(targetRelationId);
|
||||
BreakColocation(targetRelationId);
|
||||
}
|
||||
else
|
||||
|
@ -264,8 +264,8 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
|
|||
"other tables")));
|
||||
}
|
||||
|
||||
EnsureHashDistributedTable(sourceRelationId);
|
||||
EnsureHashDistributedTable(targetRelationId);
|
||||
EnsureHashOrSingleShardDistributedTable(sourceRelationId);
|
||||
EnsureHashOrSingleShardDistributedTable(targetRelationId);
|
||||
CheckReplicationModel(sourceRelationId, targetRelationId);
|
||||
CheckDistributionColumnType(sourceRelationId, targetRelationId);
|
||||
|
||||
|
|
|
@ -381,6 +381,7 @@ extern char * TableOwner(Oid relationId);
|
|||
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
||||
extern void EnsureTableOwner(Oid relationId);
|
||||
extern void EnsureHashDistributedTable(Oid relationId);
|
||||
extern void EnsureHashOrSingleShardDistributedTable(Oid relationId);
|
||||
extern void EnsureFunctionOwner(Oid functionId);
|
||||
extern void EnsureSuperUser(void);
|
||||
extern void ErrorIfTableIsACatalogTable(Relation relation);
|
||||
|
|
|
@ -11,6 +11,7 @@ s/localhost:[0-9]+/localhost:xxxxx/g
|
|||
s/ port=[0-9]+ / port=xxxxx /g
|
||||
s/placement [0-9]+/placement xxxxx/g
|
||||
s/shard [0-9]+/shard xxxxx/g
|
||||
s/Shard [0-9]+/Shard xxxxx/g
|
||||
s/assigned task [0-9]+ to node/assigned task to node/
|
||||
s/node group [12] (but|does)/node group \1/
|
||||
|
||||
|
|
|
@ -693,7 +693,7 @@ BEGIN;
|
|||
ROLLBACK;
|
||||
-- should fail --
|
||||
SELECT update_distributed_table_colocation('citus_local_table_4', colocate_with => 'none');
|
||||
ERROR: relation citus_local_table_4 should be a hash distributed table
|
||||
ERROR: relation citus_local_table_4 should be a hash or single shard distributed table
|
||||
SELECT master_create_empty_shard('citus_local_table_4');
|
||||
ERROR: relation "citus_local_table_4" is a local table
|
||||
-- return true
|
||||
|
|
|
@ -1317,7 +1317,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
|||
UPDATE pg_dist_partition SET partmethod = 'a'
|
||||
WHERE logicalrelid = 'test_2'::regclass;
|
||||
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||
ERROR: Updating colocation ids are only allowed for hash distributed tables: a
|
||||
ERROR: Updating colocation ids are only allowed for hash and single shard distributed tables: a
|
||||
ROLLBACK;
|
||||
-- colocated hash distributed table should have the same dist key columns
|
||||
CREATE TABLE test_5(int_col int, text_col text);
|
||||
|
|
|
@ -868,9 +868,9 @@ ERROR: cannot colocate tables table1_groupd and table1_groupb
|
|||
DETAIL: Shard counts don't match for table1_groupd and table1_groupb.
|
||||
SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupE');
|
||||
ERROR: cannot colocate tables table1_groupe and table1_groupb
|
||||
DETAIL: Shard 1300050 of table1_groupe and shard xxxxx of table1_groupb have different number of shard placements.
|
||||
DETAIL: Shard xxxxx of table1_groupe and shard xxxxx of table1_groupb have different number of shard placements.
|
||||
SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupF');
|
||||
ERROR: relation table1_groupf should be a hash distributed table
|
||||
ERROR: relation table1_groupf should be a hash or single shard distributed table
|
||||
SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupD');
|
||||
ERROR: cannot colocate tables table1_groupd and table1_groupb
|
||||
DETAIL: Shard counts don't match for table1_groupd and table1_groupb.
|
||||
|
@ -1369,9 +1369,9 @@ SELECT tables_colocated('d2', 'none');
|
|||
|
||||
-- make sure reference and local tables return an error.
|
||||
SELECT update_distributed_table_colocation('ref', colocate_with => 'none');
|
||||
ERROR: relation ref should be a hash distributed table
|
||||
ERROR: relation ref should be a hash or single shard distributed table
|
||||
SELECT update_distributed_table_colocation('local_table', colocate_with => 'none');
|
||||
ERROR: relation local_table should be a hash distributed table
|
||||
ERROR: relation local_table should be a hash or single shard distributed table
|
||||
-- make sure that different types cannot be colocated
|
||||
SELECT update_distributed_table_colocation('different_d1', colocate_with => 'd1');
|
||||
ERROR: cannot colocate tables d1 and different_d1
|
||||
|
@ -1381,13 +1381,13 @@ ERROR: cannot colocate tables different_d1 and d1
|
|||
DETAIL: Distribution column types don't match for different_d1 and d1.
|
||||
-- make sure that append distributed tables cannot be colocated
|
||||
SELECT update_distributed_table_colocation('append_table', colocate_with => 'd1');
|
||||
ERROR: relation append_table should be a hash distributed table
|
||||
ERROR: relation append_table should be a hash or single shard distributed table
|
||||
SELECT update_distributed_table_colocation('d1', colocate_with => 'append_table');
|
||||
ERROR: relation append_table should be a hash distributed table
|
||||
ERROR: relation append_table should be a hash or single shard distributed table
|
||||
SELECT update_distributed_table_colocation('range_table', colocate_with => 'd1');
|
||||
ERROR: relation range_table should be a hash distributed table
|
||||
ERROR: relation range_table should be a hash or single shard distributed table
|
||||
SELECT update_distributed_table_colocation('d1', colocate_with => 'range_table');
|
||||
ERROR: relation range_table should be a hash distributed table
|
||||
ERROR: relation range_table should be a hash or single shard distributed table
|
||||
-- drop tables to clean test space
|
||||
DROP TABLE table1_groupb;
|
||||
DROP TABLE table2_groupb;
|
||||
|
|
|
@ -1203,9 +1203,9 @@ RESET client_min_messages;
|
|||
-- some tests for mark_tables_colocated
|
||||
-- should error out
|
||||
SELECT update_distributed_table_colocation('colocated_table_test_2', colocate_with => 'reference_table_test');
|
||||
ERROR: relation reference_table_test should be a hash distributed table
|
||||
ERROR: relation reference_table_test should be a hash or single shard distributed table
|
||||
SELECT update_distributed_table_colocation('reference_table_test', colocate_with => 'reference_table_test_fifth');
|
||||
ERROR: relation reference_table_test_fifth should be a hash distributed table
|
||||
ERROR: relation reference_table_test_fifth should be a hash or single shard distributed table
|
||||
-- ensure that reference tables on
|
||||
-- different queries works as expected
|
||||
CREATE SCHEMA reference_schema;
|
||||
|
|
|
@ -131,9 +131,6 @@ BEGIN;
|
|||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- should fail --
|
||||
SELECT update_distributed_table_colocation('null_dist_key_table', colocate_with => 'none');
|
||||
ERROR: relation null_dist_key_table should be a hash distributed table
|
||||
SELECT master_create_empty_shard('null_dist_key_table');
|
||||
ERROR: relation "null_dist_key_table" is a single shard table
|
||||
DETAIL: We currently don't support creating shards on single shard tables
|
||||
|
@ -236,5 +233,126 @@ SELECT COUNT(*) = 0 FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist
|
|||
t
|
||||
(1 row)
|
||||
|
||||
-- test update_distributed_table_colocation
|
||||
CREATE TABLE update_col_1 (a INT);
|
||||
CREATE TABLE update_col_2 (a INT);
|
||||
CREATE TABLE update_col_3 (a INT);
|
||||
-- create colocated single shard distributed tables, so the shards will be
|
||||
-- in the same worker node
|
||||
SELECT create_distributed_table ('update_col_1', null, colocate_with:='none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table ('update_col_2', null, colocate_with:='update_col_1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- now create a third single shard distributed table that is not colocated,
|
||||
-- with the new colocation id the new table will be in the other worker node
|
||||
SELECT create_distributed_table ('update_col_3', null, colocate_with:='none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- make sure nodes are correct
|
||||
SELECT c1.nodeport = c2.nodeport AS same_node
|
||||
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2' AND
|
||||
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
|
||||
p1.noderole = 'primary' AND p2.noderole = 'primary';
|
||||
same_node
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT c1.nodeport = c2.nodeport AS same_node
|
||||
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3' AND
|
||||
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
|
||||
p1.noderole = 'primary' AND p2.noderole = 'primary';
|
||||
same_node
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
-- and the update_col_1 and update_col_2 are colocated
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2';
|
||||
colocated
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- break the colocation
|
||||
SELECT update_distributed_table_colocation('update_col_2', colocate_with:='none');
|
||||
update_distributed_table_colocation
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2';
|
||||
colocated
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
-- re-colocate, the shards were already in the same node
|
||||
SELECT update_distributed_table_colocation('update_col_2', colocate_with:='update_col_1');
|
||||
update_distributed_table_colocation
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2';
|
||||
colocated
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- update_col_1 and update_col_3 are not colocated, because they are not in the some node
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3';
|
||||
colocated
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
-- they should not be able to be colocated since the shards are in different nodes
|
||||
SELECT update_distributed_table_colocation('update_col_3', colocate_with:='update_col_1');
|
||||
ERROR: cannot colocate tables update_col_1 and update_col_3
|
||||
DETAIL: Shard xxxxx of update_col_1 and shard xxxxx of update_col_3 are not colocated.
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3';
|
||||
colocated
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
-- hash distributed and single shard distributed tables cannot be colocated
|
||||
CREATE TABLE update_col_4 (a INT);
|
||||
SELECT create_distributed_table ('update_col_4', 'a', colocate_with:='none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT update_distributed_table_colocation('update_col_1', colocate_with:='update_col_4');
|
||||
ERROR: cannot colocate tables update_col_4 and update_col_1
|
||||
DETAIL: Distribution column types don't match for update_col_4 and update_col_1.
|
||||
SELECT update_distributed_table_colocation('update_col_4', colocate_with:='update_col_1');
|
||||
ERROR: cannot colocate tables update_col_1 and update_col_4
|
||||
DETAIL: Distribution column types don't match for update_col_1 and update_col_4.
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA null_dist_key_udfs CASCADE;
|
||||
NOTICE: drop cascades to table null_dist_key_table
|
||||
|
|
|
@ -33,6 +33,7 @@ test: ref_citus_local_fkeys
|
|||
test: alter_database_owner
|
||||
test: distributed_triggers
|
||||
test: create_single_shard_table
|
||||
# don't parallelize single_shard_table_udfs to make sure colocation ids are sequential
|
||||
test: single_shard_table_udfs
|
||||
test: schema_based_sharding
|
||||
|
||||
|
|
|
@ -53,10 +53,6 @@ BEGIN;
|
|||
select count(*) from pg_dist_partition where logicalrelid='null_dist_key_table'::regclass;
|
||||
ROLLBACK;
|
||||
|
||||
-- should fail --
|
||||
|
||||
SELECT update_distributed_table_colocation('null_dist_key_table', colocate_with => 'none');
|
||||
|
||||
SELECT master_create_empty_shard('null_dist_key_table');
|
||||
|
||||
-- return true
|
||||
|
@ -102,4 +98,70 @@ SELECT COUNT(*) = 0 FROM pg_dist_partition WHERE logicalrelid::text LIKE '%null_
|
|||
SELECT COUNT(*) = 0 FROM pg_dist_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist_key_table%');
|
||||
SELECT COUNT(*) = 0 FROM pg_dist_shard WHERE logicalrelid::text LIKE '%null_dist_key_table%';
|
||||
|
||||
-- test update_distributed_table_colocation
|
||||
CREATE TABLE update_col_1 (a INT);
|
||||
CREATE TABLE update_col_2 (a INT);
|
||||
CREATE TABLE update_col_3 (a INT);
|
||||
|
||||
-- create colocated single shard distributed tables, so the shards will be
|
||||
-- in the same worker node
|
||||
SELECT create_distributed_table ('update_col_1', null, colocate_with:='none');
|
||||
SELECT create_distributed_table ('update_col_2', null, colocate_with:='update_col_1');
|
||||
|
||||
-- now create a third single shard distributed table that is not colocated,
|
||||
-- with the new colocation id the new table will be in the other worker node
|
||||
SELECT create_distributed_table ('update_col_3', null, colocate_with:='none');
|
||||
|
||||
-- make sure nodes are correct
|
||||
SELECT c1.nodeport = c2.nodeport AS same_node
|
||||
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2' AND
|
||||
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
|
||||
p1.noderole = 'primary' AND p2.noderole = 'primary';
|
||||
|
||||
SELECT c1.nodeport = c2.nodeport AS same_node
|
||||
FROM citus_shards c1, citus_shards c2, pg_dist_node p1, pg_dist_node p2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3' AND
|
||||
p1.nodeport = c1.nodeport AND p2.nodeport = c2.nodeport AND
|
||||
p1.noderole = 'primary' AND p2.noderole = 'primary';
|
||||
|
||||
-- and the update_col_1 and update_col_2 are colocated
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2';
|
||||
|
||||
-- break the colocation
|
||||
SELECT update_distributed_table_colocation('update_col_2', colocate_with:='none');
|
||||
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2';
|
||||
|
||||
-- re-colocate, the shards were already in the same node
|
||||
SELECT update_distributed_table_colocation('update_col_2', colocate_with:='update_col_1');
|
||||
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_2';
|
||||
|
||||
-- update_col_1 and update_col_3 are not colocated, because they are not in the some node
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3';
|
||||
|
||||
-- they should not be able to be colocated since the shards are in different nodes
|
||||
SELECT update_distributed_table_colocation('update_col_3', colocate_with:='update_col_1');
|
||||
|
||||
SELECT c1.colocation_id = c2.colocation_id AS colocated
|
||||
FROM public.citus_tables c1, public.citus_tables c2
|
||||
WHERE c1.table_name::text = 'update_col_1' AND c2.table_name::text = 'update_col_3';
|
||||
|
||||
-- hash distributed and single shard distributed tables cannot be colocated
|
||||
CREATE TABLE update_col_4 (a INT);
|
||||
SELECT create_distributed_table ('update_col_4', 'a', colocate_with:='none');
|
||||
|
||||
SELECT update_distributed_table_colocation('update_col_1', colocate_with:='update_col_4');
|
||||
SELECT update_distributed_table_colocation('update_col_4', colocate_with:='update_col_1');
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA null_dist_key_udfs CASCADE;
|
||||
|
|
Loading…
Reference in New Issue