Add udf citus_node_list(active, role)

Add new udf: citus_node_list with arguments:
 IN active BOOL DEFAULT null,
  IN role citus_noderole DEFAULT null,
  OUT node_name text,
  OUT node_port bigint

When the udf is called without any arguments, it will return all nodes.

Add corresponding drop statements for the downgrade script:
   src/backend/distributed/sql/downgrades/citus--13.1-1--13.0-1.sql

Reformat some of the function comments to fit within the 80-character
column limit.

Update test cases for the new udf
udf-citus-node-list
Alper Kocatas 2025-03-24 20:03:59 +00:00
parent a7e686c106
commit 2015bda5d9
11 changed files with 336 additions and 10 deletions

View File

@ -83,6 +83,7 @@ static char * CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,
const ColumnarOptions *options); const ColumnarOptions *options);
static char * GetTableDDLCommandColumnar(void *context); static char * GetTableDDLCommandColumnar(void *context);
static TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId); static TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId);
static CitusNodeRoleEnum LookupCitusNodeRole(Oid roleOid);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_get_table_metadata); PG_FUNCTION_INFO_V1(master_get_table_metadata);
@ -94,7 +95,7 @@ PG_FUNCTION_INFO_V1(citus_get_active_worker_nodes);
PG_FUNCTION_INFO_V1(master_get_round_robin_candidate_nodes); PG_FUNCTION_INFO_V1(master_get_round_robin_candidate_nodes);
PG_FUNCTION_INFO_V1(master_stage_shard_row); PG_FUNCTION_INFO_V1(master_stage_shard_row);
PG_FUNCTION_INFO_V1(master_stage_shard_placement_row); PG_FUNCTION_INFO_V1(master_stage_shard_placement_row);
PG_FUNCTION_INFO_V1(citus_node_list);
/* /*
* master_get_table_metadata is a deprecated UDF. * master_get_table_metadata is a deprecated UDF.
@ -438,6 +439,114 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
} }
/*
 * citus_node_list returns a set of node host names and port numbers
 * using the provided arguments. The arguments are: active boolean, and role
 * (enum). The active boolean indicates whether to return only active nodes
 * or all nodes (when provided as null). The role indicates whether to return
 * only nodes with a specific role (worker or coordinator) or all nodes (when
 * provided as null). Default values for both arguments are null.
 */
Datum
citus_node_list(PG_FUNCTION_ARGS)
{
/* a NULL (or absent) first argument means: do not filter on the active flag */
ActiveFilterEnum activeFilter = ACTIVE_FILTER_ALL;
if (PG_NARGS() > 0 && !PG_ARGISNULL(0))
{
bool active = PG_GETARG_BOOL(0);
activeFilter = active ? ACTIVE_FILTER_ACTIVE : ACTIVE_FILTER_INACTIVE;
}
/* a NULL (or absent) second argument means: do not filter on the node role */
CitusNodeRoleEnum roleFilter = CITUS_NODE_ROLE_ALL;
if (PG_NARGS() > 1 && !PG_ARGISNULL(1))
{
/* the argument is a citus_noderole enum value; map its OID to our C enum */
Oid roleOid = PG_GETARG_OID(1);
roleFilter = LookupCitusNodeRole(roleOid);
}
CheckCitusVersion(ERROR);
FuncCallContext *functionContext = NULL;
uint32 workerNodeCount = 0;
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
functionContext = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
MemoryContext oldContext = MemoryContextSwitchTo(
functionContext->multi_call_memory_ctx);
/*
 * Build the filtered node list once, on the first call; it is stashed
 * in user_fctx and consumed one element per call below.
 */
List *workerNodeList = FilterNodeListFunc(NoLock,
activeFilter,
roleFilter);
workerNodeCount = (uint32) list_length(workerNodeList);
functionContext->user_fctx = workerNodeList;
functionContext->max_calls = workerNodeCount;
/*
 * This tuple descriptor must match the output parameters declared for
 * the function in pg_proc.
 */
TupleDesc tupleDescriptor = CreateTemplateTupleDesc(WORKER_NODE_FIELDS);
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "node_name",
TEXTOID, -1, 0);
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "node_port",
INT8OID, -1, 0);
functionContext->tuple_desc = BlessTupleDesc(tupleDescriptor);
MemoryContextSwitchTo(oldContext);
}
functionContext = SRF_PERCALL_SETUP();
/* NOTE(review): call_cntr is 64-bit; the uint32 narrowing assumes fewer than
 * 2^32 nodes, which is safe for any realistic cluster */
uint32 workerNodeIndex = functionContext->call_cntr;
workerNodeCount = functionContext->max_calls;
if (workerNodeIndex < workerNodeCount)
{
/* emit the next node as a (node_name, node_port) tuple */
List *workerNodeList = functionContext->user_fctx;
WorkerNode *workerNode = list_nth(workerNodeList, workerNodeIndex);
Datum workerNodeDatum = WorkerNodeGetDatum(workerNode,
functionContext->tuple_desc);
SRF_RETURN_NEXT(functionContext, workerNodeDatum);
}
else
{
SRF_RETURN_DONE(functionContext);
}
}
/*
 * LookupCitusNodeRole maps the OID of a citus_noderole enum value to the
 * corresponding CitusNodeRoleEnum constant. It errors out if the enum
 * label is neither "coordinator" nor "worker".
 */
static CitusNodeRoleEnum
LookupCitusNodeRole(Oid roleOid)
{
	/* resolve the enum OID to its textual label, e.g. "worker" */
	char *label = DatumGetCString(DirectFunctionCall1(enum_out, roleOid));

	if (strcmp(label, "coordinator") == 0)
	{
		return CITUS_NODE_ROLE_COORDINATOR;
	}

	if (strcmp(label, "worker") == 0)
	{
		return CITUS_NODE_ROLE_WORKER;
	}

	ereport(ERROR, (errmsg("invalid label for enum: %s", label)));

	/* not reached; ereport(ERROR) does not return */
	return CITUS_NODE_ROLE_ALL;
}
/* Finds the relationId from a potentially qualified relation name. */ /* Finds the relationId from a potentially qualified relation name. */
Oid Oid
ResolveRelationId(text *relationName, bool missingOk) ResolveRelationId(text *relationName, bool missingOk)

View File

@ -82,8 +82,9 @@ WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId,
/* /*
* ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary in the cluster. * ActivePrimaryNonCoordinatorNodeCount returns the number of groups with a primary
* This method excludes coordinator even if it is added as a worker to cluster. * in the cluster. This method excludes coordinator even if it is added as a worker
* to cluster.
*/ */
uint32 uint32
ActivePrimaryNonCoordinatorNodeCount(void) ActivePrimaryNonCoordinatorNodeCount(void)
@ -117,7 +118,7 @@ NodeIsCoordinator(WorkerNode *node)
/* /*
* ActiveNodeListFilterFunc returns a list of all active nodes that checkFunction * FilterActiveNodeListFunc returns a list of all active nodes that checkFunction
* returns true for. * returns true for.
* lockMode specifies which lock to use on pg_dist_node, this is necessary when * lockMode specifies which lock to use on pg_dist_node, this is necessary when
* the caller wouldn't want nodes to be added concurrent to their use of this list * the caller wouldn't want nodes to be added concurrent to their use of this list
@ -154,10 +155,87 @@ FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *))
/* /*
* ActivePrimaryNonCoordinatorNodeList returns a list of all active primary worker nodes * FilterNodeListFunc returns a list of all active nodes that match the given
* in workerNodeHash. lockMode specifies which lock to use on pg_dist_node, * active and role filters.
* this is necessary when the caller wouldn't want nodes to be added concurrent * 'active' specifies whether to return only active nodes or all nodes
* to their use of this list. * 'role' specifies whether to return coordinator nodes, worker nodes,
* or all nodes. 'lockMode' specifies which lock to use on pg_dist_node,
* this is necessary when the caller wouldn't want nodes to be added
* concurrent to their use of this list.
*/
List *
FilterNodeListFunc(LOCKMODE lockMode, ActiveFilterEnum active,
				   CitusNodeRoleEnum role)
{
	List *matchingNodes = NIL;

	/* optionally lock pg_dist_node so nodes cannot be added concurrently */
	if (lockMode != NoLock)
	{
		LockRelationOid(DistNodeRelationId(), lockMode);
	}

	HTAB *workerNodeHash = GetWorkerNodeHash();
	HASH_SEQ_STATUS hashStatus;
	hash_seq_init(&hashStatus, workerNodeHash);

	WorkerNode *currentNode = NULL;
	while ((currentNode = hash_seq_search(&hashStatus)) != NULL)
	{
		/* decide whether the node passes the active filter */
		bool matchesActive = true;
		switch (active)
		{
			case ACTIVE_FILTER_ACTIVE:
			{
				matchesActive = currentNode->isActive;
				break;
			}

			case ACTIVE_FILTER_INACTIVE:
			{
				matchesActive = !currentNode->isActive;
				break;
			}

			case ACTIVE_FILTER_ALL:
			{
				/* no filtering on the active flag */
				break;
			}

			default:
			{
				/* This should never happen */
				Assert(false);
				break;
			}
		}

		/* decide whether the node passes the role filter */
		bool matchesRole = true;
		switch (role)
		{
			case CITUS_NODE_ROLE_COORDINATOR:
			{
				matchesRole = NodeIsCoordinator(currentNode);
				break;
			}

			case CITUS_NODE_ROLE_WORKER:
			{
				matchesRole = !NodeIsCoordinator(currentNode);
				break;
			}

			case CITUS_NODE_ROLE_ALL:
			{
				/* no filtering on the node role */
				break;
			}

			default:
			{
				/* This should never happen */
				Assert(false);
				break;
			}
		}

		if (matchesActive && matchesRole)
		{
			/* copy the entry so callers do not point into the node hash */
			WorkerNode *nodeCopy = palloc0(sizeof(WorkerNode));
			*nodeCopy = *currentNode;
			matchingNodes = lappend(matchingNodes, nodeCopy);
		}
	}

	return matchingNodes;
}
/*
* ActivePrimaryNonCoordinatorNodeList returns a list of all active primary
* worker nodes in workerNodeHash. lockMode specifies which lock to use on
* pg_dist_node, this is necessary when the caller wouldn't want nodes to
* be added concurrent to their use of this list.
* This method excludes coordinator even if it is added as a worker to cluster. * This method excludes coordinator even if it is added as a worker to cluster.
*/ */
List * List *

View File

@ -48,3 +48,4 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/repl_origin_helper/13.1-1.sql" #include "udfs/repl_origin_helper/13.1-1.sql"
#include "udfs/citus_finish_pg_upgrade/13.1-1.sql" #include "udfs/citus_finish_pg_upgrade/13.1-1.sql"
#include "udfs/citus_is_primary_node/13.1-1.sql" #include "udfs/citus_is_primary_node/13.1-1.sql"
#include "udfs/citus_node_list/13.1-1.sql"

View File

@ -41,3 +41,9 @@ DROP FUNCTION citus_internal.start_replication_origin_tracking();
DROP FUNCTION citus_internal.stop_replication_origin_tracking(); DROP FUNCTION citus_internal.stop_replication_origin_tracking();
DROP FUNCTION citus_internal.is_replication_origin_tracking_active(); DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
#include "../udfs/citus_finish_pg_upgrade/12.1-1.sql" #include "../udfs/citus_finish_pg_upgrade/12.1-1.sql"
-- citus_node_list() and the citus_noderole type were introduced in 13.1-1,
-- so they must be removed when downgrading to 13.0-1. The function is
-- dropped before the type because its signature depends on the type.
DROP FUNCTION pg_catalog.citus_node_list(
IN active BOOL,
IN role citus_noderole,
OUT node_name text,
OUT node_port bigint);
DROP TYPE pg_catalog.citus_noderole;

View File

@ -0,0 +1,15 @@
-- citus_noderole labels the role a node plays in the cluster and is the
-- type of the optional 'role' filter argument of citus_node_list().
CREATE TYPE pg_catalog.citus_noderole AS ENUM (
'coordinator', -- node is coordinator
'worker' -- node is worker
);
-- citus_node_list() returns (node_name, node_port) for nodes matching the
-- given filters. A null 'active' matches both active and inactive nodes and
-- a null 'role' matches both coordinators and workers, so a call without
-- arguments returns all nodes.
CREATE OR REPLACE FUNCTION pg_catalog.citus_node_list(
IN active BOOL DEFAULT null,
IN role citus_noderole DEFAULT null,
OUT node_name text,
OUT node_port bigint)
RETURNS SETOF record
LANGUAGE C VOLATILE ROWS 100
AS 'MODULE_PATHNAME', $$citus_node_list$$;
COMMENT ON FUNCTION pg_catalog.citus_node_list(active BOOL, role citus_noderole)
IS 'fetch set of nodes which match the given criteria';

View File

@ -0,0 +1,15 @@
-- citus_noderole labels the role a node plays in the cluster and is the
-- type of the optional 'role' filter argument of citus_node_list().
CREATE TYPE pg_catalog.citus_noderole AS ENUM (
'coordinator', -- node is coordinator
'worker' -- node is worker
);
-- citus_node_list() returns (node_name, node_port) for nodes matching the
-- given filters. A null 'active' matches both active and inactive nodes and
-- a null 'role' matches both coordinators and workers, so a call without
-- arguments returns all nodes.
CREATE OR REPLACE FUNCTION pg_catalog.citus_node_list(
IN active BOOL DEFAULT null,
IN role citus_noderole DEFAULT null,
OUT node_name text,
OUT node_port bigint)
RETURNS SETOF record
LANGUAGE C VOLATILE ROWS 100
AS 'MODULE_PATHNAME', $$citus_node_list$$;
COMMENT ON FUNCTION pg_catalog.citus_node_list(active BOOL, role citus_noderole)
IS 'fetch set of nodes which match the given criteria';

View File

@ -112,4 +112,21 @@ extern int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySiz
extern int NodeNamePortCompare(const char *workerLhsName, const char *workerRhsName, extern int NodeNamePortCompare(const char *workerLhsName, const char *workerRhsName,
int workerLhsPort, int workerRhsPort); int workerLhsPort, int workerRhsPort);
typedef enum ActiveFilterEnum
{
ACTIVE_FILTER_ALL = 0,
ACTIVE_FILTER_ACTIVE = 1,
ACTIVE_FILTER_INACTIVE = 2
} ActiveFilterEnum;
typedef enum CitusNodeRoleEnum
{
CITUS_NODE_ROLE_ALL = 0,
CITUS_NODE_ROLE_COORDINATOR = 1,
CITUS_NODE_ROLE_WORKER = 2
} CitusNodeRoleEnum;
extern List * FilterNodeListFunc(LOCKMODE lockMode, ActiveFilterEnum active,
CitusNodeRoleEnum role);
#endif /* WORKER_MANAGER_H */ #endif /* WORKER_MANAGER_H */

View File

@ -72,6 +72,22 @@ SELECT master_get_active_worker_nodes();
(localhost,57637) (localhost,57637)
(2 rows) (2 rows)
SELECT * from citus_node_list();
node_name | node_port
---------------------------------------------------------------------
localhost | 57636
localhost | 57638
localhost | 57637
(3 rows)
SELECT * from citus_node_list(active := NULL);
node_name | node_port
---------------------------------------------------------------------
localhost | 57636
localhost | 57638
localhost | 57637
(3 rows)
-- try to add a node that is already in the cluster -- try to add a node that is already in the cluster
SELECT * FROM master_add_node('localhost', :worker_1_port); SELECT * FROM master_add_node('localhost', :worker_1_port);
master_add_node master_add_node
@ -126,6 +142,53 @@ SELECT master_get_active_worker_nodes();
(localhost,57637) (localhost,57637)
(1 row) (1 row)
SELECT * from citus_node_list(active := TRUE);
node_name | node_port
---------------------------------------------------------------------
localhost | 57636
localhost | 57637
(2 rows)
SELECT * from citus_node_list(active := FALSE);
node_name | node_port
---------------------------------------------------------------------
localhost | 57638
(1 row)
SELECT * from citus_node_list(role := 'worker');
node_name | node_port
---------------------------------------------------------------------
localhost | 57638
localhost | 57637
(2 rows)
SELECT * from citus_node_list(role := 'coordinator');
node_name | node_port
---------------------------------------------------------------------
localhost | 57636
(1 row)
SELECT * from citus_node_list(role := NULL);
node_name | node_port
---------------------------------------------------------------------
localhost | 57636
localhost | 57638
localhost | 57637
(3 rows)
SELECT * from citus_node_list(role := 'foo');
ERROR: invalid input value for enum citus_noderole: "foo"
SELECT * from citus_node_list(active := FALSE, role := 'coordinator');
node_name | node_port
---------------------------------------------------------------------
(0 rows)
SELECT * from citus_node_list(active := FALSE, role := 'worker');
node_name | node_port
---------------------------------------------------------------------
localhost | 57638
(1 row)
-- add some shard placements to the cluster -- add some shard placements to the cluster
SET citus.shard_count TO 16; SET citus.shard_count TO 16;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;

View File

@ -1480,8 +1480,9 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal.update_placement_metadata(bigint,integer,integer) void | function citus_internal.update_placement_metadata(bigint,integer,integer) void
| function citus_internal.update_relation_colocation(oid,integer) void | function citus_internal.update_relation_colocation(oid,integer) void
| function citus_is_primary_node() boolean | function citus_is_primary_node() boolean
| function citus_node_list(boolean,citus_noderole) SETOF record
| function citus_unmark_object_distributed(oid,oid,integer,boolean) void | function citus_unmark_object_distributed(oid,oid,integer,boolean) void
(27 rows) (28 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version

View File

@ -148,6 +148,7 @@ ORDER BY 1;
function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode) function citus_move_shard_placement(bigint,integer,integer,citus.shard_transfer_mode)
function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode) function citus_move_shard_placement(bigint,text,integer,text,integer,citus.shard_transfer_mode)
function citus_node_capacity_1(integer) function citus_node_capacity_1(integer)
function citus_node_list(boolean,citus_noderole)
function citus_nodeid_for_gpid(bigint) function citus_nodeid_for_gpid(bigint)
function citus_nodename_for_nodeid(integer) function citus_nodename_for_nodeid(integer)
function citus_nodeport_for_nodeid(integer) function citus_nodeport_for_nodeid(integer)
@ -389,6 +390,6 @@ ORDER BY 1;
view citus_stat_tenants_local view citus_stat_tenants_local
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(358 rows) (359 rows)
DROP TABLE extension_basic_types; DROP TABLE extension_basic_types;

View File

@ -32,6 +32,10 @@ SELECT result FROM run_command_on_workers('SELECT citus_is_primary_node()');
-- get the active nodes -- get the active nodes
SELECT master_get_active_worker_nodes(); SELECT master_get_active_worker_nodes();
SELECT * from citus_node_list();
SELECT * from citus_node_list(active := NULL);
-- try to add a node that is already in the cluster -- try to add a node that is already in the cluster
SELECT * FROM master_add_node('localhost', :worker_1_port); SELECT * FROM master_add_node('localhost', :worker_1_port);
@ -51,6 +55,22 @@ SELECT citus_disable_node('localhost', :worker_2_port);
SELECT public.wait_until_metadata_sync(20000); SELECT public.wait_until_metadata_sync(20000);
SELECT master_get_active_worker_nodes(); SELECT master_get_active_worker_nodes();
SELECT * from citus_node_list(active := TRUE);
SELECT * from citus_node_list(active := FALSE);
SELECT * from citus_node_list(role := 'worker');
SELECT * from citus_node_list(role := 'coordinator');
SELECT * from citus_node_list(role := NULL);
SELECT * from citus_node_list(role := 'foo');
SELECT * from citus_node_list(active := FALSE, role := 'coordinator');
SELECT * from citus_node_list(active := FALSE, role := 'worker');
-- add some shard placements to the cluster -- add some shard placements to the cluster
SET citus.shard_count TO 16; SET citus.shard_count TO 16;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;