diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c
index 8a633e3dc..618098143 100644
--- a/src/backend/distributed/operations/node_protocol.c
+++ b/src/backend/distributed/operations/node_protocol.c
@@ -83,6 +83,7 @@ static char * CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,
                                                const ColumnarOptions *options);
 static char * GetTableDDLCommandColumnar(void *context);
 static TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId);
+static CitusNodeRoleEnum LookupCitusNodeRole(Oid roleOid);
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_get_table_metadata);
@@ -94,7 +95,7 @@ PG_FUNCTION_INFO_V1(citus_get_active_worker_nodes);
 PG_FUNCTION_INFO_V1(master_get_round_robin_candidate_nodes);
 PG_FUNCTION_INFO_V1(master_stage_shard_row);
 PG_FUNCTION_INFO_V1(master_stage_shard_placement_row);
-
+PG_FUNCTION_INFO_V1(citus_node_list);
 
 /*
  * master_get_table_metadata is a deprecated UDF.
@@ -438,6 +439,114 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * citus_node_list returns a set of node host names and port numbers,
+ * filtered by the provided arguments: an active boolean and a role enum.
+ * When active is true only active nodes are returned, when false only
+ * inactive nodes, and when null all nodes. The role restricts the result
+ * to nodes with the given role (worker or coordinator); a null role
+ * matches all nodes. Both arguments default to null.
+ */
+Datum
+citus_node_list(PG_FUNCTION_ARGS)
+{
+    ActiveFilterEnum activeFilter = ACTIVE_FILTER_ALL;
+    if (PG_NARGS() > 0 && !PG_ARGISNULL(0))
+    {
+        bool active = PG_GETARG_BOOL(0);
+        activeFilter = active ? ACTIVE_FILTER_ACTIVE : ACTIVE_FILTER_INACTIVE;
+    }
+
+    CitusNodeRoleEnum roleFilter = CITUS_NODE_ROLE_ALL;
+    if (PG_NARGS() > 1 && !PG_ARGISNULL(1))
+    {
+        Oid roleOid = PG_GETARG_OID(1);
+        roleFilter = LookupCitusNodeRole(roleOid);
+    }
+
+    CheckCitusVersion(ERROR);
+
+    FuncCallContext *functionContext = NULL;
+    uint32 workerNodeCount = 0;
+
+    if (SRF_IS_FIRSTCALL())
+    {
+        /* create a function context for cross-call persistence */
+        functionContext = SRF_FIRSTCALL_INIT();
+
+        /* switch to memory context appropriate for multiple function calls */
+        MemoryContext oldContext = MemoryContextSwitchTo(
+            functionContext->multi_call_memory_ctx);
+
+        List *workerNodeList = FilterNodeListFunc(NoLock,
+                                                  activeFilter,
+                                                  roleFilter);
+        workerNodeCount = (uint32) list_length(workerNodeList);
+
+        functionContext->user_fctx = workerNodeList;
+        functionContext->max_calls = workerNodeCount;
+
+        /*
+         * This tuple descriptor must match the output parameters declared for
+         * the function in pg_proc.
+         */
+        TupleDesc tupleDescriptor = CreateTemplateTupleDesc(WORKER_NODE_FIELDS);
+        TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "node_name",
+                           TEXTOID, -1, 0);
+        TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "node_port",
+                           INT8OID, -1, 0);
+
+        functionContext->tuple_desc = BlessTupleDesc(tupleDescriptor);
+
+        MemoryContextSwitchTo(oldContext);
+    }
+
+    functionContext = SRF_PERCALL_SETUP();
+    uint32 workerNodeIndex = functionContext->call_cntr;
+    workerNodeCount = functionContext->max_calls;
+
+    if (workerNodeIndex < workerNodeCount)
+    {
+        List *workerNodeList = functionContext->user_fctx;
+        WorkerNode *workerNode = list_nth(workerNodeList, workerNodeIndex);
+
+        Datum workerNodeDatum = WorkerNodeGetDatum(workerNode,
+                                                   functionContext->tuple_desc);
+
+        SRF_RETURN_NEXT(functionContext, workerNodeDatum);
+    }
+    else
+    {
+        SRF_RETURN_DONE(functionContext);
+    }
+}
+
+
+/*
+ * LookupCitusNodeRole maps a citus_noderole enum value to the corresponding
+ * CitusNodeRoleEnum, erroring out on an unrecognized label.
+ */
+static CitusNodeRoleEnum
+LookupCitusNodeRole(Oid roleOid)
+{
+    CitusNodeRoleEnum roleFilter = CITUS_NODE_ROLE_ALL;
+
+    Datum enumLabelDatum = DirectFunctionCall1(enum_out, ObjectIdGetDatum(roleOid));
+    char *enumLabel = DatumGetCString(enumLabelDatum);
+
+    if (strncmp(enumLabel, "coordinator", NAMEDATALEN) == 0)
+    {
+        roleFilter = CITUS_NODE_ROLE_COORDINATOR;
+    }
+    else if (strncmp(enumLabel, "worker", NAMEDATALEN) == 0)
+    {
+        roleFilter = CITUS_NODE_ROLE_WORKER;
+    }
+    else
+    {
+        ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel)));
+    }
+
+    return roleFilter;
+}
+
+
 /* Finds the relationId from a potentially qualified relation name. */
 Oid
 ResolveRelationId(text *relationName, bool missingOk)
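For context, citus_node_list above follows PostgreSQL's standard value-per-call SRF protocol: per-query state is built once under SRF_IS_FIRSTCALL() in the multi-call memory context, and one row is returned per invocation until SRF_RETURN_DONE. Below is a minimal, self-contained sketch of that protocol; the function name example_int_list and its fixed row count are hypothetical and not part of this patch.

    #include "postgres.h"
    #include "funcapi.h"

    PG_FUNCTION_INFO_V1(example_int_list);

    /* returns the integers 0, 1 and 2, one per call */
    Datum
    example_int_list(PG_FUNCTION_ARGS)
    {
        FuncCallContext *fctx;

        if (SRF_IS_FIRSTCALL())
        {
            fctx = SRF_FIRSTCALL_INIT();

            /* allocations that must survive across calls go in this context */
            MemoryContext oldcxt =
                MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
            fctx->max_calls = 3;
            MemoryContextSwitchTo(oldcxt);
        }

        fctx = SRF_PERCALL_SETUP();

        if (fctx->call_cntr < fctx->max_calls)
        {
            SRF_RETURN_NEXT(fctx, Int32GetDatum((int32) fctx->call_cntr));
        }
        else
        {
            SRF_RETURN_DONE(fctx);
        }
    }

Note that in citus_node_list the argument parsing runs on every per-row call; only the node-list construction and the tuple descriptor are guarded by SRF_IS_FIRSTCALL().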
diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c
index 8a4245ca0..ecc47633a 100644
--- a/src/backend/distributed/operations/worker_node_manager.c
+++ b/src/backend/distributed/operations/worker_node_manager.c
@@ -82,8 +82,9 @@ WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId,
 
 
 /*
- * ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary in the cluster.
- * This method excludes coordinator even if it is added as a worker to cluster.
+ * ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary
+ * in the cluster. This method excludes the coordinator even if it is added as a
+ * worker to the cluster.
  */
 uint32
 ActivePrimaryNonCoordinatorNodeCount(void)
@@ -117,7 +118,7 @@ NodeIsCoordinator(WorkerNode *node)
 
 
 /*
- * ActiveNodeListFilterFunc returns a list of all active nodes that checkFunction
+ * FilterActiveNodeListFunc returns a list of all active nodes that checkFunction
  * returns true for.
  * lockMode specifies which lock to use on pg_dist_node, this is necessary when
  * the caller wouldn't want nodes to be added concurrent to their use of this list
@@ -154,10 +155,87 @@ FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *))
 
 
 /*
- * ActivePrimaryNonCoordinatorNodeList returns a list of all active primary worker nodes
- * in workerNodeHash. lockMode specifies which lock to use on pg_dist_node,
- * this is necessary when the caller wouldn't want nodes to be added concurrent
- * to their use of this list.
+ * FilterNodeListFunc returns a list of all nodes that match the given
+ * active and role filters.
+ * 'active' specifies whether to return only active nodes, only inactive
+ * nodes, or all nodes.
+ * 'role' specifies whether to return coordinator nodes, worker nodes,
+ * or all nodes.
+ * 'lockMode' specifies which lock to use on pg_dist_node; this is necessary
+ * when the caller wouldn't want nodes to be added concurrently with their
+ * use of this list.
+ */
+List *
+FilterNodeListFunc(LOCKMODE lockMode, ActiveFilterEnum active,
+                   CitusNodeRoleEnum role)
+{
+    List *workerNodeList = NIL;
+    WorkerNode *workerNode = NULL;
+    HASH_SEQ_STATUS status;
+
+    if (lockMode != NoLock)
+    {
+        LockRelationOid(DistNodeRelationId(), lockMode);
+    }
+
+    HTAB *workerNodeHash = GetWorkerNodeHash();
+    hash_seq_init(&status, workerNodeHash);
+
+    while ((workerNode = hash_seq_search(&status)) != NULL)
+    {
+        bool activeCond = true;
+        if (active == ACTIVE_FILTER_ACTIVE)
+        {
+            activeCond = workerNode->isActive;
+        }
+        else if (active == ACTIVE_FILTER_INACTIVE)
+        {
+            activeCond = !workerNode->isActive;
+        }
+        else if (active == ACTIVE_FILTER_ALL)
+        {
+            activeCond = true;
+        }
+        else
+        {
+            /* This should never happen */
+            Assert(false);
+        }
+
+        bool roleCond = true;
+        if (role == CITUS_NODE_ROLE_COORDINATOR)
+        {
+            roleCond = NodeIsCoordinator(workerNode);
+        }
+        else if (role == CITUS_NODE_ROLE_WORKER)
+        {
+            roleCond = !NodeIsCoordinator(workerNode);
+        }
+        else if (role == CITUS_NODE_ROLE_ALL)
+        {
+            roleCond = true;
+        }
+        else
+        {
+            /* This should never happen */
+            Assert(false);
+        }
+
+        if (activeCond && roleCond)
+        {
+            WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode));
+            *workerNodeCopy = *workerNode;
+            workerNodeList = lappend(workerNodeList, workerNodeCopy);
+        }
+    }
+
+    return workerNodeList;
+}
+
+
+/*
+ * ActivePrimaryNonCoordinatorNodeList returns a list of all active primary
+ * worker nodes in workerNodeHash. lockMode specifies which lock to use on
+ * pg_dist_node; this is necessary when the caller wouldn't want nodes to
+ * be added concurrently with their use of this list.
  * This method excludes coordinator even if it is added as a worker to cluster.
  */
 List *
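In-core callers can combine the two filter enums directly rather than going through the UDF. A hypothetical caller sketch, assuming this patch's worker_manager.h; the helper name and the choice of RowShareLock are illustrative assumptions, not something the patch prescribes:

    #include "postgres.h"

    #include "distributed/worker_manager.h"
    #include "nodes/pg_list.h"
    #include "storage/lockdefs.h"

    /*
     * Hypothetical helper: returns the inactive worker nodes while holding a
     * lock on pg_dist_node (see the FilterNodeListFunc comment above on why a
     * caller might want a lock mode other than NoLock).
     */
    static List *
    InactiveWorkerNodeList(void)
    {
        return FilterNodeListFunc(RowShareLock, ACTIVE_FILTER_INACTIVE,
                                  CITUS_NODE_ROLE_WORKER);
    }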
diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
index 0f70438e0..0ba3c7992 100644
--- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
+++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql
@@ -48,3 +48,4 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
 #include "udfs/repl_origin_helper/13.1-1.sql"
 #include "udfs/citus_finish_pg_upgrade/13.1-1.sql"
 #include "udfs/citus_is_primary_node/13.1-1.sql"
+#include "udfs/citus_node_list/13.1-1.sql"
diff --git a/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql b/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql
index dd89fc249..006726eb6 100644
--- a/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql
+++ b/src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql
@@ -41,3 +41,9 @@ DROP FUNCTION citus_internal.start_replication_origin_tracking();
 DROP FUNCTION citus_internal.stop_replication_origin_tracking();
 DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
 #include "../udfs/citus_finish_pg_upgrade/12.1-1.sql"
+DROP FUNCTION pg_catalog.citus_node_list(
+    IN active BOOL,
+    IN role citus_noderole,
+    OUT node_name text,
+    OUT node_port bigint);
+DROP TYPE pg_catalog.citus_noderole;
diff --git a/src/backend/distributed/sql/udfs/citus_node_list/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_node_list/13.1-1.sql
new file mode 100644
index 000000000..5a77d58fc
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_node_list/13.1-1.sql
@@ -0,0 +1,15 @@
+CREATE TYPE pg_catalog.citus_noderole AS ENUM (
+    'coordinator', -- node is the coordinator
+    'worker'       -- node is a worker
+);
+
+CREATE OR REPLACE FUNCTION pg_catalog.citus_node_list(
+    IN active BOOL DEFAULT null,
+    IN role citus_noderole DEFAULT null,
+    OUT node_name text,
+    OUT node_port bigint)
+  RETURNS SETOF record
+  LANGUAGE C VOLATILE ROWS 100
+  AS 'MODULE_PATHNAME', $$citus_node_list$$;
+COMMENT ON FUNCTION pg_catalog.citus_node_list(active BOOL, role citus_noderole)
+  IS 'fetch the set of nodes that match the given criteria';
diff --git a/src/backend/distributed/sql/udfs/citus_node_list/latest.sql b/src/backend/distributed/sql/udfs/citus_node_list/latest.sql
new file mode 100644
index 000000000..5a77d58fc
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_node_list/latest.sql
@@ -0,0 +1,15 @@
+CREATE TYPE pg_catalog.citus_noderole AS ENUM (
+    'coordinator', -- node is the coordinator
+    'worker'       -- node is a worker
+);
+
+CREATE OR REPLACE FUNCTION pg_catalog.citus_node_list(
+    IN active BOOL DEFAULT null,
+    IN role citus_noderole DEFAULT null,
+    OUT node_name text,
+    OUT node_port bigint)
+  RETURNS SETOF record
+  LANGUAGE C VOLATILE ROWS 100
+  AS 'MODULE_PATHNAME', $$citus_node_list$$;
+COMMENT ON FUNCTION pg_catalog.citus_node_list(active BOOL, role citus_noderole)
+  IS 'fetch the set of nodes that match the given criteria';
diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h
index 02a43fe0b..95e764f5f 100644
--- a/src/include/distributed/worker_manager.h
+++ b/src/include/distributed/worker_manager.h
@@ -112,4 +112,21 @@ extern int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySiz
 extern int NodeNamePortCompare(const char *workerLhsName, const char *workerRhsName,
                                int workerLhsPort, int workerRhsPort);
 
+typedef enum ActiveFilterEnum
+{
+    ACTIVE_FILTER_ALL = 0,
+    ACTIVE_FILTER_ACTIVE = 1,
+    ACTIVE_FILTER_INACTIVE = 2
+} ActiveFilterEnum;
+
+typedef enum CitusNodeRoleEnum
+{
+    CITUS_NODE_ROLE_ALL = 0,
+    CITUS_NODE_ROLE_COORDINATOR = 1,
+    CITUS_NODE_ROLE_WORKER = 2
+} CitusNodeRoleEnum;
+
+extern List * FilterNodeListFunc(LOCKMODE lockMode, ActiveFilterEnum active,
+                                 CitusNodeRoleEnum role);
+
 #endif   /* WORKER_MANAGER_H */
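Both enums use their ALL member as the zero value, which is what citus_node_list initializes its filters to before inspecting the (possibly NULL) SQL arguments. An illustrative sketch of that default path; the wrapper name is hypothetical:

    #include "postgres.h"

    #include "distributed/worker_manager.h"
    #include "nodes/pg_list.h"
    #include "storage/lockdefs.h"

    /* hypothetical wrapper: the C equivalent of SELECT * FROM citus_node_list(NULL, NULL) */
    static List *
    AllNodeList(void)
    {
        /* ACTIVE_FILTER_ALL and CITUS_NODE_ROLE_ALL disable both filters */
        return FilterNodeListFunc(NoLock, ACTIVE_FILTER_ALL, CITUS_NODE_ROLE_ALL);
    }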
diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out
index e6634d5c6..98f80dcb8 100644
--- a/src/test/regress/expected/multi_cluster_management.out
+++ b/src/test/regress/expected/multi_cluster_management.out
@@ -72,6 +72,22 @@ SELECT master_get_active_worker_nodes();
  (localhost,57637)
 (2 rows)
 
+SELECT * from citus_node_list();
+ node_name | node_port
+---------------------------------------------------------------------
+ localhost |     57636
+ localhost |     57638
+ localhost |     57637
+(3 rows)
+
+SELECT * from citus_node_list(active := NULL);
+ node_name | node_port
+---------------------------------------------------------------------
+ localhost |     57636
+ localhost |     57638
+ localhost |     57637
+(3 rows)
+
 -- try to add a node that is already in the cluster
 SELECT * FROM master_add_node('localhost', :worker_1_port);
  master_add_node
@@ -126,6 +142,53 @@ SELECT master_get_active_worker_nodes();
  (localhost,57637)
 (1 row)
 
+SELECT * from citus_node_list(active := TRUE);
+ node_name | node_port
+---------------------------------------------------------------------
+ localhost |     57636
+ localhost |     57637
+(2 rows)
+
+SELECT * from citus_node_list(active := FALSE);
+ node_name | node_port
+---------------------------------------------------------------------
+ localhost |     57638
+(1 row)
+
+SELECT * from citus_node_list(role := 'worker');
+ node_name | node_port
+---------------------------------------------------------------------
+ localhost |     57638
+ localhost |     57637
+(2 rows)
+
+SELECT * from citus_node_list(role := 'coordinator');
+ node_name | node_port
+---------------------------------------------------------------------
+ localhost |     57636
+(1 row)
+
+SELECT * from citus_node_list(role := NULL);
+ node_name | node_port
+---------------------------------------------------------------------
+ localhost |     57636
+ localhost |     57638
+ localhost |     57637
+(3 rows)
+
+SELECT * from citus_node_list(role := 'foo');
+ERROR:  invalid input value for enum citus_noderole: "foo"
+SELECT * from citus_node_list(active := FALSE, role := 'coordinator');
+ node_name | node_port
+---------------------------------------------------------------------
+(0 rows)
+
+SELECT * from citus_node_list(active := FALSE, role := 'worker');
+ node_name | node_port
+---------------------------------------------------------------------
+ localhost |     57638
+(1 row)
+
 -- add some shard placements to the cluster
 SET citus.shard_count TO 16;
 SET citus.shard_replication_factor TO 1;
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index 51b2be416..648c81395 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -1480,8 +1480,9 @@ SELECT * FROM multi_extension.print_extension_changes();
  | function citus_internal.update_placement_metadata(bigint,integer,integer) void
  | function citus_internal.update_relation_colocation(oid,integer) void
  | function citus_is_primary_node() boolean
+ | function citus_node_list(boolean,citus_noderole) SETOF record
  | function citus_unmark_object_distributed(oid,oid,integer,boolean) void
-(27 rows)
+(28 rows)
 
 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version
diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out
index 048e86c67..2ceb91411 100644
--- a/src/test/regress/expected/upgrade_list_citus_objects.out
+++ b/src/test/regress/expected/upgrade_list_citus_objects.out
@@ -148,6 +148,7 @@ ORDER BY 1;
  function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode)
  function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode)
  function citus_node_capacity_1(integer)
+ function citus_node_list(boolean,citus_noderole)
  function citus_nodeid_for_gpid(bigint)
  function citus_nodename_for_nodeid(integer)
  function citus_nodeport_for_nodeid(integer)
@@ -389,6 +390,6 @@ ORDER BY 1;
  view citus_stat_tenants_local
  view pg_dist_shard_placement
  view time_partitions
-(358 rows)
+(359 rows)
 
 DROP TABLE extension_basic_types;
diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql
index a1e0e9b09..47ce439f3 100644
--- a/src/test/regress/sql/multi_cluster_management.sql
+++ b/src/test/regress/sql/multi_cluster_management.sql
@@ -32,6 +32,10 @@ SELECT result FROM run_command_on_workers('SELECT citus_is_primary_node()');
 -- get the active nodes
 SELECT master_get_active_worker_nodes();
 
+SELECT * from citus_node_list();
+
+SELECT * from citus_node_list(active := NULL);
+
 -- try to add a node that is already in the cluster
 SELECT * FROM master_add_node('localhost', :worker_1_port);
 
@@ -51,6 +55,22 @@ SELECT citus_disable_node('localhost', :worker_2_port);
 SELECT public.wait_until_metadata_sync(20000);
 SELECT master_get_active_worker_nodes();
 
+SELECT * from citus_node_list(active := TRUE);
+
+SELECT * from citus_node_list(active := FALSE);
+
+SELECT * from citus_node_list(role := 'worker');
+
+SELECT * from citus_node_list(role := 'coordinator');
+
+SELECT * from citus_node_list(role := NULL);
+
+SELECT * from citus_node_list(role := 'foo');
+
+SELECT * from citus_node_list(active := FALSE, role := 'coordinator');
+
+SELECT * from citus_node_list(active := FALSE, role := 'worker');
+
 -- add some shard placements to the cluster
 SET citus.shard_count TO 16;
 SET citus.shard_replication_factor TO 1;