mirror of https://github.com/citusdata/citus.git
Merge d2fc258e62
into 37e23f44b4
commit
648602d939
|
@ -148,7 +148,9 @@ static void ConvertCitusLocalTableToTableType(Oid relationId,
|
||||||
DistributedTableParams *
|
DistributedTableParams *
|
||||||
distributedTableParams);
|
distributedTableParams);
|
||||||
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
|
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
|
||||||
Oid colocatedTableId, bool localTableEmpty);
|
Oid colocatedTableId,
|
||||||
|
bool localTableEmpty,
|
||||||
|
uint32 colocationId);
|
||||||
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
|
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
|
||||||
uint32 colocationId);
|
uint32 colocationId);
|
||||||
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
|
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
|
||||||
|
@ -1288,9 +1290,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
|
||||||
if (tableType == HASH_DISTRIBUTED)
|
if (tableType == HASH_DISTRIBUTED)
|
||||||
{
|
{
|
||||||
/* create shards for hash distributed table */
|
/* create shards for hash distributed table */
|
||||||
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
|
CreateHashDistributedTableShards(relationId,
|
||||||
|
distributedTableParams->shardCount,
|
||||||
colocatedTableId,
|
colocatedTableId,
|
||||||
localTableEmpty);
|
localTableEmpty,
|
||||||
|
colocationId);
|
||||||
}
|
}
|
||||||
else if (tableType == REFERENCE_TABLE)
|
else if (tableType == REFERENCE_TABLE)
|
||||||
{
|
{
|
||||||
|
@ -1865,7 +1869,8 @@ DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTable
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
CreateHashDistributedTableShards(Oid relationId, int shardCount,
|
CreateHashDistributedTableShards(Oid relationId, int shardCount,
|
||||||
Oid colocatedTableId, bool localTableEmpty)
|
Oid colocatedTableId, bool localTableEmpty,
|
||||||
|
uint32 colocationId)
|
||||||
{
|
{
|
||||||
bool useExclusiveConnection = false;
|
bool useExclusiveConnection = false;
|
||||||
|
|
||||||
|
@ -1904,7 +1909,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
|
||||||
* we can directly use ShardReplicationFactor global variable here.
|
* we can directly use ShardReplicationFactor global variable here.
|
||||||
*/
|
*/
|
||||||
CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
|
CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
|
||||||
useExclusiveConnection);
|
useExclusiveConnection, colocationId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,9 @@
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
|
|
||||||
|
/* Config variables managed via guc */
|
||||||
|
bool EnableSingleShardTableMultiNodePlacement = false;
|
||||||
|
|
||||||
|
|
||||||
/* declarations for dynamic loading */
|
/* declarations for dynamic loading */
|
||||||
PG_FUNCTION_INFO_V1(master_create_worker_shards);
|
PG_FUNCTION_INFO_V1(master_create_worker_shards);
|
||||||
|
@ -81,7 +84,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
||||||
int32 replicationFactor, bool useExclusiveConnections)
|
int32 replicationFactor, bool useExclusiveConnections,
|
||||||
|
uint32 colocationId)
|
||||||
{
|
{
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
||||||
List *insertedShardPlacements = NIL;
|
List *insertedShardPlacements = NIL;
|
||||||
|
@ -163,9 +167,19 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
||||||
/* set shard storage type according to relation type */
|
/* set shard storage type according to relation type */
|
||||||
char shardStorageType = ShardStorageType(distributedTableId);
|
char shardStorageType = ShardStorageType(distributedTableId);
|
||||||
|
|
||||||
|
int64 shardOffset = 0;
|
||||||
|
if (EnableSingleShardTableMultiNodePlacement && shardCount == 1 &&
|
||||||
|
shardStorageType == SHARD_STORAGE_TABLE)
|
||||||
|
{
|
||||||
|
/* For single shard distributed tables, use the colocationId to offset
|
||||||
|
* where the shard is placed.
|
||||||
|
*/
|
||||||
|
shardOffset = colocationId;
|
||||||
|
}
|
||||||
|
|
||||||
for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
|
for (int64 shardIndex = 0; shardIndex < shardCount; shardIndex++)
|
||||||
{
|
{
|
||||||
uint32 roundRobinNodeIndex = shardIndex % workerNodeCount;
|
uint32 roundRobinNodeIndex = (shardIndex + shardOffset) % workerNodeCount;
|
||||||
|
|
||||||
/* initialize the hash token space for this shard */
|
/* initialize the hash token space for this shard */
|
||||||
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
|
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
|
||||||
|
|
|
@ -86,6 +86,7 @@
|
||||||
#include "distributed/multi_router_planner.h"
|
#include "distributed/multi_router_planner.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
|
#include "distributed/pg_dist_shard.h"
|
||||||
#include "distributed/placement_connection.h"
|
#include "distributed/placement_connection.h"
|
||||||
#include "distributed/priority.h"
|
#include "distributed/priority.h"
|
||||||
#include "distributed/query_pushdown_planning.h"
|
#include "distributed/query_pushdown_planning.h"
|
||||||
|
@ -1451,6 +1452,17 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
|
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomBoolVariable(
|
||||||
|
"citus.enable_single_shard_table_multi_node_placement",
|
||||||
|
gettext_noop("Enables placement of single shard distributed tables in"
|
||||||
|
" all nodes of the cluster"),
|
||||||
|
NULL,
|
||||||
|
&EnableSingleShardTableMultiNodePlacement,
|
||||||
|
false,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"citus.enable_statistics_collection",
|
"citus.enable_statistics_collection",
|
||||||
gettext_noop("Enables sending basic usage statistics to Citus."),
|
gettext_noop("Enables sending basic usage statistics to Citus."),
|
||||||
|
|
|
@ -259,7 +259,8 @@ extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
|
||||||
extern uint64 UpdateShardStatistics(int64 shardId);
|
extern uint64 UpdateShardStatistics(int64 shardId);
|
||||||
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
||||||
int32 replicationFactor,
|
int32 replicationFactor,
|
||||||
bool useExclusiveConnections);
|
bool useExclusiveConnections,
|
||||||
|
uint32 colocationId);
|
||||||
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
|
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
|
||||||
bool useExclusiveConnections);
|
bool useExclusiveConnections);
|
||||||
extern void CreateReferenceTableShard(Oid distributedTableId);
|
extern void CreateReferenceTableShard(Oid distributedTableId);
|
||||||
|
|
|
@ -59,5 +59,6 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
|
||||||
#define SHARD_STORAGE_TABLE 't'
|
#define SHARD_STORAGE_TABLE 't'
|
||||||
#define SHARD_STORAGE_VIRTUAL 'v'
|
#define SHARD_STORAGE_VIRTUAL 'v'
|
||||||
|
|
||||||
|
extern bool EnableSingleShardTableMultiNodePlacement;
|
||||||
|
|
||||||
#endif /* PG_DIST_SHARD_H */
|
#endif /* PG_DIST_SHARD_H */
|
||||||
|
|
|
@ -466,3 +466,52 @@ select create_reference_table('temp_table');
|
||||||
ERROR: cannot distribute a temporary table
|
ERROR: cannot distribute a temporary table
|
||||||
DROP TABLE temp_table;
|
DROP TABLE temp_table;
|
||||||
DROP TABLE shard_count_table_3;
|
DROP TABLE shard_count_table_3;
|
||||||
|
-- test shard count 1 placement with colocate none.
|
||||||
|
-- create a base table instance
|
||||||
|
set citus.enable_single_shard_table_multi_node_placement to on;
|
||||||
|
CREATE TABLE shard_count_table_1_inst_1 (a int);
|
||||||
|
SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- create another table with similar requirements
|
||||||
|
CREATE TABLE shard_count_table_1_inst_2 (a int);
|
||||||
|
SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Now check placement:
|
||||||
|
SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- double check shard counts
|
||||||
|
SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- check placement: These should be placed on different workers.
|
||||||
|
SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset
|
||||||
|
SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset
|
||||||
|
SELECT :'inst_1_node_endpoint', :'inst_2_node_endpoint', :'inst_1_node_endpoint' = :'inst_2_node_endpoint';
|
||||||
|
?column? | ?column? | ?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
localhost:xxxxx | localhost:xxxxx | f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE shard_count_table_1_inst_1;
|
||||||
|
DROP TABLE shard_count_table_1_inst_2;
|
||||||
|
|
|
@ -423,6 +423,23 @@ SELECT create_distributed_table('company_employees_mx', 'company_id');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE articles_single_shard_hash_mx_partition_inst1 (LIKE articles_single_shard_hash_mx);
|
||||||
|
CREATE TABLE articles_single_shard_hash_mx_partition_inst2 (LIKE articles_single_shard_hash_mx);
|
||||||
|
SET citus.shard_count TO 1;
|
||||||
|
SET citus.enable_single_shard_table_multi_node_placement to on;
|
||||||
|
SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst1', 'author_id', colocate_with => 'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst2', 'author_id', colocate_with => 'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
set citus.enable_single_shard_table_multi_node_placement to off;
|
||||||
WITH shard_counts AS (
|
WITH shard_counts AS (
|
||||||
SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid
|
SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid
|
||||||
)
|
)
|
||||||
|
@ -454,7 +471,9 @@ ORDER BY colocationid, logicalrelid;
|
||||||
labs_mx | 1220007 | 1 | h | s
|
labs_mx | 1220007 | 1 | h | s
|
||||||
objects_mx | 1220007 | 1 | h | s
|
objects_mx | 1220007 | 1 | h | s
|
||||||
articles_single_shard_hash_mx | 1220007 | 1 | h | s
|
articles_single_shard_hash_mx | 1220007 | 1 | h | s
|
||||||
(23 rows)
|
articles_single_shard_hash_mx_partition_inst1 | 1220008 | 1 | h | s
|
||||||
|
articles_single_shard_hash_mx_partition_inst2 | 1220009 | 1 | h | s
|
||||||
|
(25 rows)
|
||||||
|
|
||||||
-- check the citus_tables view
|
-- check the citus_tables view
|
||||||
SELECT table_name, citus_table_type, distribution_column, shard_count, table_owner
|
SELECT table_name, citus_table_type, distribution_column, shard_count, table_owner
|
||||||
|
@ -465,6 +484,8 @@ ORDER BY table_name::text;
|
||||||
app_analytics_events_mx | distributed | app_id | 4 | postgres
|
app_analytics_events_mx | distributed | app_id | 4 | postgres
|
||||||
articles_hash_mx | distributed | author_id | 2 | postgres
|
articles_hash_mx | distributed | author_id | 2 | postgres
|
||||||
articles_single_shard_hash_mx | distributed | author_id | 1 | postgres
|
articles_single_shard_hash_mx | distributed | author_id | 1 | postgres
|
||||||
|
articles_single_shard_hash_mx_partition_inst1 | distributed | author_id | 1 | postgres
|
||||||
|
articles_single_shard_hash_mx_partition_inst2 | distributed | author_id | 1 | postgres
|
||||||
citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres
|
citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres
|
||||||
citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres
|
citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres
|
||||||
citus_mx_test_schema.nation_hash_composite_types | distributed | n_nationkey | 4 | postgres
|
citus_mx_test_schema.nation_hash_composite_types | distributed | n_nationkey | 4 | postgres
|
||||||
|
@ -485,7 +506,7 @@ ORDER BY table_name::text;
|
||||||
part_mx | reference | <none> | 1 | postgres
|
part_mx | reference | <none> | 1 | postgres
|
||||||
researchers_mx | distributed | lab_id | 2 | postgres
|
researchers_mx | distributed | lab_id | 2 | postgres
|
||||||
supplier_mx | reference | <none> | 1 | postgres
|
supplier_mx | reference | <none> | 1 | postgres
|
||||||
(23 rows)
|
(25 rows)
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
|
@ -497,6 +518,8 @@ ORDER BY table_name::text;
|
||||||
app_analytics_events_mx | distributed | app_id | 4 | postgres
|
app_analytics_events_mx | distributed | app_id | 4 | postgres
|
||||||
articles_hash_mx | distributed | author_id | 2 | postgres
|
articles_hash_mx | distributed | author_id | 2 | postgres
|
||||||
articles_single_shard_hash_mx | distributed | author_id | 1 | postgres
|
articles_single_shard_hash_mx | distributed | author_id | 1 | postgres
|
||||||
|
articles_single_shard_hash_mx_partition_inst1 | distributed | author_id | 1 | postgres
|
||||||
|
articles_single_shard_hash_mx_partition_inst2 | distributed | author_id | 1 | postgres
|
||||||
citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres
|
citus_mx_test_schema.nation_hash | distributed | n_nationkey | 16 | postgres
|
||||||
citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres
|
citus_mx_test_schema.nation_hash_collation_search_path | distributed | n_nationkey | 4 | postgres
|
||||||
citus_mx_test_schema.nation_hash_composite_types | distributed | n_nationkey | 4 | postgres
|
citus_mx_test_schema.nation_hash_composite_types | distributed | n_nationkey | 4 | postgres
|
||||||
|
@ -517,7 +540,7 @@ ORDER BY table_name::text;
|
||||||
part_mx | reference | <none> | 1 | postgres
|
part_mx | reference | <none> | 1 | postgres
|
||||||
researchers_mx | distributed | lab_id | 2 | postgres
|
researchers_mx | distributed | lab_id | 2 | postgres
|
||||||
supplier_mx | reference | <none> | 1 | postgres
|
supplier_mx | reference | <none> | 1 | postgres
|
||||||
(23 rows)
|
(25 rows)
|
||||||
|
|
||||||
SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards ORDER BY shard_name::text;
|
SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards ORDER BY shard_name::text;
|
||||||
shard_name | table_name | citus_table_type | shard_size
|
shard_name | table_name | citus_table_type | shard_size
|
||||||
|
@ -553,6 +576,14 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
|
||||||
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
|
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
|
||||||
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
|
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
|
||||||
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
|
articles_single_shard_hash_mx_1220106 | articles_single_shard_hash_mx | distributed | 0
|
||||||
|
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
|
||||||
|
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
|
||||||
|
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
|
||||||
|
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
|
||||||
|
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
|
||||||
|
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
|
||||||
|
articles_single_shard_hash_mx_partition_inst1_1220111 | articles_single_shard_hash_mx_partition_inst1 | distributed | 0
|
||||||
|
articles_single_shard_hash_mx_partition_inst2_1220112 | articles_single_shard_hash_mx_partition_inst2 | distributed | 0
|
||||||
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
|
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
|
||||||
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
|
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
|
||||||
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
|
citus_mx_test_schema.nation_hash_1220016 | citus_mx_test_schema.nation_hash | distributed | 0
|
||||||
|
@ -991,7 +1022,7 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
|
||||||
supplier_mx_1220087 | supplier_mx | reference | 0
|
supplier_mx_1220087 | supplier_mx | reference | 0
|
||||||
supplier_mx_1220087 | supplier_mx | reference | 0
|
supplier_mx_1220087 | supplier_mx | reference | 0
|
||||||
supplier_mx_1220087 | supplier_mx | reference | 0
|
supplier_mx_1220087 | supplier_mx | reference | 0
|
||||||
(469 rows)
|
(477 rows)
|
||||||
|
|
||||||
-- Show that altering type name is not supported from worker node
|
-- Show that altering type name is not supported from worker node
|
||||||
ALTER TYPE citus_mx_test_schema.order_side_mx RENAME TO temp_order_side_mx;
|
ALTER TYPE citus_mx_test_schema.order_side_mx RENAME TO temp_order_side_mx;
|
||||||
|
|
|
@ -774,15 +774,64 @@ SELECT a.author_id as first_author, b.word_count as second_word_count
|
||||||
FROM articles_hash a, articles_single_shard_hash b
|
FROM articles_hash a, articles_single_shard_hash b
|
||||||
WHERE a.author_id = 10 and a.author_id = b.author_id
|
WHERE a.author_id = 10 and a.author_id = b.author_id
|
||||||
ORDER BY 1,2 LIMIT 3;
|
ORDER BY 1,2 LIMIT 3;
|
||||||
DEBUG: Creating router plan
|
DEBUG: found no worker with all shard placements
|
||||||
DEBUG: query has a single distribution column value: 10
|
DEBUG: push down of limit count: 3
|
||||||
first_author | second_word_count
|
DEBUG: join prunable for task partitionId 0 and 1
|
||||||
---------------------------------------------------------------------
|
DEBUG: join prunable for task partitionId 0 and 2
|
||||||
10 | 19519
|
DEBUG: join prunable for task partitionId 0 and 3
|
||||||
10 | 19519
|
DEBUG: join prunable for task partitionId 0 and 4
|
||||||
10 | 19519
|
DEBUG: join prunable for task partitionId 0 and 5
|
||||||
(3 rows)
|
DEBUG: join prunable for task partitionId 1 and 0
|
||||||
|
DEBUG: join prunable for task partitionId 1 and 2
|
||||||
|
DEBUG: join prunable for task partitionId 1 and 3
|
||||||
|
DEBUG: join prunable for task partitionId 1 and 4
|
||||||
|
DEBUG: join prunable for task partitionId 1 and 5
|
||||||
|
DEBUG: join prunable for task partitionId 2 and 0
|
||||||
|
DEBUG: join prunable for task partitionId 2 and 1
|
||||||
|
DEBUG: join prunable for task partitionId 2 and 3
|
||||||
|
DEBUG: join prunable for task partitionId 2 and 4
|
||||||
|
DEBUG: join prunable for task partitionId 2 and 5
|
||||||
|
DEBUG: join prunable for task partitionId 3 and 0
|
||||||
|
DEBUG: join prunable for task partitionId 3 and 1
|
||||||
|
DEBUG: join prunable for task partitionId 3 and 2
|
||||||
|
DEBUG: join prunable for task partitionId 3 and 4
|
||||||
|
DEBUG: join prunable for task partitionId 3 and 5
|
||||||
|
DEBUG: join prunable for task partitionId 4 and 0
|
||||||
|
DEBUG: join prunable for task partitionId 4 and 1
|
||||||
|
DEBUG: join prunable for task partitionId 4 and 2
|
||||||
|
DEBUG: join prunable for task partitionId 4 and 3
|
||||||
|
DEBUG: join prunable for task partitionId 4 and 5
|
||||||
|
DEBUG: join prunable for task partitionId 5 and 0
|
||||||
|
DEBUG: join prunable for task partitionId 5 and 1
|
||||||
|
DEBUG: join prunable for task partitionId 5 and 2
|
||||||
|
DEBUG: join prunable for task partitionId 5 and 3
|
||||||
|
DEBUG: join prunable for task partitionId 5 and 4
|
||||||
|
DEBUG: pruning merge fetch taskId 1
|
||||||
|
DETAIL: Creating dependency on merge taskId 2
|
||||||
|
DEBUG: pruning merge fetch taskId 2
|
||||||
|
DETAIL: Creating dependency on merge taskId 2
|
||||||
|
DEBUG: pruning merge fetch taskId 4
|
||||||
|
DETAIL: Creating dependency on merge taskId 4
|
||||||
|
DEBUG: pruning merge fetch taskId 5
|
||||||
|
DETAIL: Creating dependency on merge taskId 4
|
||||||
|
DEBUG: pruning merge fetch taskId 7
|
||||||
|
DETAIL: Creating dependency on merge taskId 6
|
||||||
|
DEBUG: pruning merge fetch taskId 8
|
||||||
|
DETAIL: Creating dependency on merge taskId 6
|
||||||
|
DEBUG: pruning merge fetch taskId 10
|
||||||
|
DETAIL: Creating dependency on merge taskId 8
|
||||||
|
DEBUG: pruning merge fetch taskId 11
|
||||||
|
DETAIL: Creating dependency on merge taskId 8
|
||||||
|
DEBUG: pruning merge fetch taskId 13
|
||||||
|
DETAIL: Creating dependency on merge taskId 10
|
||||||
|
DEBUG: pruning merge fetch taskId 14
|
||||||
|
DETAIL: Creating dependency on merge taskId 10
|
||||||
|
DEBUG: pruning merge fetch taskId 16
|
||||||
|
DETAIL: Creating dependency on merge taskId 12
|
||||||
|
DEBUG: pruning merge fetch taskId 17
|
||||||
|
DETAIL: Creating dependency on merge taskId 12
|
||||||
|
ERROR: the query contains a join that requires repartitioning
|
||||||
|
HINT: Set citus.enable_repartition_joins to on to enable repartitioning
|
||||||
SET citus.enable_non_colocated_router_query_pushdown TO OFF;
|
SET citus.enable_non_colocated_router_query_pushdown TO OFF;
|
||||||
-- but this is not the case otherwise
|
-- but this is not the case otherwise
|
||||||
SELECT a.author_id as first_author, b.word_count as second_word_count
|
SELECT a.author_id as first_author, b.word_count as second_word_count
|
||||||
|
|
|
@ -291,3 +291,28 @@ select create_reference_table('temp_table');
|
||||||
DROP TABLE temp_table;
|
DROP TABLE temp_table;
|
||||||
|
|
||||||
DROP TABLE shard_count_table_3;
|
DROP TABLE shard_count_table_3;
|
||||||
|
|
||||||
|
-- test shard count 1 placement with colocate none.
|
||||||
|
-- create a base table instance
|
||||||
|
set citus.enable_single_shard_table_multi_node_placement to on;
|
||||||
|
CREATE TABLE shard_count_table_1_inst_1 (a int);
|
||||||
|
SELECT create_distributed_table('shard_count_table_1_inst_1', 'a', shard_count:=1, colocate_with:='none');
|
||||||
|
|
||||||
|
-- create another table with similar requirements
|
||||||
|
CREATE TABLE shard_count_table_1_inst_2 (a int);
|
||||||
|
SELECT create_distributed_table('shard_count_table_1_inst_2', 'a', shard_count:=1, colocate_with:='none');
|
||||||
|
|
||||||
|
-- Now check placement:
|
||||||
|
SELECT (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) != (SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
|
||||||
|
|
||||||
|
-- double check shard counts
|
||||||
|
SELECT (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass) = (SELECT shard_count FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_2'::regclass);
|
||||||
|
SELECT shard_count = 1 FROM citus_tables WHERE table_name = 'shard_count_table_1_inst_1'::regclass;
|
||||||
|
|
||||||
|
-- check placement: These should be placed on different workers.
|
||||||
|
SELECT nodename || ':' || nodeport AS inst_1_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_1'::regclass \gset
|
||||||
|
SELECT nodename || ':' || nodeport AS inst_2_node_endpoint FROM citus_shards WHERE table_name = 'shard_count_table_1_inst_2'::regclass \gset
|
||||||
|
SELECT :'inst_1_node_endpoint', :'inst_2_node_endpoint', :'inst_1_node_endpoint' = :'inst_2_node_endpoint';
|
||||||
|
|
||||||
|
DROP TABLE shard_count_table_1_inst_1;
|
||||||
|
DROP TABLE shard_count_table_1_inst_2;
|
||||||
|
|
|
@ -390,6 +390,14 @@ SET citus.shard_count TO 4;
|
||||||
CREATE TABLE company_employees_mx (company_id int, employee_id int, manager_id int);
|
CREATE TABLE company_employees_mx (company_id int, employee_id int, manager_id int);
|
||||||
SELECT create_distributed_table('company_employees_mx', 'company_id');
|
SELECT create_distributed_table('company_employees_mx', 'company_id');
|
||||||
|
|
||||||
|
CREATE TABLE articles_single_shard_hash_mx_partition_inst1 (LIKE articles_single_shard_hash_mx);
|
||||||
|
CREATE TABLE articles_single_shard_hash_mx_partition_inst2 (LIKE articles_single_shard_hash_mx);
|
||||||
|
SET citus.shard_count TO 1;
|
||||||
|
SET citus.enable_single_shard_table_multi_node_placement to on;
|
||||||
|
SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst1', 'author_id', colocate_with => 'none');
|
||||||
|
SELECT create_distributed_table('articles_single_shard_hash_mx_partition_inst2', 'author_id', colocate_with => 'none');
|
||||||
|
set citus.enable_single_shard_table_multi_node_placement to off;
|
||||||
|
|
||||||
WITH shard_counts AS (
|
WITH shard_counts AS (
|
||||||
SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid
|
SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue