From 6b8df85a88df700fb6bdcc78b60b4d10780ab88f Mon Sep 17 00:00:00 2001 From: Vinod Sridharan <14185211+visridha@users.noreply.github.com> Date: Wed, 3 Apr 2024 19:28:26 +0000 Subject: [PATCH 1/6] Update distributed placement for single shard tables --- .../commands/create_distributed_table.c | 15 ++++-- .../distributed/operations/create_shards.c | 6 ++- .../distributed/coordinator_protocol.h | 3 +- .../regress/expected/multi_create_table.out | 48 +++++++++++++++++++ src/test/regress/sql/multi_create_table.sql | 24 ++++++++++ 5 files changed, 88 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 5ec6d6dd7..a955d09aa 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -146,7 +146,9 @@ static void ConvertCitusLocalTableToTableType(Oid relationId, DistributedTableParams * distributedTableParams); static void CreateHashDistributedTableShards(Oid relationId, int shardCount, - Oid colocatedTableId, bool localTableEmpty); + Oid colocatedTableId, + bool localTableEmpty, + uint32 colocationId); static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId, uint32 colocationId); static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType, @@ -1288,9 +1290,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, if (tableType == HASH_DISTRIBUTED) { /* create shards for hash distributed table */ - CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount, + CreateHashDistributedTableShards(relationId, + distributedTableParams->shardCount, colocatedTableId, - localTableEmpty); + localTableEmpty, + colocationId); } else if (tableType == REFERENCE_TABLE) { @@ -1878,7 +1882,8 @@ DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTable */ static void CreateHashDistributedTableShards(Oid relationId, int shardCount, - Oid colocatedTableId, bool localTableEmpty) + Oid colocatedTableId, bool localTableEmpty, + uint32 colocationId) { bool useExclusiveConnection = false; @@ -1917,7 +1922,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount, * we can directly use ShardReplicationFactor global variable here. */ CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor, - useExclusiveConnection); + useExclusiveConnection, colocationId); } } diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 962547051..27fb8c04f 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -81,7 +81,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS) */ void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, - int32 replicationFactor, bool useExclusiveConnections) + int32 replicationFactor, bool useExclusiveConnections, + uint32 colocationId) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); List *insertedShardPlacements = NIL; @@ -162,10 +163,11 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, /* set shard storage type according to relation type */ char shardStorageType = ShardStorageType(distributedTableId); + int64 shardOffset = shardCount == 1 ? 
colocationId : 0; for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++) { - uint32 roundRobinNodeIndex = shardIndex % workerNodeCount; + uint32 roundRobinNodeIndex = (shardIndex + shardOffset) % workerNodeCount; /* initialize the hash token space for this shard */ int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement); diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index b2170fd2e..8c25ae7ab 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -259,7 +259,8 @@ extern void InsertShardPlacementRows(Oid relationId, int64 shardId, extern uint64 UpdateShardStatistics(int64 shardId); extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, int32 replicationFactor, - bool useExclusiveConnections); + bool useExclusiveConnections, + uint32 colocationId); extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool useExclusiveConnections); extern void CreateReferenceTableShard(Oid distributedTableId); diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 61c3c8fe1..8d1ca713f 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -466,3 +466,51 @@ select create_reference_table('temp_table'); ERROR: cannot distribute a temporary table DROP TABLE temp_table; DROP TABLE shard_count_table_3; +-- test shard count 1 placement with colocate none. +-- create a base table instance +CREATE TABLE shard_count_table_1_inst_1 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- create another table with similar requirements +CREATE TABLE shard_count_table_1_inst_2 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Now check placement: +SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- double check shard counts +SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- check placement: These should be placed on different workers. +SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset +SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset +SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; + ?column? 
+--------------------------------------------------------------------- + f +(1 row) + +DROP TABLE shard_count_table_1_inst_1; +DROP TABLE shard_count_table_1_inst_2; diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 9bf340909..34afb3efb 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -291,3 +291,27 @@ select create_reference_table('temp_table'); DROP TABLE temp_table; DROP TABLE shard_count_table_3; + +-- test shard count 1 placement with colocate none. +-- create a base table instance +CREATE TABLE shard_count_table_1_inst_1 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none'); + +-- create another table with similar requirements +CREATE TABLE shard_count_table_1_inst_2 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none'); + +-- Now check placement: +SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); + +-- double check shard counts +SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); +SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass; + +-- check placement: These should be placed on different workers. +SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset +SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset +SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; + +DROP TABLE shard_count_table_1_inst_1; +DROP TABLE shard_count_table_1_inst_2; \ No newline at end of file From 86de575bcc299ffd8792e3c1e1ff95d1f8e753bd Mon Sep 17 00:00:00 2001 From: Vinod Sridharan <14185211+visridha@users.noreply.github.com> Date: Wed, 3 Apr 2024 19:39:20 +0000 Subject: [PATCH 2/6] Fix style checks --- src/test/regress/sql/multi_create_table.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 34afb3efb..be7df50ea 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -314,4 +314,4 @@ SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHE SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; DROP TABLE shard_count_table_1_inst_1; -DROP TABLE shard_count_table_1_inst_2; \ No newline at end of file +DROP TABLE shard_count_table_1_inst_2; From e989e5b00edfac99d98e2370b0e6727bb8c094e0 Mon Sep 17 00:00:00 2001 From: Vinod Sridharan <14185211+visridha@users.noreply.github.com> Date: Wed, 3 Apr 2024 23:09:52 +0000 Subject: [PATCH 3/6] more --- .../distributed/operations/create_shards.c | 10 ++- .../expected/multi_mx_create_table.out | 20 +----- .../regress/expected/multi_router_planner.out | 67 ++++++++++++++++--- 3 files changed, 68 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 27fb8c04f..fe26d9ed4 100644 --- 
a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -163,7 +163,15 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, /* set shard storage type according to relation type */ char shardStorageType = ShardStorageType(distributedTableId); - int64 shardOffset = shardCount == 1 ? colocationId : 0; + + int64 shardOffset = 0; + if (shardCount == 1 && shardStorageType == SHARD_STORAGE_TABLE) + { + /* For single shard distributed tables, use the colocationId to offset + * where the shard is placed. + */ + shardOffset = colocationId; + } for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++) { diff --git a/src/test/regress/expected/multi_mx_create_table.out b/src/test/regress/expected/multi_mx_create_table.out index b9d3f7faa..29b1d0430 100644 --- a/src/test/regress/expected/multi_mx_create_table.out +++ b/src/test/regress/expected/multi_mx_create_table.out @@ -540,12 +540,6 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR articles_hash_mx_1220104 | articles_hash_mx | distributed | 0 articles_hash_mx_1220105 | articles_hash_mx | distributed | 0 articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 - articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 - articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 - articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 - articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 - articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 - articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0 citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0 citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0 @@ -715,12 +709,6 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR customer_mx_1220084 | customer_mx | reference | 0 customer_mx_1220084 | customer_mx | reference | 0 labs_mx_1220102 | labs_mx | distributed | 8192 - labs_mx_1220102 | labs_mx | distributed | 8192 - labs_mx_1220102 | labs_mx | distributed | 8192 - labs_mx_1220102 | labs_mx | distributed | 8192 - labs_mx_1220102 | labs_mx | distributed | 8192 - labs_mx_1220102 | labs_mx | distributed | 8192 - labs_mx_1220102 | labs_mx | distributed | 8192 limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 @@ -890,12 +878,6 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR nation_mx_1220085 | nation_mx | reference | 0 nation_mx_1220085 | nation_mx | reference | 0 objects_mx_1220103 | objects_mx | distributed | 16384 - objects_mx_1220103 | objects_mx | distributed | 16384 - objects_mx_1220103 | objects_mx | distributed | 16384 - objects_mx_1220103 | objects_mx | distributed | 16384 - objects_mx_1220103 | objects_mx | distributed | 16384 - objects_mx_1220103 | objects_mx | distributed | 16384 - objects_mx_1220103 | objects_mx | distributed | 16384 orders_mx_1220068 | orders_mx | distributed | 8192 orders_mx_1220068 | orders_mx | distributed | 8192 orders_mx_1220068 | orders_mx | 
distributed | 8192 @@ -984,7 +966,7 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR supplier_mx_1220087 | supplier_mx | reference | 0 supplier_mx_1220087 | supplier_mx | reference | 0 supplier_mx_1220087 | supplier_mx | reference | 0 -(469 rows) +(451 rows) -- Show that altering type name is not supported from worker node ALTER TYPE citus_mx_test_schema.order_side_mx RENAME TO temp_order_side_mx; diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index fee821a7d..2b4473987 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -774,15 +774,64 @@ SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id ORDER BY 1,2 LIMIT 3; -DEBUG: Creating router plan -DEBUG: query has a single distribution column value: 10 - first_author | second_word_count ---------------------------------------------------------------------- - 10 | 19519 - 10 | 19519 - 10 | 19519 -(3 rows) - +DEBUG: found no worker with all shard placements +DEBUG: push down of limit count: 3 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 0 and 4 +DEBUG: join prunable for task partitionId 0 and 5 +DEBUG: join prunable for task partitionId 1 and 0 +DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 1 and 4 +DEBUG: join prunable for task partitionId 1 and 5 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 2 and 4 +DEBUG: join prunable for task partitionId 2 and 5 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: join prunable for task partitionId 3 and 4 +DEBUG: join prunable for task partitionId 3 and 5 +DEBUG: join prunable for task partitionId 4 and 0 +DEBUG: join prunable for task partitionId 4 and 1 +DEBUG: join prunable for task partitionId 4 and 2 +DEBUG: join prunable for task partitionId 4 and 3 +DEBUG: join prunable for task partitionId 4 and 5 +DEBUG: join prunable for task partitionId 5 and 0 +DEBUG: join prunable for task partitionId 5 and 1 +DEBUG: join prunable for task partitionId 5 and 2 +DEBUG: join prunable for task partitionId 5 and 3 +DEBUG: join prunable for task partitionId 5 and 4 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 13 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning 
merge fetch taskId 14 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 16 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 17 +DETAIL: Creating dependency on merge taskId 12 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET citus.enable_non_colocated_router_query_pushdown TO OFF; -- but this is not the case otherwise SELECT a.author_id as first_author, b.word_count as second_word_count From 45804c0e5d58a30745494fb24faac0226e912127 Mon Sep 17 00:00:00 2001 From: Vinod Sridharan <14185211+visridha@users.noreply.github.com> Date: Thu, 4 Apr 2024 00:33:06 +0000 Subject: [PATCH 4/6] more --- src/backend/distributed/operations/create_shards.c | 6 +++++- src/backend/distributed/shared_library_init.c | 12 ++++++++++++ src/include/distributed/pg_dist_shard.h | 1 + src/test/regress/expected/multi_create_table.out | 1 + src/test/regress/sql/multi_create_table.sql | 1 + 5 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index fe26d9ed4..59312d08b 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -54,6 +54,9 @@ #include "distributed/transaction_management.h" #include "distributed/worker_manager.h" +/* Config variables managed via guc */ +bool EnableSingleShardTableMultiNodePlacement = false; + /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_create_worker_shards); @@ -165,7 +168,8 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, char shardStorageType = ShardStorageType(distributedTableId); int64 shardOffset = 0; - if (shardCount == 1 && shardStorageType == SHARD_STORAGE_TABLE) + if (EnableSingleShardTableMultiNodePlacement && shardCount == 1 + && shardStorageType == SHARD_STORAGE_TABLE) { /* For single shard distributed tables, use the colocationId to offset * where the shard is placed. 
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 45e212e8b..f004e8c43 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -86,6 +86,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/pg_dist_partition.h" +#include "distributed/pg_dist_shard.h" #include "distributed/placement_connection.h" #include "distributed/priority.h" #include "distributed/query_pushdown_planning.h" @@ -1462,6 +1463,17 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_single_shard_table_multi_node_placement", + gettext_noop("Enables placement of single shard distributed tables in" + " all nodes of the cluster"), + NULL, + &EnableSingleShardTableMultiNodePlacement, + false, + PGC_USERSET, + GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_statistics_collection", gettext_noop("Enables sending basic usage statistics to Citus."), diff --git a/src/include/distributed/pg_dist_shard.h b/src/include/distributed/pg_dist_shard.h index 5c98b755f..502e9e180 100644 --- a/src/include/distributed/pg_dist_shard.h +++ b/src/include/distributed/pg_dist_shard.h @@ -59,5 +59,6 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard; #define SHARD_STORAGE_TABLE 't' #define SHARD_STORAGE_VIRTUAL 'v' +extern bool EnableSingleShardTableMultiNodePlacement; #endif /* PG_DIST_SHARD_H */ diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 8d1ca713f..3e89fd8d0 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -468,6 +468,7 @@ DROP TABLE temp_table; DROP TABLE shard_count_table_3; -- test shard count 1 placement with colocate none. -- create a base table instance +set citus.enable_single_hash_repartition_joins to on; CREATE TABLE shard_count_table_1_inst_1 (a int); SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none'); create_distributed_table diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index be7df50ea..e2165fc78 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -294,6 +294,7 @@ DROP TABLE shard_count_table_3; -- test shard count 1 placement with colocate none. 
-- create a base table instance +set citus.enable_single_hash_repartition_joins to on; CREATE TABLE shard_count_table_1_inst_1 (a int); SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none'); From 80c982176e4eccce68ddc535c0c4ec78ba448aa8 Mon Sep 17 00:00:00 2001 From: Vinod Sridharan <14185211+visridha@users.noreply.github.com> Date: Thu, 4 Apr 2024 01:06:26 +0000 Subject: [PATCH 5/6] make config --- .../regress/expected/multi_create_table.out | 8 +-- .../expected/multi_mx_create_table.out | 57 +++++++++++++++++-- src/test/regress/sql/multi_create_table.sql | 4 +- .../regress/sql/multi_mx_create_table.sql | 8 +++ 4 files changed, 67 insertions(+), 10 deletions(-) diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 3e89fd8d0..72115bdc7 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -468,7 +468,7 @@ DROP TABLE temp_table; DROP TABLE shard_count_table_3; -- test shard count 1 placement with colocate none. -- create a base table instance -set citus.enable_single_hash_repartition_joins to on; +set citus.enable_single_shard_table_multi_node_placement to on; CREATE TABLE shard_count_table_1_inst_1 (a int); SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none'); create_distributed_table @@ -507,10 +507,10 @@ SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1 -- check placement: These should be placed on different workers. SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset -SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; - ?column? +SELECT :'inst_1_node_endpoint', :'inst_2_node_endpoint', :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; + ?column? | ?column? | ?column? 
--------------------------------------------------------------------- - f + localhost:xxxxx | localhost:xxxxx | f (1 row) DROP TABLE shard_count_table_1_inst_1; diff --git a/src/test/regress/expected/multi_mx_create_table.out b/src/test/regress/expected/multi_mx_create_table.out index 29b1d0430..aee8999c0 100644 --- a/src/test/regress/expected/multi_mx_create_table.out +++ b/src/test/regress/expected/multi_mx_create_table.out @@ -416,6 +416,23 @@ SELECT create_distributed_table('company_employees_mx', 'company_id'); (1 row) +CREATE TABLE articles_single_shard_hash_mx_partition_inst1 (LIKE articles_single_shard_hash_mx); +CREATE TABLE articles_single_shard_hash_mx_partition_inst2 (LIKE articles_single_shard_hash_mx); +SET citus.shard_count TO 1; +SET citus.enable_single_shard_table_multi_node_placement to on; +SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst1', 'author_id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst2', 'author_id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +set citus.enable_single_shard_table_multi_node_placement to off; WITH shard_counts AS ( SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid ) @@ -447,7 +464,9 @@ ORDER BY colocationid, logicalrelid; labs_mx | 1220007 | 1 | h | s objects_mx | 1220007 | 1 | h | s articles_single_shard_hash_mx | 1220007 | 1 | h | s -(23 rows) + articles_single_shard_hash_mx_partition_inst1 | 1220008 | 1 | h | s + articles_single_shard_hash_mx_partition_inst2 | 1220009 | 1 | h | s +(25 rows) -- check the citus_tables view SELECT table_name, citus_table_type, distribution_column, shard_count, table_owner @@ -458,6 +477,8 @@ ORDER BY table_name::text; app_analytics_events_mx | distributed | app_id | 4 | postgres articles_hash_mx | distributed | author_id | 2 | postgres articles_single_shard_hash_mx | distributed | author_id | 1 | postgres + articles_single_shard_hash_mx_partition_inst1 | distributed | author_id | 1 | postgres + articles_single_shard_hash_mx_partition_inst2 | distributed | author_id | 1 | postgres citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres citus_mx_test_schema.nation_hash_composite_types | distributed | n_nationkey | 4 | postgres @@ -478,7 +499,7 @@ ORDER BY table_name::text; part_mx | reference | | 1 | postgres researchers_mx | distributed | lab_id | 2 | postgres supplier_mx | reference | | 1 | postgres -(23 rows) +(25 rows) \c - - - :worker_1_port SET client_min_messages TO WARNING; @@ -490,6 +511,8 @@ ORDER BY table_name::text; app_analytics_events_mx | distributed | app_id | 4 | postgres articles_hash_mx | distributed | author_id | 2 | postgres articles_single_shard_hash_mx | distributed | author_id | 1 | postgres + articles_single_shard_hash_mx_partition_inst1 | distributed | author_id | 1 | postgres + articles_single_shard_hash_mx_partition_inst2 | distributed | author_id | 1 | postgres citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres citus_mx_test_schema.nation_hash_composite_types | distributed | n_nationkey | 4 | postgres @@ -510,7 +533,7 @@ 
ORDER BY table_name::text; part_mx | reference | | 1 | postgres researchers_mx | distributed | lab_id | 2 | postgres supplier_mx | reference | | 1 | postgres -(23 rows) +(25 rows) SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards ORDER BY shard_name::text; shard_name | table_name | citus_table_type | shard_size @@ -540,6 +563,20 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR articles_hash_mx_1220104 | articles_hash_mx | distributed | 0 articles_hash_mx_1220105 | articles_hash_mx | distributed | 0 articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 + articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 + articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 + articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 + articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 + articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 + articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst2_1220112 | articles_single_shard_hash_mx_partition_inst2 | distributed | 0 citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0 citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0 citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0 @@ -709,6 +746,12 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR customer_mx_1220084 | customer_mx | reference | 0 customer_mx_1220084 | customer_mx | reference | 0 labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 + labs_mx_1220102 | labs_mx | distributed | 8192 limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384 @@ -878,6 +921,12 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR nation_mx_1220085 | nation_mx | reference | 0 nation_mx_1220085 | nation_mx | reference | 0 objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + 
objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 + objects_mx_1220103 | objects_mx | distributed | 16384 orders_mx_1220068 | orders_mx | distributed | 8192 orders_mx_1220068 | orders_mx | distributed | 8192 orders_mx_1220068 | orders_mx | distributed | 8192 @@ -966,7 +1015,7 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR supplier_mx_1220087 | supplier_mx | reference | 0 supplier_mx_1220087 | supplier_mx | reference | 0 supplier_mx_1220087 | supplier_mx | reference | 0 -(451 rows) +(477 rows) -- Show that altering type name is not supported from worker node ALTER TYPE citus_mx_test_schema.order_side_mx RENAME TO temp_order_side_mx; diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index e2165fc78..fe26a84ca 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -294,7 +294,7 @@ DROP TABLE shard_count_table_3; -- test shard count 1 placement with colocate none. -- create a base table instance -set citus.enable_single_hash_repartition_joins to on; +set citus.enable_single_shard_table_multi_node_placement to on; CREATE TABLE shard_count_table_1_inst_1 (a int); SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none'); @@ -312,7 +312,7 @@ SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1 -- check placement: These should be placed on different workers. SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset -SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; +SELECT :'inst_1_node_endpoint', :'inst_2_node_endpoint', :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; DROP TABLE shard_count_table_1_inst_1; DROP TABLE shard_count_table_1_inst_2; diff --git a/src/test/regress/sql/multi_mx_create_table.sql b/src/test/regress/sql/multi_mx_create_table.sql index 4fb6eadbb..9a4545ba3 100644 --- a/src/test/regress/sql/multi_mx_create_table.sql +++ b/src/test/regress/sql/multi_mx_create_table.sql @@ -383,6 +383,14 @@ SET citus.shard_count TO 4; CREATE TABLE company_employees_mx (company_id int, employee_id int, manager_id int); SELECT create_distributed_table('company_employees_mx', 'company_id'); +CREATE TABLE articles_single_shard_hash_mx_partition_inst1 (LIKE articles_single_shard_hash_mx); +CREATE TABLE articles_single_shard_hash_mx_partition_inst2 (LIKE articles_single_shard_hash_mx); +SET citus.shard_count TO 1; +SET citus.enable_single_shard_table_multi_node_placement to on; +SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst1', 'author_id', colocate_with => 'none'); +SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst2', 'author_id', colocate_with => 'none'); +set citus.enable_single_shard_table_multi_node_placement to off; + WITH shard_counts AS ( SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid ) From d2fc258e626e0054c9c62ae7b96dc7d228929cc9 Mon Sep 17 00:00:00 2001 From: Vinod Sridharan <14185211+visridha@users.noreply.github.com> Date: Thu, 4 Apr 2024 01:07:20 +0000 Subject: [PATCH 6/6] indent --- 
src/backend/distributed/operations/create_shards.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 59312d08b..85c9d59d0 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -168,8 +168,8 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, char shardStorageType = ShardStorageType(distributedTableId); int64 shardOffset = 0; - if (EnableSingleShardTableMultiNodePlacement && shardCount == 1 - && shardStorageType == SHARD_STORAGE_TABLE) + if (EnableSingleShardTableMultiNodePlacement && shardCount == 1 && + shardStorageType == SHARD_STORAGE_TABLE) { /* For single shard distributed tables, use the colocationId to offset * where the shard is placed.
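
For illustration, below is a minimal standalone C sketch of the placement rule these patches introduce: when citus.enable_single_shard_table_multi_node_placement is enabled and a hash-distributed table has exactly one shard, the round-robin node index is offset by the table's colocation ID, so consecutively created single-shard tables spread across the workers instead of all landing on the first node. The helper name and parameters are simplified stand-ins for the logic in CreateShardsWithRoundRobinPolicy(), not actual Citus APIs, and the sketch omits the SHARD_STORAGE_TABLE check and replication handling shown in the hunks above.

/*
 * Illustrative sketch only; not part of the patches above.
 * PickNodeIndex and its parameters are hypothetical stand-ins for the
 * round-robin placement logic in CreateShardsWithRoundRobinPolicy().
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static uint32_t
PickNodeIndex(int64_t shardIndex, int32_t shardCount, uint32_t colocationId,
			  uint32_t workerNodeCount, bool multiNodePlacementEnabled)
{
	/* with plain round-robin, a single shard always lands on worker 0 */
	int64_t shardOffset = 0;

	if (multiNodePlacementEnabled && shardCount == 1)
	{
		/* offset by the colocation id so successive single-shard tables rotate */
		shardOffset = colocationId;
	}

	return (uint32_t) ((shardIndex + shardOffset) % workerNodeCount);
}

int
main(void)
{
	uint32_t workerNodeCount = 3;

	/* two single-shard tables with consecutive colocation ids: different workers */
	printf("colocation 7 -> worker %u\n",
		   (unsigned int) PickNodeIndex(0, 1, 7, workerNodeCount, true));
	printf("colocation 8 -> worker %u\n",
		   (unsigned int) PickNodeIndex(0, 1, 8, workerNodeCount, true));

	return 0;
}

Because each table created with colocate_with := 'none' receives a fresh, increasing colocation ID, the modulo over the worker count rotates placements across nodes, which is what the multi_create_table and multi_mx_create_table regression tests in these patches assert.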