mirror of https://github.com/citusdata/citus.git
768 lines
29 KiB
Plaintext
768 lines
29 KiB
Plaintext
-- File to create functions and helpers needed for subsequent tests
|
|
-- create a helper function to create objects on each node
|
|
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
|
|
RETURNS void LANGUAGE plpgsql AS $$
|
|
BEGIN
|
|
EXECUTE p_sql;
|
|
PERFORM run_command_on_workers(p_sql);
|
|
END;$$;
|
|
-- Create a function to make sure that queries returning the same result
|
|
CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
|
BEGIN
|
|
EXECUTE query;
|
|
EXCEPTION WHEN OTHERS THEN
|
|
IF SQLERRM LIKE 'failed to execute task%' THEN
|
|
RAISE 'Task failed to execute';
|
|
END IF;
|
|
END;
|
|
$$LANGUAGE plpgsql;
|
|
-- Create a function to ignore worker plans in explain output
|
|
CREATE OR REPLACE FUNCTION coordinator_plan(explain_command text, out query_plan text)
|
|
RETURNS SETOF TEXT AS $$
|
|
BEGIN
|
|
FOR query_plan IN execute explain_command LOOP
|
|
RETURN next;
|
|
IF query_plan LIKE '%Task Count:%'
|
|
THEN
|
|
RETURN;
|
|
END IF;
|
|
END LOOP;
|
|
RETURN;
|
|
END; $$ language plpgsql;
|
|
-- Create a function to ignore worker plans in explain output
|
|
-- It also shows task count for plan and subplans
|
|
CREATE OR REPLACE FUNCTION coordinator_plan_with_subplans(explain_command text, out query_plan text)
|
|
RETURNS SETOF TEXT AS $$
|
|
DECLARE
|
|
task_count_line_reached boolean := false;
|
|
BEGIN
|
|
FOR query_plan IN execute explain_command LOOP
|
|
IF NOT task_count_line_reached THEN
|
|
RETURN next;
|
|
END IF;
|
|
IF query_plan LIKE '%Task Count:%' THEN
|
|
IF NOT task_count_line_reached THEN
|
|
SELECT true INTO task_count_line_reached;
|
|
ELSE
|
|
RETURN next;
|
|
END IF;
|
|
END IF;
|
|
END LOOP;
|
|
RETURN;
|
|
END; $$ language plpgsql;
|
|
-- Create a function to normalize Memory Usage, Buckets, Batches
|
|
CREATE OR REPLACE FUNCTION plan_normalize_memory(explain_command text, out query_plan text)
|
|
RETURNS SETOF TEXT AS $$
|
|
BEGIN
|
|
FOR query_plan IN execute explain_command LOOP
|
|
query_plan := regexp_replace(query_plan, '(Memory( Usage)?|Buckets|Batches): \S*', '\1: xxx', 'g');
|
|
RETURN NEXT;
|
|
END LOOP;
|
|
END; $$ language plpgsql;
|
|
-- helper function that returns true if output of given explain has "is not null" (case in-sensitive)
|
|
CREATE OR REPLACE FUNCTION explain_has_is_not_null(explain_command text)
|
|
RETURNS BOOLEAN AS $$
|
|
DECLARE
|
|
query_plan text;
|
|
BEGIN
|
|
FOR query_plan IN EXECUTE explain_command LOOP
|
|
IF query_plan ILIKE '%is not null%'
|
|
THEN
|
|
RETURN true;
|
|
END IF;
|
|
END LOOP;
|
|
RETURN false;
|
|
END; $$ language plpgsql;
|
|
-- helper function that returns true if output of given explain has "is not null" (case in-sensitive)
|
|
CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_command text)
|
|
RETURNS BOOLEAN AS $$
|
|
DECLARE
|
|
query_plan text;
|
|
BEGIN
|
|
FOR query_plan IN EXECUTE explain_command LOOP
|
|
IF query_plan ILIKE '%Distributed Subplan %_%'
|
|
THEN
|
|
RETURN true;
|
|
END IF;
|
|
END LOOP;
|
|
RETURN false;
|
|
END; $$ language plpgsql;
|
|
--helper function to check there is a single task
|
|
CREATE OR REPLACE FUNCTION explain_has_single_task(explain_command text)
|
|
RETURNS BOOLEAN AS $$
|
|
DECLARE
|
|
query_plan text;
|
|
BEGIN
|
|
FOR query_plan IN EXECUTE explain_command LOOP
|
|
IF query_plan ILIKE '%Task Count: 1%'
|
|
THEN
|
|
RETURN true;
|
|
END IF;
|
|
END LOOP;
|
|
RETURN false;
|
|
END; $$ language plpgsql;
|
|
-- helper function to quickly run SQL on the whole cluster
|
|
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
|
|
RETURNS void LANGUAGE plpgsql AS $$
|
|
BEGIN
|
|
EXECUTE p_sql;
|
|
PERFORM run_command_on_workers(p_sql);
|
|
END;$$;
|
|
-- 1. Marks the given procedure as colocated with the given table.
|
|
-- 2. Marks the argument index with which we route the procedure.
|
|
CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
|
|
RETURNS void LANGUAGE plpgsql AS $$
|
|
BEGIN
|
|
update pg_catalog.pg_dist_object
|
|
set distribution_argument_index = argument_index, colocationid = pg_dist_partition.colocationid
|
|
from pg_proc, pg_dist_partition
|
|
where proname = procname and oid = objid and pg_dist_partition.logicalrelid = tablerelid;
|
|
END;$$;
|
|
-- helper function to verify the function of a coordinator is the same on all workers
|
|
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
|
|
RETURNS bool
|
|
LANGUAGE plpgsql
|
|
AS $func$
|
|
DECLARE
|
|
coordinatorSql text;
|
|
workerSql text;
|
|
BEGIN
|
|
SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinatorSql;
|
|
FOR workerSql IN SELECT result FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)') LOOP
|
|
IF workerSql != coordinatorSql THEN
|
|
RAISE INFO 'functions are different, coordinator:% worker:%', coordinatorSql, workerSql;
|
|
RETURN false;
|
|
END IF;
|
|
END LOOP;
|
|
|
|
RETURN true;
|
|
END;
|
|
$func$;
|
|
--
|
|
-- Procedure for creating shards for range partitioned distributed table.
|
|
--
|
|
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
|
|
AS $$
|
|
DECLARE
|
|
new_shardid bigint;
|
|
idx int;
|
|
BEGIN
|
|
FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1))
|
|
LOOP
|
|
SELECT master_create_empty_shard(rel::text) INTO new_shardid;
|
|
UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid;
|
|
END LOOP;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
-- Introduce a function that waits until all cleanup records are deleted, for testing purposes
|
|
CREATE OR REPLACE FUNCTION wait_for_resource_cleanup() RETURNS void
|
|
SET client_min_messages TO ERROR
|
|
AS $$
|
|
DECLARE
|
|
record_count integer;
|
|
BEGIN
|
|
EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
|
|
WHILE record_count != 0 LOOP
|
|
CALL pg_catalog.citus_cleanup_orphaned_resources();
|
|
EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
|
|
END LOOP;
|
|
END$$ LANGUAGE plpgsql;
|
|
-- Returns the foreign keys where the referencing relation's name starts with
|
|
-- given prefix.
|
|
--
|
|
-- Foreign keys are groupped by their configurations and then the constraint name,
|
|
-- referencing table, and referenced table for each distinct configuration are
|
|
-- aggregated into arrays.
|
|
CREATE OR REPLACE FUNCTION get_grouped_fkey_constraints(referencing_relname_prefix text)
|
|
RETURNS jsonb AS $func$
|
|
DECLARE
|
|
confdelsetcols_column_ref text;
|
|
get_grouped_fkey_constraints_query text;
|
|
result jsonb;
|
|
BEGIN
|
|
-- Read confdelsetcols as null if no such column exists.
|
|
-- This can only be the case for PG versions < 15.
|
|
IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_constraint'::regclass AND attname='confdelsetcols')
|
|
THEN
|
|
confdelsetcols_column_ref := '(SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(confdelsetcols))';
|
|
ELSE
|
|
confdelsetcols_column_ref := '(SELECT null::smallint[])';
|
|
END IF;
|
|
|
|
EXECUTE format(
|
|
$$
|
|
SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.constraint_names) AS fkeys_with_different_config FROM (
|
|
SELECT array_agg(constraint_name ORDER BY constraint_oid) AS constraint_names,
|
|
array_agg(referencing_table::regclass::text ORDER BY constraint_oid) AS referencing_tables,
|
|
array_agg(referenced_table::regclass::text ORDER BY constraint_oid) AS referenced_tables,
|
|
referencing_columns, referenced_columns, deferable, deferred, on_update, on_delete, match_type, referencing_columns_set_null_or_default
|
|
FROM (
|
|
SELECT
|
|
oid AS constraint_oid,
|
|
conname AS constraint_name,
|
|
conrelid AS referencing_table,
|
|
(SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(conkey)) AS referencing_columns,
|
|
confrelid AS referenced_table,
|
|
(SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = confrelid AND attnum = ANY(confkey)) AS referenced_columns,
|
|
condeferrable AS deferable,
|
|
condeferred AS deferred,
|
|
confupdtype AS on_update,
|
|
confdeltype AS on_delete,
|
|
confmatchtype AS match_type,
|
|
%2$s AS referencing_columns_set_null_or_default
|
|
FROM pg_constraint WHERE starts_with(conrelid::regclass::text, '%1$s') AND contype = 'f'
|
|
) q2
|
|
GROUP BY referencing_columns, referenced_columns, deferable, deferred, on_update, on_delete, match_type, referencing_columns_set_null_or_default
|
|
) q1
|
|
$$,
|
|
referencing_relname_prefix,
|
|
confdelsetcols_column_ref
|
|
) INTO result;
|
|
RETURN result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
CREATE OR REPLACE FUNCTION get_index_defs(schemaname text, tablename text)
|
|
RETURNS jsonb AS $func$
|
|
DECLARE
|
|
result jsonb;
|
|
indnullsnotdistinct_column_ref text;
|
|
BEGIN
|
|
-- Not use indnullsnotdistinct in group by clause if no such column exists.
|
|
-- This can only be the case for PG versions < 15.
|
|
IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_index'::regclass AND attname='indnullsnotdistinct')
|
|
THEN
|
|
indnullsnotdistinct_column_ref := ',indnullsnotdistinct';
|
|
ELSE
|
|
indnullsnotdistinct_column_ref := '';
|
|
END IF;
|
|
|
|
EXECUTE format(
|
|
$$
|
|
SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.indexnames) AS index_defs FROM (
|
|
SELECT array_agg(indexname ORDER BY indexrelid) AS indexnames,
|
|
array_agg(indexdef ORDER BY indexrelid) AS indexdefs
|
|
FROM pg_indexes
|
|
JOIN pg_index
|
|
ON (indexrelid = (schemaname || '.' || indexname)::regclass)
|
|
WHERE schemaname = '%1$s' AND starts_with(tablename, '%2$s')
|
|
GROUP BY indnatts, indnkeyatts, indisunique, indisprimary, indisexclusion,
|
|
indimmediate, indisclustered, indisvalid, indisready, indislive,
|
|
indisreplident, indkey, indcollation, indclass, indoption, indexprs,
|
|
indpred %3$s
|
|
) q1
|
|
$$,
|
|
schemaname, tablename, indnullsnotdistinct_column_ref) INTO result;
|
|
RETURN result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
CREATE OR REPLACE FUNCTION get_column_defaults(schemaname text, tablename text)
|
|
RETURNS jsonb AS $func$
|
|
DECLARE
|
|
result jsonb;
|
|
BEGIN
|
|
EXECUTE format(
|
|
$$
|
|
SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.column_name) AS column_defs FROM (
|
|
SELECT column_name, column_default::text, generation_expression::text
|
|
FROM information_schema.columns
|
|
WHERE table_schema = '%1$s' AND table_name = '%2$s' AND
|
|
column_default IS NOT NULL OR generation_expression IS NOT NULL
|
|
) q1
|
|
$$,
|
|
schemaname, tablename) INTO result;
|
|
RETURN result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
CREATE OR REPLACE FUNCTION get_column_attrs(relname_prefix text)
|
|
RETURNS jsonb AS $func$
|
|
DECLARE
|
|
result jsonb;
|
|
BEGIN
|
|
EXECUTE format(
|
|
$$
|
|
SELECT to_jsonb(q2.*) FROM (
|
|
SELECT relnames, jsonb_agg(to_jsonb(q1.*) - 'relnames' ORDER BY q1.column_name) AS column_attrs FROM (
|
|
SELECT array_agg(attrelid::regclass::text ORDER BY attrelid) AS relnames,
|
|
attname AS column_name, typname AS type_name, collname AS collation_name, attcompression AS compression_method, attnotnull AS not_null
|
|
FROM pg_attribute pa
|
|
LEFT JOIN pg_type pt ON (pa.atttypid = pt.oid)
|
|
LEFT JOIN pg_collation pc1 ON (pa.attcollation = pc1.oid)
|
|
JOIN pg_class pc2 ON (pa.attrelid = pc2.oid)
|
|
WHERE starts_with(attrelid::regclass::text, '%1$s') AND
|
|
attnum > 0 AND NOT attisdropped AND relkind = 'r'
|
|
GROUP BY column_name, type_name, collation_name, compression_method, not_null
|
|
) q1
|
|
GROUP BY relnames
|
|
) q2
|
|
$$,
|
|
relname_prefix) INTO result;
|
|
RETURN result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
-- Returns true if all shard placements of given table have given number of indexes.
|
|
CREATE OR REPLACE FUNCTION verify_index_count_on_shard_placements(
|
|
qualified_table_name text,
|
|
n_expected_indexes int)
|
|
RETURNS BOOLEAN
|
|
AS $func$
|
|
DECLARE
|
|
v_result boolean;
|
|
BEGIN
|
|
SELECT n_expected_indexes = ALL(
|
|
SELECT result::int INTO v_result
|
|
FROM run_command_on_placements(
|
|
qualified_table_name,
|
|
$$SELECT COUNT(*) FROM pg_index WHERE indrelid::regclass = '%s'::regclass$$
|
|
)
|
|
);
|
|
RETURN v_result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
-- Returns names of the foreign keys that shards of given table are involved in
|
|
-- (as referencing or referenced one).
|
|
CREATE OR REPLACE FUNCTION get_fkey_names_on_placements(
|
|
qualified_table_name text)
|
|
RETURNS TABLE (
|
|
on_node text,
|
|
shard_id bigint,
|
|
fkey_names text[]
|
|
)
|
|
AS $func$
|
|
BEGIN
|
|
RETURN QUERY SELECT
|
|
CASE WHEN groupid = 0 THEN 'on_coordinator' ELSE 'on_worker' END AS on_node_col,
|
|
shardid,
|
|
(CASE WHEN result = '' THEN '{}' ELSE result END)::text[] AS fkey_names_col
|
|
FROM run_command_on_placements(
|
|
qualified_table_name,
|
|
$$SELECT array_agg(conname ORDER BY conname) FROM pg_constraint WHERE '%s'::regclass IN (conrelid, confrelid) AND contype = 'f'$$
|
|
)
|
|
JOIN pg_dist_node USING (nodename, nodeport);
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
-- Returns true if all shard placements of given table have given number of partitions.
|
|
CREATE OR REPLACE FUNCTION verify_partition_count_on_placements(
|
|
qualified_table_name text,
|
|
n_expected_partitions int)
|
|
RETURNS BOOLEAN
|
|
AS $func$
|
|
DECLARE
|
|
v_result boolean;
|
|
BEGIN
|
|
SELECT n_expected_partitions = ALL(
|
|
SELECT result::int INTO v_result
|
|
FROM run_command_on_placements(
|
|
qualified_table_name,
|
|
$$SELECT COUNT(*) FROM pg_inherits WHERE inhparent = '%s'::regclass;$$
|
|
)
|
|
);
|
|
RETURN v_result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
-- This function checks pg_dist_placement on all nodes and returns true if the following holds:
|
|
-- Whether shard is on the coordinator or on a primary worker node, and if this is expected.
|
|
-- Given shardid is used for shard placement of the table.
|
|
-- Placement metadata is correct on all nodes.
|
|
CREATE OR REPLACE FUNCTION verify_shard_placement_for_single_shard_table(
|
|
qualified_table_name text,
|
|
expected_shard_id bigint,
|
|
expect_placement_on_coord boolean)
|
|
RETURNS BOOLEAN
|
|
AS $func$
|
|
DECLARE
|
|
verify_workers_query text;
|
|
nodename_nodeport_groupid record;
|
|
result boolean;
|
|
BEGIN
|
|
SELECT nodename, nodeport, groupid INTO nodename_nodeport_groupid
|
|
FROM pg_dist_shard
|
|
JOIN pg_dist_placement USING (shardid)
|
|
JOIN pg_dist_node USING (groupid)
|
|
WHERE noderole = 'primary' AND shouldhaveshards AND isactive AND
|
|
logicalrelid = qualified_table_name::regclass AND shardid = expected_shard_id;
|
|
|
|
IF nodename_nodeport_groupid IS NULL
|
|
THEN
|
|
RAISE NOTICE 'Shard placement is not on a primary worker node';
|
|
RETURN false;
|
|
END IF;
|
|
|
|
IF (nodename_nodeport_groupid.groupid = 0) != expect_placement_on_coord
|
|
THEN
|
|
RAISE NOTICE 'Shard placement is on an unexpected node';
|
|
RETURN false;
|
|
END IF;
|
|
|
|
-- verify that metadata on workers is correct too
|
|
SELECT format(
|
|
'SELECT true = ALL(
|
|
SELECT result::boolean FROM run_command_on_workers($$
|
|
SELECT COUNT(*) = 1
|
|
FROM pg_dist_shard
|
|
JOIN pg_dist_placement USING (shardid)
|
|
JOIN pg_dist_node USING (groupid)
|
|
WHERE logicalrelid = ''%s''::regclass AND
|
|
shardid = %s AND
|
|
nodename = ''%s'' AND
|
|
nodeport = %s AND
|
|
groupid = %s
|
|
$$)
|
|
);',
|
|
qualified_table_name, expected_shard_id,
|
|
nodename_nodeport_groupid.nodename,
|
|
nodename_nodeport_groupid.nodeport,
|
|
nodename_nodeport_groupid.groupid
|
|
)
|
|
INTO verify_workers_query;
|
|
|
|
EXECUTE verify_workers_query INTO result;
|
|
RETURN result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
-- This function checks pg_dist_placement on all nodes and returns true if the following holds:
|
|
-- Shard placement exist on coordinator and on all primary worker nodes.
|
|
-- Given shardid is used for shard placements of the table.
|
|
-- Given placementid is used for the coordinator shard placement.
|
|
-- Placement metadata is correct on all nodes.
|
|
CREATE OR REPLACE FUNCTION verify_shard_placements_for_reference_table(
|
|
qualified_table_name text,
|
|
expected_shard_id bigint,
|
|
expected_coord_placement_id bigint)
|
|
RETURNS BOOLEAN
|
|
AS $func$
|
|
DECLARE
|
|
verify_workers_query text;
|
|
result boolean;
|
|
BEGIN
|
|
SELECT format(
|
|
'SELECT true = ALL(
|
|
SELECT result::boolean FROM run_command_on_all_nodes($$
|
|
SELECT
|
|
(SELECT COUNT(*) FROM pg_dist_node WHERE noderole = ''primary'' AND isactive) =
|
|
(SELECT COUNT(*)
|
|
FROM pg_dist_shard
|
|
JOIN pg_dist_placement USING (shardid)
|
|
JOIN pg_dist_node USING (groupid)
|
|
WHERE noderole = ''primary'' AND isactive AND
|
|
logicalrelid = ''%s''::regclass AND shardid = %s)
|
|
AND
|
|
(SELECT COUNT(*) = 1
|
|
FROM pg_dist_shard
|
|
JOIN pg_dist_placement USING (shardid)
|
|
JOIN pg_dist_node USING (groupid)
|
|
WHERE noderole = ''primary'' AND isactive AND
|
|
logicalrelid = ''%s''::regclass AND shardid = %s AND
|
|
placementid = %s AND groupid = 0)
|
|
|
|
$$)
|
|
);',
|
|
qualified_table_name, expected_shard_id,
|
|
qualified_table_name, expected_shard_id,
|
|
expected_coord_placement_id
|
|
)
|
|
INTO verify_workers_query;
|
|
|
|
EXECUTE verify_workers_query INTO result;
|
|
RETURN result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
-- This function checks pg_dist_partition on all nodes and returns true if the metadata
|
|
-- record for given single-shard table is correct.
|
|
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_single_shard_table(
|
|
qualified_table_name text)
|
|
RETURNS BOOLEAN
|
|
AS $func$
|
|
DECLARE
|
|
verify_workers_query text;
|
|
result boolean;
|
|
BEGIN
|
|
SELECT format(
|
|
'SELECT true = ALL(
|
|
SELECT result::boolean FROM run_command_on_all_nodes($$
|
|
SELECT COUNT(*) = 1
|
|
FROM pg_dist_partition
|
|
WHERE logicalrelid = ''%s''::regclass AND
|
|
partmethod = ''n'' AND
|
|
partkey IS NULL AND
|
|
colocationid > 0 AND
|
|
repmodel = ''s'' AND
|
|
autoconverted = false
|
|
$$)
|
|
);',
|
|
qualified_table_name)
|
|
INTO verify_workers_query;
|
|
|
|
EXECUTE verify_workers_query INTO result;
|
|
RETURN result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
-- This function checks pg_dist_partition on all nodes and returns true if the metadata
|
|
-- record for given reference table is correct.
|
|
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_reference_table(
|
|
qualified_table_name text)
|
|
RETURNS BOOLEAN
|
|
AS $func$
|
|
DECLARE
|
|
verify_workers_query text;
|
|
result boolean;
|
|
BEGIN
|
|
SELECT format(
|
|
'SELECT true = ALL(
|
|
SELECT result::boolean FROM run_command_on_all_nodes($$
|
|
SELECT COUNT(*) = 1
|
|
FROM pg_dist_partition
|
|
WHERE logicalrelid = ''%s''::regclass AND
|
|
partmethod = ''n'' AND
|
|
partkey IS NULL AND
|
|
colocationid > 0 AND
|
|
repmodel = ''t'' AND
|
|
autoconverted = false
|
|
$$)
|
|
);',
|
|
qualified_table_name)
|
|
INTO verify_workers_query;
|
|
|
|
EXECUTE verify_workers_query INTO result;
|
|
RETURN result;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
-- Returns pg_seclabels entries from all nodes in the cluster for which
|
|
-- the object name is the input.
|
|
CREATE OR REPLACE FUNCTION get_citus_tests_label_provider_labels(object_name text,
|
|
master_port INTEGER DEFAULT 57636,
|
|
worker_1_port INTEGER DEFAULT 57637,
|
|
worker_2_port INTEGER DEFAULT 57638)
|
|
RETURNS TABLE (
|
|
node_type text,
|
|
result text
|
|
)
|
|
AS $func$
|
|
DECLARE
|
|
pg_seclabels_cmd TEXT := 'SELECT to_jsonb(q.*) FROM (' ||
|
|
'SELECT provider, objtype, label FROM pg_seclabels ' ||
|
|
'WHERE objname = ''' || object_name || ''') q';
|
|
BEGIN
|
|
RETURN QUERY
|
|
SELECT
|
|
CASE
|
|
WHEN nodeport = master_port THEN 'coordinator'
|
|
WHEN nodeport = worker_1_port THEN 'worker_1'
|
|
WHEN nodeport = worker_2_port THEN 'worker_2'
|
|
ELSE 'unexpected_node'
|
|
END AS node_type,
|
|
a.result
|
|
FROM run_command_on_all_nodes(pg_seclabels_cmd) a
|
|
JOIN pg_dist_node USING (nodeid)
|
|
ORDER BY node_type;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
-- For all nodes, returns database properties of given database, except
|
|
-- oid, datfrozenxid and datminmxid.
|
|
--
|
|
-- Also returns whether the node has a pg_dist_object record for the database
|
|
-- and whether there are any stale pg_dist_object records for a database.
|
|
CREATE OR REPLACE FUNCTION check_database_on_all_nodes(p_database_name text)
|
|
RETURNS TABLE (node_type text, result text)
|
|
AS $func$
|
|
DECLARE
|
|
pg_ge_15_17_options text := '';
|
|
pg_ge_16_options text := '';
|
|
BEGIN
|
|
IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_database'::regclass AND attname = 'daticulocale') THEN
|
|
pg_ge_15_17_options := ', daticulocale, datcollversion, datlocprovider';
|
|
ELSIF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_database'::regclass AND attname = 'datlocale') THEN
|
|
pg_ge_15_17_options := ', datlocale as daticulocale, datcollversion, datlocprovider';
|
|
ELSE
|
|
pg_ge_15_17_options := $$, null as daticulocale, null as datcollversion, 'c' as datlocprovider$$;
|
|
END IF;
|
|
|
|
IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_database'::regclass AND attname = 'daticurules') THEN
|
|
pg_ge_16_options := ', daticurules';
|
|
ELSE
|
|
pg_ge_16_options := ', null as daticurules';
|
|
END IF;
|
|
|
|
RETURN QUERY
|
|
SELECT
|
|
CASE WHEN (groupid = 0 AND groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'coordinator (local)'
|
|
WHEN (groupid = 0) THEN 'coordinator (remote)'
|
|
WHEN (groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'worker node (local)'
|
|
ELSE 'worker node (remote)'
|
|
END AS node_type,
|
|
q2.result
|
|
FROM run_command_on_all_nodes(
|
|
format(
|
|
$$
|
|
SELECT to_jsonb(q.*)
|
|
FROM (
|
|
SELECT
|
|
(
|
|
SELECT to_jsonb(database_properties.*)
|
|
FROM (
|
|
SELECT datname, pa.rolname as database_owner,
|
|
pg_encoding_to_char(pd.encoding) as encoding,
|
|
datistemplate, datallowconn, datconnlimit, datacl,
|
|
pt.spcname AS tablespace, datcollate, datctype
|
|
%2$s -- >= pg15 & pg17 options
|
|
%3$s -- >= pg16 options
|
|
FROM pg_database pd
|
|
JOIN pg_authid pa ON pd.datdba = pa.oid
|
|
JOIN pg_tablespace pt ON pd.dattablespace = pt.oid
|
|
WHERE datname = '%1$s'
|
|
) database_properties
|
|
) AS database_properties,
|
|
(
|
|
SELECT COUNT(*)=1
|
|
FROM pg_dist_object WHERE objid = (SELECT oid FROM pg_database WHERE datname = '%1$s')
|
|
) AS pg_dist_object_record_for_db_exists,
|
|
(
|
|
SELECT COUNT(*) > 0
|
|
FROM pg_dist_object
|
|
WHERE classid = 1262 AND objid NOT IN (SELECT oid FROM pg_database)
|
|
) AS stale_pg_dist_object_record_for_a_db_exists
|
|
) q
|
|
$$,
|
|
p_database_name, pg_ge_15_17_options, pg_ge_16_options
|
|
)
|
|
) q2
|
|
JOIN pg_dist_node USING (nodeid);
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
CREATE OR REPLACE FUNCTION check_parameter_privileges(users text[], parameters text[], permissions text[])
|
|
RETURNS TABLE ( res text, usr text, param text, perms text) AS $func$
|
|
DECLARE
|
|
u text;
|
|
p text;
|
|
perm text;
|
|
BEGIN
|
|
FOREACH u IN ARRAY users
|
|
LOOP
|
|
FOREACH p IN ARRAY parameters
|
|
LOOP
|
|
FOREACH perm IN ARRAY permissions
|
|
LOOP
|
|
RETURN QUERY EXECUTE format($inner$SELECT result ,'%1$s','%2$s','%3$s' FROM run_command_on_all_nodes($$SELECT has_parameter_privilege('%1$s','%2$s', '%3$s'); $$)$inner$, u, p, perm);
|
|
END LOOP;
|
|
END LOOP;
|
|
END LOOP;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
CREATE OR REPLACE FUNCTION check_database_privileges(role_name text, db_name text, permissions text[])
|
|
RETURNS TABLE(permission text, result text)
|
|
AS $func$
|
|
DECLARE
|
|
permission text;
|
|
BEGIN
|
|
FOREACH permission IN ARRAY permissions
|
|
LOOP
|
|
RETURN QUERY EXECUTE format($inner$SELECT %s, result FROM run_command_on_all_nodes($$select has_database_privilege(%s,%s,%s); $$)$inner$,
|
|
quote_literal(permission), quote_literal(role_name), quote_literal(db_name), quote_literal(permission));
|
|
END LOOP;
|
|
END;
|
|
$func$ LANGUAGE plpgsql;
|
|
CREATE or REPLACE FUNCTION initplan_references_to_pg17(text) returns text AS $$
|
|
DECLARE
|
|
expr_parts text[];
|
|
initplan_refs text[];
|
|
n_initplan_refs int = 0;
|
|
i int := 1;
|
|
rv text := '';
|
|
expr_part text;
|
|
BEGIN
|
|
-- Split the line on each $x; there must be at least one
|
|
-- For example 'foo = $0 and bar < $1' is split to: [ 'foo =', 'bar <' ]
|
|
expr_parts := regexp_split_to_array($1, '\$\d+');
|
|
|
|
-- Construct the PG17 formatted names in the given text
|
|
-- for example 'foo = $0 and bar < $1' yields [ '(InitPlan1).col1', '(InitPlan2).col1' ]
|
|
initplan_refs := ARRAY(select '(InitPlan ' || substr(x[1],2)::int + 1 || ').col1' from regexp_matches($1, '\$\d', 'g') x);
|
|
n_initplan_refs := array_length(initplan_refs, 1);
|
|
|
|
-- Combine expression parts with PG17 formatted names
|
|
FOREACH expr_part IN ARRAY expr_parts
|
|
LOOP
|
|
rv := rv || expr_part;
|
|
-- There should be more expr parts than init plan refs so
|
|
-- check init plan refs boundary each time
|
|
IF i <= n_initplan_refs THEN
|
|
rv := rv || initplan_refs[i];
|
|
END IF;
|
|
i := i + 1;
|
|
END LOOP;
|
|
RETURN rv;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
-- This function formats EXPLAIN output to conform to how PG17 EXPLAIN shows
|
|
-- scalar subquery outputs if the pg version is less than 17 (*). When 17
|
|
-- becomes the minimum supported pgversion this function can be retired.
|
|
--
|
|
-- (*) https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=fd0398fcb
|
|
CREATE OR REPLACE FUNCTION explain_with_pg17_initplan_format(explain_command text, out query_plan text)
|
|
RETURNS SETOF TEXT AS $$
|
|
DECLARE
|
|
pgversion int = 0;
|
|
BEGIN
|
|
pgversion = substring(version(), '\d+')::int ;
|
|
FOR query_plan IN execute explain_command LOOP
|
|
IF pgversion < 17 THEN
|
|
-- Two types of format changes are needed:
|
|
-- 1) 'Init Plan 1 (returns $0)' becomes just 'Init Plan 1'
|
|
-- 2) 'foo = $0' becomes 'foo = (InitPlan 1).col1'
|
|
IF query_plan ~ 'InitPlan \d \(returns' THEN
|
|
query_plan = regexp_replace(query_plan, '\(returns \$\d\)', '', 'g');
|
|
ELSIF query_plan ~ '\$\d' THEN
|
|
-- This line contains at least one InitPlan reference
|
|
-- Replace it to have PG17 style InitPlan references
|
|
query_plan = public.initplan_references_to_pg17(query_plan);
|
|
END IF;
|
|
END IF;
|
|
RETURN NEXT;
|
|
END LOOP;
|
|
END; $$ language plpgsql;
|
|
-- This function formats EXPLAIN output to conform to how pg <= 16 EXPLAIN
|
|
-- shows ANY <subquery> in an expression the pg version >= 17. When 17 is
|
|
-- the minimum supported pgversion this function can be retired. The commit
|
|
-- that changed how ANY <subquery> exrpressions appear in EXPLAIN is:
|
|
-- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=fd0398fcb
|
|
CREATE OR REPLACE FUNCTION explain_with_pg16_subplan_format(explain_command text, out query_plan text)
|
|
RETURNS SETOF TEXT AS $$
|
|
DECLARE
|
|
pgversion int = 0;
|
|
BEGIN
|
|
pgversion = substring(version(), '\d+')::int ;
|
|
FOR query_plan IN execute explain_command LOOP
|
|
IF pgversion >= 17 THEN
|
|
IF query_plan ~ 'SubPlan \d+\).col' THEN
|
|
query_plan = regexp_replace(query_plan, '\(ANY \(\w+ = \(SubPlan (\d+)\).col1\)\)', '(SubPlan \1)', 'g');
|
|
END IF;
|
|
END IF;
|
|
RETURN NEXT;
|
|
END LOOP;
|
|
END; $$ language plpgsql;
|
|
-- To produce stable regression test output, it's usually necessary to
|
|
-- ignore details such as exact costs or row counts. These filter
|
|
-- functions replace changeable output details with fixed strings.
|
|
-- Copied from PG explain.sql
|
|
create function explain_filter(text) returns setof text
|
|
language plpgsql as
|
|
$$
|
|
declare
|
|
ln text;
|
|
begin
|
|
for ln in execute $1
|
|
loop
|
|
-- Replace any numeric word with just 'N'
|
|
ln := regexp_replace(ln, '-?\m\d+\M', 'N', 'g');
|
|
-- In sort output, the above won't match units-suffixed numbers
|
|
ln := regexp_replace(ln, '\m\d+kB', 'NkB', 'g');
|
|
-- Ignore text-mode buffers output because it varies depending
|
|
-- on the system state
|
|
CONTINUE WHEN (ln ~ ' +Buffers: .*');
|
|
-- Ignore text-mode "Planning:" line because whether it's output
|
|
-- varies depending on the system state
|
|
CONTINUE WHEN (ln = 'Planning:');
|
|
return next ln;
|
|
end loop;
|
|
end;
|
|
$$;
|