diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 536f80291..9e7d0f4b0 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -148,7 +148,9 @@ static void ConvertCitusLocalTableToTableType(Oid relationId,
                                               DistributedTableParams *
                                               distributedTableParams);
 static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
-                                             Oid colocatedTableId, bool localTableEmpty);
+                                             Oid colocatedTableId,
+                                             bool localTableEmpty,
+                                             uint32 colocationId);
 static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
                                         uint32 colocationId);
 static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
@@ -1288,9 +1290,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
     if (tableType == HASH_DISTRIBUTED)
     {
         /* create shards for hash distributed table */
-        CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
+        CreateHashDistributedTableShards(relationId,
+                                         distributedTableParams->shardCount,
                                          colocatedTableId,
-                                         localTableEmpty);
+                                         localTableEmpty,
+                                         colocationId);
     }
     else if (tableType == REFERENCE_TABLE)
     {
@@ -1865,7 +1869,8 @@ DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTable
  */
 static void
 CreateHashDistributedTableShards(Oid relationId, int shardCount,
-                                 Oid colocatedTableId, bool localTableEmpty)
+                                 Oid colocatedTableId, bool localTableEmpty,
+                                 uint32 colocationId)
 {
     bool useExclusiveConnection = false;
 
@@ -1904,7 +1909,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
          * we can directly use ShardReplicationFactor global variable here.
          */
         CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
-                                         useExclusiveConnection);
+                                         useExclusiveConnection, colocationId);
     }
 }
 
diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c
index 1553de92f..9b5bd3eec 100644
--- a/src/backend/distributed/operations/create_shards.c
+++ b/src/backend/distributed/operations/create_shards.c
@@ -54,6 +54,9 @@
 #include "distributed/transaction_management.h"
 #include "distributed/worker_manager.h"
 
+/* Config variables managed via GUC */
+bool EnableSingleShardTableMultiNodePlacement = false;
+
 /* declarations for dynamic loading */
 PG_FUNCTION_INFO_V1(master_create_worker_shards);
 
@@ -81,7 +84,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
  */
 void
 CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
-                                 int32 replicationFactor, bool useExclusiveConnections)
+                                 int32 replicationFactor, bool useExclusiveConnections,
+                                 uint32 colocationId)
 {
     CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
     List *insertedShardPlacements = NIL;
@@ -163,9 +167,19 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
     /* set shard storage type according to relation type */
     char shardStorageType = ShardStorageType(distributedTableId);
 
+    int64 shardOffset = 0;
+    if (EnableSingleShardTableMultiNodePlacement && shardCount == 1 &&
+        shardStorageType == SHARD_STORAGE_TABLE)
+    {
+        /* For single shard distributed tables, use the colocationId to offset
+         * where the shard is placed.
+         */
+        shardOffset = colocationId;
+    }
+
     for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
     {
-        uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
+        uint32 roundRobinNodeIndex = (shardIndex + shardOffset) % workerNodeCount;
 
         /* initialize the hash token space for this shard */
         int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index a4146062e..51d58ab80 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -86,6 +86,7 @@
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_server_executor.h"
 #include "distributed/pg_dist_partition.h"
+#include "distributed/pg_dist_shard.h"
 #include "distributed/placement_connection.h"
 #include "distributed/priority.h"
 #include "distributed/query_pushdown_planning.h"
@@ -1451,6 +1452,17 @@ RegisterCitusConfigVariables(void)
         GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
         NULL, NULL, NULL);
 
+    DefineCustomBoolVariable(
+        "citus.enable_single_shard_table_multi_node_placement",
+        gettext_noop("Enables placement of single shard distributed tables on"
+                     " all nodes of the cluster."),
+        NULL,
+        &EnableSingleShardTableMultiNodePlacement,
+        false,
+        PGC_USERSET,
+        GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
+        NULL, NULL, NULL);
+
     DefineCustomBoolVariable(
         "citus.enable_statistics_collection",
         gettext_noop("Enables sending basic usage statistics to Citus."),
diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h
index b2170fd2e..8c25ae7ab 100644
--- a/src/include/distributed/coordinator_protocol.h
+++ b/src/include/distributed/coordinator_protocol.h
@@ -259,7 +259,8 @@ extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
 extern uint64 UpdateShardStatistics(int64 shardId);
 extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
                                              int32 replicationFactor,
-                                             bool useExclusiveConnections);
+                                             bool useExclusiveConnections,
+                                             uint32 colocationId);
 extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
                                   bool useExclusiveConnections);
 extern void CreateReferenceTableShard(Oid distributedTableId);
diff --git a/src/include/distributed/pg_dist_shard.h b/src/include/distributed/pg_dist_shard.h
index 5c98b755f..502e9e180 100644
--- a/src/include/distributed/pg_dist_shard.h
+++ b/src/include/distributed/pg_dist_shard.h
@@ -59,5 +59,6 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
 #define SHARD_STORAGE_TABLE 't'
 #define SHARD_STORAGE_VIRTUAL 'v'
 
+extern bool EnableSingleShardTableMultiNodePlacement;
 
 #endif   /* PG_DIST_SHARD_H */
diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out
index 61c3c8fe1..72115bdc7 100644
--- a/src/test/regress/expected/multi_create_table.out
+++ b/src/test/regress/expected/multi_create_table.out
@@ -466,3 +466,52 @@ select create_reference_table('temp_table');
 ERROR:  cannot distribute a temporary table
 DROP TABLE temp_table;
 DROP TABLE shard_count_table_3;
+-- test shard count 1 placement with colocate none.
+-- create a base table instance +set citus.enable_single_shard_table_multi_node_placement to on; +CREATE TABLE shard_count_table_1_inst_1 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- create another table with similar requirements +CREATE TABLE shard_count_table_1_inst_2 (a int); +SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Now check placement: +SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- double check shard counts +SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- check placement: These should be placed on different workers. +SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset +SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset +SELECT :'inst_1_node_endpoint', :'inst_2_node_endpoint', :'inst_1_node_endpoint' = :'inst_2_node_endpoint'; + ?column? | ?column? | ?column? 
+--------------------------------------------------------------------- + localhost:xxxxx | localhost:xxxxx | f +(1 row) + +DROP TABLE shard_count_table_1_inst_1; +DROP TABLE shard_count_table_1_inst_2; diff --git a/src/test/regress/expected/multi_mx_create_table.out b/src/test/regress/expected/multi_mx_create_table.out index f5882e5e7..29c6d1a96 100644 --- a/src/test/regress/expected/multi_mx_create_table.out +++ b/src/test/regress/expected/multi_mx_create_table.out @@ -423,6 +423,23 @@ SELECT create_distributed_table('company_employees_mx', 'company_id'); (1 row) +CREATE TABLE articles_single_shard_hash_mx_partition_inst1 (LIKE articles_single_shard_hash_mx); +CREATE TABLE articles_single_shard_hash_mx_partition_inst2 (LIKE articles_single_shard_hash_mx); +SET citus.shard_count TO 1; +SET citus.enable_single_shard_table_multi_node_placement to on; +SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst1', 'author_id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst2', 'author_id', colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +set citus.enable_single_shard_table_multi_node_placement to off; WITH shard_counts AS ( SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid ) @@ -454,7 +471,9 @@ ORDER BY colocationid, logicalrelid; labs_mx | 1220007 | 1 | h | s objects_mx | 1220007 | 1 | h | s articles_single_shard_hash_mx | 1220007 | 1 | h | s -(23 rows) + articles_single_shard_hash_mx_partition_inst1 | 1220008 | 1 | h | s + articles_single_shard_hash_mx_partition_inst2 | 1220009 | 1 | h | s +(25 rows) -- check the citus_tables view SELECT table_name, citus_table_type, distribution_column, shard_count, table_owner @@ -465,6 +484,8 @@ ORDER BY table_name::text; app_analytics_events_mx | distributed | app_id | 4 | postgres articles_hash_mx | distributed | author_id | 2 | postgres articles_single_shard_hash_mx | distributed | author_id | 1 | postgres + articles_single_shard_hash_mx_partition_inst1 | distributed | author_id | 1 | postgres + articles_single_shard_hash_mx_partition_inst2 | distributed | author_id | 1 | postgres citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres citus_mx_test_schema.nation_hash_composite_types | distributed | n_nationkey | 4 | postgres @@ -485,7 +506,7 @@ ORDER BY table_name::text; part_mx | reference | | 1 | postgres researchers_mx | distributed | lab_id | 2 | postgres supplier_mx | reference | | 1 | postgres -(23 rows) +(25 rows) \c - - - :worker_1_port SET client_min_messages TO WARNING; @@ -497,6 +518,8 @@ ORDER BY table_name::text; app_analytics_events_mx | distributed | app_id | 4 | postgres articles_hash_mx | distributed | author_id | 2 | postgres articles_single_shard_hash_mx | distributed | author_id | 1 | postgres + articles_single_shard_hash_mx_partition_inst1 | distributed | author_id | 1 | postgres + articles_single_shard_hash_mx_partition_inst2 | distributed | author_id | 1 | postgres citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres citus_mx_test_schema.nation_hash_composite_types | distributed | 
n_nationkey | 4 | postgres @@ -517,7 +540,7 @@ ORDER BY table_name::text; part_mx | reference | | 1 | postgres researchers_mx | distributed | lab_id | 2 | postgres supplier_mx | reference | | 1 | postgres -(23 rows) +(25 rows) SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards ORDER BY shard_name::text; shard_name | table_name | citus_table_type | shard_size @@ -553,6 +576,14 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0 + articles_single_shard_hash_mx_partition_inst2_1220112 | articles_single_shard_hash_mx_partition_inst2 | distributed | 0 citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0 citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0 citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0 @@ -991,7 +1022,7 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR supplier_mx_1220087 | supplier_mx | reference | 0 supplier_mx_1220087 | supplier_mx | reference | 0 supplier_mx_1220087 | supplier_mx | reference | 0 -(469 rows) +(477 rows) -- Show that altering type name is not supported from worker node ALTER TYPE citus_mx_test_schema.order_side_mx RENAME TO temp_order_side_mx; diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index ce68d133d..1a95c5e6e 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -774,15 +774,64 @@ SELECT a.author_id as first_author, b.word_count as second_word_count FROM articles_hash a, articles_single_shard_hash b WHERE a.author_id = 10 and a.author_id = b.author_id ORDER BY 1,2 LIMIT 3; -DEBUG: Creating router plan -DEBUG: query has a single distribution column value: 10 - first_author | second_word_count ---------------------------------------------------------------------- - 10 | 19519 - 10 | 19519 - 10 | 19519 -(3 rows) - +DEBUG: found no worker with all shard placements +DEBUG: push down of limit count: 3 +DEBUG: join prunable for task partitionId 0 and 1 +DEBUG: join prunable for task partitionId 0 and 2 +DEBUG: join prunable for task partitionId 0 and 3 +DEBUG: join prunable for task partitionId 0 and 4 +DEBUG: join prunable for task partitionId 0 and 5 +DEBUG: join prunable for task partitionId 1 and 0 
+DEBUG: join prunable for task partitionId 1 and 2 +DEBUG: join prunable for task partitionId 1 and 3 +DEBUG: join prunable for task partitionId 1 and 4 +DEBUG: join prunable for task partitionId 1 and 5 +DEBUG: join prunable for task partitionId 2 and 0 +DEBUG: join prunable for task partitionId 2 and 1 +DEBUG: join prunable for task partitionId 2 and 3 +DEBUG: join prunable for task partitionId 2 and 4 +DEBUG: join prunable for task partitionId 2 and 5 +DEBUG: join prunable for task partitionId 3 and 0 +DEBUG: join prunable for task partitionId 3 and 1 +DEBUG: join prunable for task partitionId 3 and 2 +DEBUG: join prunable for task partitionId 3 and 4 +DEBUG: join prunable for task partitionId 3 and 5 +DEBUG: join prunable for task partitionId 4 and 0 +DEBUG: join prunable for task partitionId 4 and 1 +DEBUG: join prunable for task partitionId 4 and 2 +DEBUG: join prunable for task partitionId 4 and 3 +DEBUG: join prunable for task partitionId 4 and 5 +DEBUG: join prunable for task partitionId 5 and 0 +DEBUG: join prunable for task partitionId 5 and 1 +DEBUG: join prunable for task partitionId 5 and 2 +DEBUG: join prunable for task partitionId 5 and 3 +DEBUG: join prunable for task partitionId 5 and 4 +DEBUG: pruning merge fetch taskId 1 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 2 +DETAIL: Creating dependency on merge taskId 2 +DEBUG: pruning merge fetch taskId 4 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 5 +DETAIL: Creating dependency on merge taskId 4 +DEBUG: pruning merge fetch taskId 7 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 8 +DETAIL: Creating dependency on merge taskId 6 +DEBUG: pruning merge fetch taskId 10 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 11 +DETAIL: Creating dependency on merge taskId 8 +DEBUG: pruning merge fetch taskId 13 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 14 +DETAIL: Creating dependency on merge taskId 10 +DEBUG: pruning merge fetch taskId 16 +DETAIL: Creating dependency on merge taskId 12 +DEBUG: pruning merge fetch taskId 17 +DETAIL: Creating dependency on merge taskId 12 +ERROR: the query contains a join that requires repartitioning +HINT: Set citus.enable_repartition_joins to on to enable repartitioning SET citus.enable_non_colocated_router_query_pushdown TO OFF; -- but this is not the case otherwise SELECT a.author_id as first_author, b.word_count as second_word_count diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 9bf340909..fe26a84ca 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -291,3 +291,28 @@ select create_reference_table('temp_table'); DROP TABLE temp_table; DROP TABLE shard_count_table_3; + +-- test shard count 1 placement with colocate none. 
+-- create a base table instance
+set citus.enable_single_shard_table_multi_node_placement to on;
+CREATE TABLE shard_count_table_1_inst_1 (a int);
+SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none');
+
+-- create another table with similar requirements
+CREATE TABLE shard_count_table_1_inst_2 (a int);
+SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none');
+
+-- Now check placement:
+SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
+
+-- double check shard counts
+SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
+SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass;
+
+-- check placement: These should be placed on different workers.
+SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset
+SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset
+SELECT :'inst_1_node_endpoint', :'inst_2_node_endpoint', :'inst_1_node_endpoint' = :'inst_2_node_endpoint';
+
+DROP TABLE shard_count_table_1_inst_1;
+DROP TABLE shard_count_table_1_inst_2;
diff --git a/src/test/regress/sql/multi_mx_create_table.sql b/src/test/regress/sql/multi_mx_create_table.sql
index 1a267b301..ad555cbfd 100644
--- a/src/test/regress/sql/multi_mx_create_table.sql
+++ b/src/test/regress/sql/multi_mx_create_table.sql
@@ -390,6 +390,14 @@ SET citus.shard_count TO 4;
 CREATE TABLE company_employees_mx (company_id int, employee_id int, manager_id int);
 SELECT create_distributed_table('company_employees_mx', 'company_id');
 
+CREATE TABLE articles_single_shard_hash_mx_partition_inst1 (LIKE articles_single_shard_hash_mx);
+CREATE TABLE articles_single_shard_hash_mx_partition_inst2 (LIKE articles_single_shard_hash_mx);
+SET citus.shard_count TO 1;
+SET citus.enable_single_shard_table_multi_node_placement to on;
+SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst1', 'author_id', colocate_with => 'none');
+SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst2', 'author_id', colocate_with => 'none');
+set citus.enable_single_shard_table_multi_node_placement to off;
+
 WITH shard_counts AS (
     SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid
 )
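
A minimal usage sketch of the behavior this patch enables (illustrative only, not part of the patch; the table names below are hypothetical and a cluster with at least two active worker nodes is assumed). Because each table created with colocate_with => 'none' gets a fresh colocation ID, the (shardIndex + shardOffset) % workerNodeCount computation in CreateShardsWithRoundRobinPolicy tends to land consecutive single-shard tables on different workers:

-- Illustrative sketch, assuming at least two worker nodes; not part of the patch.
SET citus.enable_single_shard_table_multi_node_placement TO on;

CREATE TABLE orders_single (order_id bigint);   -- hypothetical table
CREATE TABLE events_single (event_id bigint);   -- hypothetical table
SELECT create_distributed_table('orders_single', 'order_id', shard_count := 1, colocate_with := 'none');
SELECT create_distributed_table('events_single', 'event_id', shard_count := 1, colocate_with := 'none');

-- With the GUC enabled, the two single shards are expected to land on different workers.
SELECT table_name, nodename, nodeport
FROM citus_shards
WHERE table_name IN ('orders_single'::regclass, 'events_single'::regclass);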
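A rough way to eyeball how placements spread across the cluster after creating many single-shard tables is to group the citus_shards view (already used in the tests above) by node; this is a generic inspection query, not something the patch adds:

-- Count shard placements per worker; larger clusters should show the
-- single-shard tables fanning out rather than piling up on one node.
SELECT nodename, nodeport, count(*) AS shard_placements
FROM citus_shards
GROUP BY nodename, nodeport
ORDER BY shard_placements DESC;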
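The multi_router_planner.out change above is a direct consequence of the new placement behavior: once a single-shard table is no longer guaranteed to sit on the same node as the table it is joined with, the router cannot find a single worker holding all placements, and the join has to be repartitioned. A sketch of the session-level workaround named in the new HINT, using the tables from that test file:

-- Sketch of the workaround from the new HINT; only needed when the joined
-- tables are not colocated and their shards end up on different workers.
SET citus.enable_repartition_joins TO on;
SELECT a.author_id AS first_author, b.word_count AS second_word_count
FROM articles_hash a, articles_single_shard_hash b
WHERE a.author_id = 10 AND a.author_id = b.author_id
ORDER BY 1, 2 LIMIT 3;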