Update distributed placement for single shard tables

pull/7572/head
Vinod Sridharan 2024-04-03 19:28:26 +00:00
parent 3929a5b2a6
commit 6b8df85a88
5 changed files with 88 additions and 8 deletions

View File

@ -146,7 +146,9 @@ static void ConvertCitusLocalTableToTableType(Oid relationId,
DistributedTableParams * DistributedTableParams *
distributedTableParams); distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount, static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty); Oid colocatedTableId,
bool localTableEmpty,
uint32 colocationId);
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId, static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId); uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType, static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
@ -1288,9 +1290,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
if (tableType == HASH_DISTRIBUTED) if (tableType == HASH_DISTRIBUTED)
{ {
/* create shards for hash distributed table */ /* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount, CreateHashDistributedTableShards(relationId,
distributedTableParams->shardCount,
colocatedTableId, colocatedTableId,
localTableEmpty); localTableEmpty,
colocationId);
} }
else if (tableType == REFERENCE_TABLE) else if (tableType == REFERENCE_TABLE)
{ {
@ -1878,7 +1882,8 @@ DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTable
*/ */
static void static void
CreateHashDistributedTableShards(Oid relationId, int shardCount, CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty) Oid colocatedTableId, bool localTableEmpty,
uint32 colocationId)
{ {
bool useExclusiveConnection = false; bool useExclusiveConnection = false;
@ -1917,7 +1922,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
* we can directly use ShardReplicationFactor global variable here. * we can directly use ShardReplicationFactor global variable here.
*/ */
CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor, CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
useExclusiveConnection); useExclusiveConnection, colocationId);
} }
} }

View File

@ -81,7 +81,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
*/ */
void void
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor, bool useExclusiveConnections) int32 replicationFactor, bool useExclusiveConnections,
uint32 colocationId)
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL; List *insertedShardPlacements = NIL;
@ -162,10 +163,11 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* set shard storage type according to relation type */ /* set shard storage type according to relation type */
char shardStorageType = ShardStorageType(distributedTableId); char shardStorageType = ShardStorageType(distributedTableId);
int64 shardOffset = shardCount == 1 ? colocationId : 0;
for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++) for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
{ {
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount; uint32 roundRobinNodeIndex = (shardIndex + shardOffset) % workerNodeCount;
/* initialize the hash token space for this shard */ /* initialize the hash token space for this shard */
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement); int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);

View File

@ -259,7 +259,8 @@ extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
extern uint64 UpdateShardStatistics(int64 shardId); extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor, int32 replicationFactor,
bool useExclusiveConnections); bool useExclusiveConnections,
uint32 colocationId);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
bool useExclusiveConnections); bool useExclusiveConnections);
extern void CreateReferenceTableShard(Oid distributedTableId); extern void CreateReferenceTableShard(Oid distributedTableId);

View File

@ -466,3 +466,51 @@ select create_reference_table('temp_table');
ERROR: cannot distribute a temporary table ERROR: cannot distribute a temporary table
DROP TABLE temp_table; DROP TABLE temp_table;
DROP TABLE shard_count_table_3; DROP TABLE shard_count_table_3;
-- test shard count 1 placement with colocate none.
-- create a base table instance
CREATE TABLE shard_count_table_1_inst_1 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- create another table with similar requirements
CREATE TABLE shard_count_table_1_inst_2 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Now check placement:
SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
?column?
---------------------------------------------------------------------
t
(1 row)
-- double check shard counts
SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass;
?column?
---------------------------------------------------------------------
t
(1 row)
-- check placement: These should be placed on different workers.
SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset
SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset
SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint';
?column?
---------------------------------------------------------------------
f
(1 row)
DROP TABLE shard_count_table_1_inst_1;
DROP TABLE shard_count_table_1_inst_2;

View File

@ -291,3 +291,27 @@ select create_reference_table('temp_table');
DROP TABLE temp_table; DROP TABLE temp_table;
DROP TABLE shard_count_table_3; DROP TABLE shard_count_table_3;
-- test shard count 1 placement with colocate none.
-- create a base table instance
CREATE TABLE shard_count_table_1_inst_1 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none');
-- create another table with similar requirements
CREATE TABLE shard_count_table_1_inst_2 (a int);
SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none');
-- Now check placement:
SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
-- double check shard counts
SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass;
-- check placement: These should be placed on different workers.
SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset
SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset
SELECT :'inst_1_node_endpoint' = :'inst_2_node_endpoint';
DROP TABLE shard_count_table_1_inst_1;
DROP TABLE shard_count_table_1_inst_2;