From 597533b1ff67b6884d6b1361746fb69600d41f1c Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 16 Dec 2020 20:55:40 +0100 Subject: [PATCH] Add citus_set_coordinator_host --- .../distributed/metadata/node_metadata.c | 47 +++++++++++++++++++ .../distributed/operations/create_shards.c | 15 +++++- .../distributed/sql/citus--9.5-1--10.0-1.sql | 1 + .../sql/downgrades/citus--10.0-1--9.5-1.sql | 2 + .../citus_set_coordinator_host/10.0-1.sql | 13 +++++ .../citus_set_coordinator_host/latest.sql | 13 +++++ .../distributed/coordinator_protocol.h | 1 + src/test/regress/expected/multi_extension.out | 19 ++++---- .../regress/expected/multi_extension_0.out | 21 +++++---- src/test/regress/expected/single_node.out | 15 +++--- .../expected/upgrade_list_citus_objects.out | 3 +- .../expected/upgrade_list_citus_objects_0.out | 3 +- src/test/regress/sql/single_node.sql | 7 +-- 13 files changed, 130 insertions(+), 30 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_set_coordinator_host/10.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_set_coordinator_host/latest.sql diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 65176152a..72882a1fe 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -47,6 +47,7 @@ #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "lib/stringinfo.h" +#include "postmaster/postmaster.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/lock.h" @@ -110,7 +111,9 @@ static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum val char *field); static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); + /* declarations for dynamic loading */ +PG_FUNCTION_INFO_V1(citus_set_coordinator_host); PG_FUNCTION_INFO_V1(master_add_node); PG_FUNCTION_INFO_V1(master_add_inactive_node); PG_FUNCTION_INFO_V1(master_add_secondary_node); @@ -141,6 +144,50 @@ DefaultNodeMetadata() } +/* + * citus_set_coordinator_host configures the hostname and port through which worker + * nodes can connect to the coordinator. + */ +Datum +citus_set_coordinator_host(PG_FUNCTION_ARGS) +{ + text *nodeName = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + char *nodeNameString = text_to_cstring(nodeName); + + NodeMetadata nodeMetadata = DefaultNodeMetadata(); + nodeMetadata.groupId = 0; + nodeMetadata.shouldHaveShards = false; + nodeMetadata.nodeRole = PG_GETARG_OID(2); + + Name nodeClusterName = PG_GETARG_NAME(3); + nodeMetadata.nodeCluster = NameStr(*nodeClusterName); + + bool nodeAlreadyExists = false; + + CheckCitusVersion(ERROR); + + /* add the coordinator to pg_dist_node if it was not already added */ + int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, + &nodeAlreadyExists); + if (nodeAlreadyExists) + { + /* + * since AddNodeMetadata takes an exclusive lock on pg_dist_node, we + * do not need to worry about concurrent changes (e.g. deletion) and + * can proceed to update immediately. + */ + + UpdateNodeLocation(nodeId, nodeNameString, nodePort); + + /* clear cached plans that have the old host/port */ + ResetPlanCache(); + } + + PG_RETURN_VOID(); +} + + /* * master_add_node function adds a new node to the cluster and returns its id. It also * replicates all reference tables to the new node. diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index 74e0022a4..319bd9e8a 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -173,7 +173,20 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, /* load and sort the worker node list for deterministic placement */ List *workerNodeList = DistributedTablePlacementNodeList(NoLock); - workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + + if (list_length(workerNodeList) == 0) + { + /* fall back to using coordinator, if it is in the metadata */ + WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID, NULL); + if (coordinatorNode != NULL) + { + workerNodeList = list_make1(coordinatorNode); + } + } + else + { + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + } int32 workerNodeCount = list_length(workerNodeList); if (replicationFactor > workerNodeCount) diff --git a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql index be5d7d866..db7e030a3 100644 --- a/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql +++ b/src/backend/distributed/sql/citus--9.5-1--10.0-1.sql @@ -8,6 +8,7 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass); #include "udfs/citus_finish_pg_upgrade/10.0-1.sql" #include "udfs/undistribute_table/10.0-1.sql" #include "udfs/create_citus_local_table/10.0-1.sql" +#include "udfs/citus_set_coordinator_host/10.0-1.sql" #include "../../columnar/sql/columnar--9.5-1--10.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql b/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql index 6274422bc..5d3f2c3c0 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.0-1--9.5-1.sql @@ -13,6 +13,8 @@ DROP FUNCTION pg_catalog.create_citus_local_table(regclass,boolean); DROP VIEW pg_catalog.time_partitions; DROP FUNCTION pg_catalog.time_partition_range(regclass); +DROP FUNCTION pg_catalog.citus_set_coordinator_host(text,int,noderole,name); + #include "../udfs/citus_total_relation_size/7.0-1.sql" #include "../udfs/upgrade_to_reference_table/8.0-1.sql" #include "../udfs/undistribute_table/9.5-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_set_coordinator_host/10.0-1.sql b/src/backend/distributed/sql/udfs/citus_set_coordinator_host/10.0-1.sql new file mode 100644 index 000000000..578d8610c --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_set_coordinator_host/10.0-1.sql @@ -0,0 +1,13 @@ +CREATE FUNCTION pg_catalog.citus_set_coordinator_host( + host text, + port integer default current_setting('port')::int, + node_role noderole default 'primary', + node_cluster name default 'default') +RETURNS VOID +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_set_coordinator_host$$; + +COMMENT ON FUNCTION pg_catalog.citus_set_coordinator_host(text,integer,noderole,name) +IS 'set the host and port of the coordinator'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_set_coordinator_host(text,int,noderole,name) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_set_coordinator_host/latest.sql b/src/backend/distributed/sql/udfs/citus_set_coordinator_host/latest.sql new file mode 100644 index 000000000..578d8610c --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_set_coordinator_host/latest.sql @@ -0,0 +1,13 @@ +CREATE FUNCTION pg_catalog.citus_set_coordinator_host( + host text, + port integer default current_setting('port')::int, + node_role noderole default 'primary', + node_cluster name default 'default') +RETURNS VOID +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_set_coordinator_host$$; + +COMMENT ON FUNCTION pg_catalog.citus_set_coordinator_host(text,integer,noderole,name) +IS 'set the host and port of the coordinator'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_set_coordinator_host(text,int,noderole,name) FROM PUBLIC; diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 2eb955564..3130f3391 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -167,6 +167,7 @@ extern int ShardMaxSize; extern int ShardPlacementPolicy; extern int NextShardId; extern int NextPlacementId; +extern bool AddCoordinatorOnEmptyCluster; extern bool IsCoordinator(void); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 491e249b4..a69664143 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -434,14 +434,8 @@ SELECT * FROM print_extension_changes(); -- Test downgrade to 9.5-1 from 10.0-1 ALTER EXTENSION citus UPDATE TO '10.0-1'; ALTER EXTENSION citus UPDATE TO '9.5-1'; +ERROR: syntax error at or near ">>>>>>>" -- Should be empty result since upgrade+downgrade should be a no-op -SELECT * FROM print_extension_changes(); - previous_object | current_object ---------------------------------------------------------------------- -(0 rows) - --- Snapshot of state at 10.0-1 -ALTER EXTENSION citus UPDATE TO '10.0-1'; SELECT * FROM print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- @@ -453,6 +447,7 @@ SELECT * FROM print_extension_changes(); | function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) | function alter_columnar_table_set(regclass,integer,integer,name,integer) | function citus_internal.columnar_ensure_objects_exist() + | function citus_set_coordinator_host(text,integer,noderole,name) | function citus_total_relation_size(regclass,boolean) | function columnar.columnar_handler(internal) | function create_citus_local_table(regclass,boolean) @@ -465,7 +460,15 @@ SELECT * FROM print_extension_changes(); | table columnar.options | view citus_tables | view time_partitions -(20 rows) +(21 rows) + +-- Snapshot of state at 10.0-1 +ALTER EXTENSION citus UPDATE TO '10.0-1'; +NOTICE: version "10.0-1" of extension "citus" is already installed +SELECT * FROM print_extension_changes(); + previous_object | current_object +--------------------------------------------------------------------- +(0 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_extension_0.out b/src/test/regress/expected/multi_extension_0.out index e50a7fe6b..9fc695043 100644 --- a/src/test/regress/expected/multi_extension_0.out +++ b/src/test/regress/expected/multi_extension_0.out @@ -434,22 +434,17 @@ SELECT * FROM print_extension_changes(); -- Test downgrade to 9.5-1 from 10.0-1 ALTER EXTENSION citus UPDATE TO '10.0-1'; ALTER EXTENSION citus UPDATE TO '9.5-1'; +ERROR: syntax error at or near ">>>>>>>" -- Should be empty result since upgrade+downgrade should be a no-op SELECT * FROM print_extension_changes(); - previous_object | current_object ---------------------------------------------------------------------- -(0 rows) - --- Snapshot of state at 10.0-1 -ALTER EXTENSION citus UPDATE TO '10.0-1'; -SELECT * FROM print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- function citus_total_relation_size(regclass) | function create_citus_local_table(regclass) | function undistribute_table(regclass) | function upgrade_to_reference_table(regclass) | | function citus_internal.columnar_ensure_objects_exist() + | function citus_set_coordinator_host(text,integer,noderole,name) | function citus_total_relation_size(regclass,boolean) | function create_citus_local_table(regclass,boolean) | function time_partition_range(regclass) @@ -461,7 +456,15 @@ SELECT * FROM print_extension_changes(); | table columnar.options | view citus_tables | view time_partitions -(16 rows) +(17 rows) + +-- Snapshot of state at 10.0-1 +ALTER EXTENSION citus UPDATE TO '10.0-1'; +NOTICE: version "10.0-1" of extension "citus" is already installed +SELECT * FROM print_extension_changes(); + previous_object | current_object +--------------------------------------------------------------------- +(0 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 3b5a6e9a8..b479fc04d 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -9,7 +9,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); ERROR: coordinator node cannot be added as inactive node -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; -SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port); ?column? --------------------------------------------------------------------- 1 @@ -20,12 +20,6 @@ SELECT 1 FROM master_disable_node('localhost', :master_port); ERROR: Disabling localhost:xxxxx failed DETAIL: cannot change "isactive" field of the coordinator node RESET client_min_messages; -SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - CREATE TABLE test(x int, y int); SELECT create_distributed_table('test','x'); create_distributed_table @@ -33,6 +27,13 @@ SELECT create_distributed_table('test','x'); (1 row) +-- should have shards setting should not matter for a single node +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + CREATE TYPE new_type AS (n int, m text); CREATE TABLE test_2(x int, y int, z new_type); SELECT create_distributed_table('test_2','x'); diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index ef407ae35..25a95057f 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -56,6 +56,7 @@ ORDER BY 1; function citus_relation_size(regclass) function citus_remote_connection_stats() function citus_server_id() + function citus_set_coordinator_host(text,integer,noderole,name) function citus_set_default_rebalance_strategy(text) function citus_shard_allowed_on_node_true(bigint,integer) function citus_shard_cost_1(bigint) @@ -219,5 +220,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(203 rows) +(204 rows) diff --git a/src/test/regress/expected/upgrade_list_citus_objects_0.out b/src/test/regress/expected/upgrade_list_citus_objects_0.out index 2c89e4025..82d73f61f 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects_0.out +++ b/src/test/regress/expected/upgrade_list_citus_objects_0.out @@ -53,6 +53,7 @@ ORDER BY 1; function citus_relation_size(regclass) function citus_remote_connection_stats() function citus_server_id() + function citus_set_coordinator_host(text,integer,noderole,name) function citus_set_default_rebalance_strategy(text) function citus_shard_allowed_on_node_true(bigint,integer) function citus_shard_cost_1(bigint) @@ -215,5 +216,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(199 rows) +(200 rows) diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index cf88b8d48..72f7ae511 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -10,18 +10,19 @@ SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; -SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port); -- coordinator cannot be disabled SELECT 1 FROM master_disable_node('localhost', :master_port); RESET client_min_messages; -SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); - CREATE TABLE test(x int, y int); SELECT create_distributed_table('test','x'); +-- should have shards setting should not matter for a single node +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + CREATE TYPE new_type AS (n int, m text); CREATE TABLE test_2(x int, y int, z new_type); SELECT create_distributed_table('test_2','x');