-- File to create functions and helpers needed for subsequent tests

-- Helper that runs the given SQL command locally via EXECUTE and then hands
-- the same command text to run_command_on_workers().
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    EXECUTE p_sql;
    PERFORM run_command_on_workers(p_sql);
END;
$$;

-- Runs the given query and normalizes task-execution failures:
--   * an error whose message starts with 'failed to execute task' is
--     re-raised with the fixed message 'Task failed to execute';
--   * every other error is swallowed and the function returns normally.
CREATE OR REPLACE FUNCTION raise_failed_execution(query text)
RETURNS void
AS $$
BEGIN
    EXECUTE query;
EXCEPTION WHEN OTHERS THEN
    IF SQLERRM LIKE 'failed to execute task%' THEN
        RAISE 'Task failed to execute';
    END IF;
END;
$$ LANGUAGE plpgsql;

-- Create a function to ignore worker plans in explain output.
-- Returns the rows produced by explain_command up to and including the first
-- row that contains 'Task Count:'; everything after that row is dropped.
CREATE OR REPLACE FUNCTION coordinator_plan(explain_command text, out query_plan text)
RETURNS SETOF TEXT
AS $$
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        RETURN NEXT;
        IF query_plan LIKE '%Task Count:%' THEN
            RETURN;
        END IF;
    END LOOP;
    RETURN;
END;
$$ LANGUAGE plpgsql;

-- Create a function to ignore worker plans in explain output.
-- It also shows task count for plan and subplans: every row up to and
-- including the first 'Task Count:' row is returned, and after that only
-- rows that contain 'Task Count:' are returned.
CREATE OR REPLACE FUNCTION coordinator_plan_with_subplans(explain_command text, out query_plan text)
RETURNS SETOF TEXT
AS $$
DECLARE
    task_count_line_reached boolean := false;
    is_task_count_line boolean;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        is_task_count_line := query_plan LIKE '%Task Count:%';

        -- Before the first task count row everything is emitted; afterwards
        -- only further task count rows are.
        IF NOT task_count_line_reached OR is_task_count_line THEN
            RETURN NEXT;
        END IF;

        IF is_task_count_line THEN
            task_count_line_reached := true;
        END IF;
    END LOOP;
    RETURN;
END;
$$ LANGUAGE plpgsql;

-- Create a function to normalize Memory Usage, Buckets, Batches.
-- The value following 'Memory:', 'Memory Usage:', 'Buckets:' or 'Batches:'
-- is replaced with 'xxx' in every row produced by explain_command.
CREATE OR REPLACE FUNCTION plan_normalize_memory(explain_command text, out query_plan text)
RETURNS SETOF TEXT
AS $$
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        query_plan := regexp_replace(query_plan, '(Memory( Usage)?|Buckets|Batches): \S*', '\1: xxx', 'g');
        RETURN NEXT;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- helper function that returns true if output of given explain has "is not null" (case in-sensitive)
CREATE OR REPLACE FUNCTION explain_has_is_not_null(explain_command text)
RETURNS BOOLEAN
AS $$
DECLARE
    query_plan text;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        IF query_plan ILIKE '%is not null%' THEN
            RETURN true;
        END IF;
    END LOOP;
    RETURN false;
END;
$$ LANGUAGE plpgsql;

-- helper function that returns true if output of given explain has a row
-- containing "Distributed Subplan " followed by text that includes an
-- underscore (case in-sensitive)
CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_command text)
RETURNS BOOLEAN
AS $$
DECLARE
    query_plan text;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        -- '_' is a single-character wildcard in LIKE/ILIKE, so it is escaped
        -- here to require a literal underscore after the subplan prefix.
        IF query_plan ILIKE '%Distributed Subplan %!_%' ESCAPE '!' THEN
            RETURN true;
        END IF;
    END LOOP;
    RETURN false;
END;
$$ LANGUAGE plpgsql;

-- helper function to check there is a single task
-- Returns true if any row of the explain output contains "Task Count: 1"
-- (case in-sensitive) where the 1 is not followed by another digit, so that
-- e.g. "Task Count: 10" is not reported as a single task.
CREATE OR REPLACE FUNCTION explain_has_single_task(explain_command text)
RETURNS BOOLEAN
AS $$
DECLARE
    query_plan text;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        IF query_plan ~* 'Task Count: 1([^0-9]|$)' THEN
            RETURN true;
        END IF;
    END LOOP;
    RETURN false;
END;
$$ LANGUAGE plpgsql;

-- helper function to quickly run SQL on the whole cluster
-- Executes the command locally, then passes it to run_command_on_workers().
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    EXECUTE p_sql;
    PERFORM run_command_on_workers(p_sql);
END;
$$;

-- 1. Marks the given procedure as colocated with the given table.
-- 2. Marks the argument index with which we route the procedure.
-- Updates the pg_dist_object record(s) of the function/procedure named
-- procname so that:
--   * distribution_argument_index is set to argument_index, and
--   * colocationid is copied from the pg_dist_partition record of tablerelid.
--
-- The match is restricted to records whose classid is pg_proc, so that an
-- object of another catalog that happens to share the same OID is not touched.
-- Note that the match is by name only, hence all overloads named procname that
-- have a pg_dist_object record are updated.
CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    UPDATE pg_catalog.pg_dist_object AS dist_object
    SET distribution_argument_index = argument_index,
        colocationid = pg_dist_partition.colocationid
    FROM pg_proc
    CROSS JOIN pg_dist_partition
    WHERE dist_object.classid = 'pg_catalog.pg_proc'::regclass
      AND dist_object.objid = pg_proc.oid
      AND pg_proc.proname = procname
      AND pg_dist_partition.logicalrelid = tablerelid;
END;
$$;

-- helper function to verify the function of a coordinator is the same on all workers
--
-- funcname must be castable to regprocedure (i.e. include the argument types).
-- The local pg_get_functiondef() output is compared with the "result" value
-- returned for each worker by run_command_on_workers(). On the first mismatch
-- both definitions are reported via RAISE INFO and false is returned;
-- otherwise the function returns true.
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
RETURNS bool
LANGUAGE plpgsql
AS $func$
DECLARE
    coordinator_def text;
    worker_def text;
BEGIN
    coordinator_def := pg_get_functiondef(funcname::regprocedure);

    FOR worker_def IN
        SELECT result
        FROM run_command_on_workers(
            'SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)')
    LOOP
        IF worker_def != coordinator_def THEN
            RAISE INFO 'functions are different, coordinator:% worker:%', coordinator_def, worker_def;
            RETURN false;
        END IF;
    END LOOP;

    RETURN true;
END;
$func$;

--
-- Procedure for creating shards for range partitioned distributed table.
--
-- For each index i of minvalues, creates an empty shard for rel via
-- master_create_empty_shard() and sets the shardminvalue / shardmaxvalue of
-- the new shard in pg_dist_shard to minvalues[i] / maxvalues[i].
-- Nothing is created when minvalues is NULL or empty.
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
AS $$
DECLARE
    new_shardid bigint;
BEGIN
    -- array_length() yields NULL for an empty array; treat that as "no shards".
    FOR idx IN 1..COALESCE(array_length(minvalues, 1), 0) LOOP
        new_shardid := master_create_empty_shard(rel::text);

        UPDATE pg_dist_shard
        SET shardminvalue = minvalues[idx],
            shardmaxvalue = maxvalues[idx]
        WHERE shardid = new_shardid;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Introduce a function that waits until all cleanup records are deleted, for testing purposes
--
-- Repeatedly calls citus_cleanup_orphaned_resources() until pg_dist_cleanup
-- has no rows left. client_min_messages is set to ERROR for the duration of
-- the function.
CREATE OR REPLACE FUNCTION wait_for_resource_cleanup() RETURNS void
SET client_min_messages TO ERROR
AS $$
DECLARE
    record_count integer;
BEGIN
    LOOP
        EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
        EXIT WHEN record_count = 0;

        CALL pg_catalog.citus_cleanup_orphaned_resources();
    END LOOP;
END
$$ LANGUAGE plpgsql;

-- Returns the foreign keys where the referencing relation's name starts with
-- given prefix.
--
-- Foreign keys are grouped by their configurations and then the constraint name,
-- referencing table, and referenced table for each distinct configuration are
-- aggregated into arrays.
--
-- The prefix is interpolated into the dynamic query as a quoted literal (%L),
-- so prefixes containing quotes are handled correctly.
CREATE OR REPLACE FUNCTION get_grouped_fkey_constraints(referencing_relname_prefix text)
RETURNS jsonb AS $func$
DECLARE
    confdelsetcols_column_ref text;
    result jsonb;
BEGIN
    -- Read confdelsetcols as null if no such column exists.
    -- This can only be the case for PG versions < 15.
    IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_constraint'::regclass AND attname = 'confdelsetcols')
    THEN
        confdelsetcols_column_ref := '(SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(confdelsetcols))';
    ELSE
        confdelsetcols_column_ref := '(SELECT null::smallint[])';
    END IF;

    EXECUTE format(
        $$
        SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.constraint_names) AS fkeys_with_different_config
        FROM (
            SELECT array_agg(constraint_name ORDER BY constraint_oid) AS constraint_names,
                   array_agg(referencing_table::regclass::text ORDER BY constraint_oid) AS referencing_tables,
                   array_agg(referenced_table::regclass::text ORDER BY constraint_oid) AS referenced_tables,
                   referencing_columns,
                   referenced_columns,
                   deferable,
                   deferred,
                   on_update,
                   on_delete,
                   match_type,
                   referencing_columns_set_null_or_default
            FROM (
                SELECT oid AS constraint_oid,
                       conname AS constraint_name,
                       conrelid AS referencing_table,
                       (SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(conkey)) AS referencing_columns,
                       confrelid AS referenced_table,
                       (SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = confrelid AND attnum = ANY(confkey)) AS referenced_columns,
                       condeferrable AS deferable,
                       condeferred AS deferred,
                       confupdtype AS on_update,
                       confdeltype AS on_delete,
                       confmatchtype AS match_type,
                       %2$s AS referencing_columns_set_null_or_default
                FROM pg_constraint
                WHERE starts_with(conrelid::regclass::text, %1$L)
                  AND contype = 'f'
            ) q2
            GROUP BY referencing_columns,
                     referenced_columns,
                     deferable,
                     deferred,
                     on_update,
                     on_delete,
                     match_type,
                     referencing_columns_set_null_or_default
        ) q1
        $$,
        referencing_relname_prefix,
        confdelsetcols_column_ref
    ) INTO result;

    RETURN result;
END;
$func$ LANGUAGE plpgsql;

-- Returns the indexes in the given schema whose table name starts with the
-- given table name, grouped by their pg_index properties. For each group the
-- index names and index definitions are aggregated into arrays (ordered by
-- indexrelid), and the groups are returned as a jsonb array.
--
-- Both arguments are interpolated as quoted literals (%L), and the index
-- regclass is built from quoted identifiers so that index/schema names that
-- need quoting resolve correctly.
CREATE OR REPLACE FUNCTION get_index_defs(schemaname text, tablename text)
RETURNS jsonb AS $func$
DECLARE
    result jsonb;
    indnullsnotdistinct_column_ref text;
BEGIN
    -- Not use indnullsnotdistinct in group by clause if no such column exists.
    -- This can only be the case for PG versions < 15.
    IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_index'::regclass AND attname = 'indnullsnotdistinct')
    THEN
        indnullsnotdistinct_column_ref := ',indnullsnotdistinct';
    ELSE
        indnullsnotdistinct_column_ref := '';
    END IF;

    -- Inside the dynamic query, schemaname / tablename / indexname refer to
    -- the columns of pg_indexes, not to the function arguments.
    EXECUTE format(
        $$
        SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.indexnames) AS index_defs
        FROM (
            SELECT array_agg(indexname ORDER BY indexrelid) AS indexnames,
                   array_agg(indexdef ORDER BY indexrelid) AS indexdefs
            FROM pg_indexes
            JOIN pg_index
              ON (indexrelid = (quote_ident(schemaname) || '.' || quote_ident(indexname))::regclass)
            WHERE schemaname = %1$L
              AND starts_with(tablename, %2$L)
            GROUP BY indnatts,
                     indnkeyatts,
                     indisunique,
                     indisprimary,
                     indisexclusion,
                     indimmediate,
                     indisclustered,
                     indisvalid,
                     indisready,
                     indislive,
                     indisreplident,
                     indkey,
                     indcollation,
                     indclass,
                     indoption,
                     indexprs,
                     indpred
                     %3$s
        ) q1
        $$,
        schemaname, tablename, indnullsnotdistinct_column_ref
    ) INTO result;

    RETURN result;
END;
$func$ LANGUAGE plpgsql;

-- Returns, as a jsonb array ordered by column name, the columns of the given
-- table (exact schema and table name match) that have a default expression
-- or a generation expression.
CREATE OR REPLACE FUNCTION get_column_defaults(schemaname text, tablename text)
RETURNS jsonb AS $func$
DECLARE
    result jsonb;
BEGIN
    -- The OR condition is parenthesized on purpose: AND binds tighter than OR,
    -- so without the parentheses generated columns of every table in every
    -- schema would be returned.
    EXECUTE format(
        $$
        SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.column_name) AS column_defs
        FROM (
            SELECT column_name,
                   column_default::text,
                   generation_expression::text
            FROM information_schema.columns
            WHERE table_schema = %1$L
              AND table_name = %2$L
              AND (column_default IS NOT NULL OR generation_expression IS NOT NULL)
        ) q1
        $$,
        schemaname, tablename
    ) INTO result;

    RETURN result;
END;
$func$ LANGUAGE plpgsql;

-- For ordinary tables (relkind 'r') whose name starts with the given prefix,
-- groups the user columns (attnum > 0, not dropped) by name, type, collation,
-- compression method and not-null flag, then groups those column descriptions
-- by the set of relations that share them.
--
-- Only a single row of the final query is stored into the result.
CREATE OR REPLACE FUNCTION get_column_attrs(relname_prefix text)
RETURNS jsonb AS $func$
DECLARE
    result jsonb;
BEGIN
    EXECUTE format(
        $$
        SELECT to_jsonb(q2.*)
        FROM (
            SELECT relnames,
                   jsonb_agg(to_jsonb(q1.*) - 'relnames' ORDER BY q1.column_name) AS column_attrs
            FROM (
                SELECT array_agg(attrelid::regclass::text ORDER BY attrelid) AS relnames,
                       attname AS column_name,
                       typname AS type_name,
                       collname AS collation_name,
                       attcompression AS compression_method,
                       attnotnull AS not_null
                FROM pg_attribute pa
                LEFT JOIN pg_type pt ON (pa.atttypid = pt.oid)
                LEFT JOIN pg_collation pc1 ON (pa.attcollation = pc1.oid)
                JOIN pg_class pc2 ON (pa.attrelid = pc2.oid)
                WHERE starts_with(attrelid::regclass::text, %1$L)
                  AND attnum > 0
                  AND NOT attisdropped
                  AND relkind = 'r'
                GROUP BY column_name, type_name, collation_name, compression_method, not_null
            ) q1
            GROUP BY relnames
        ) q2
        $$,
        relname_prefix
    ) INTO result;

    RETURN result;
END;
$func$ LANGUAGE plpgsql;

-- Returns true if all shard placements of given table have given number of indexes.
--
-- The count query is passed to run_command_on_placements(), whose '%s'
-- placeholder is left untouched here, and each returned "result" is cast to
-- int before being compared with n_expected_indexes.
CREATE OR REPLACE FUNCTION verify_index_count_on_shard_placements(
    qualified_table_name text,
    n_expected_indexes int)
RETURNS BOOLEAN
AS $func$
DECLARE
    v_result boolean;
BEGIN
    SELECT n_expected_indexes = ALL(
        SELECT result::int
        FROM run_command_on_placements(
            qualified_table_name,
            $$SELECT COUNT(*) FROM pg_index WHERE indrelid::regclass = '%s'::regclass$$
        )
    )
    INTO v_result;

    RETURN v_result;
END;
$func$ LANGUAGE plpgsql;

-- Returns names of the foreign keys that shards of given table are involved in
-- (as referencing or referenced one).
--
-- One row is returned per placement: on_node is 'on_coordinator' when the
-- node's groupid is 0 and 'on_worker' otherwise; an empty result from the
-- placement is reported as an empty array.
CREATE OR REPLACE FUNCTION get_fkey_names_on_placements(
    qualified_table_name text)
RETURNS TABLE (
    on_node text,
    shard_id bigint,
    fkey_names text[]
)
AS $func$
BEGIN
    RETURN QUERY
    SELECT CASE WHEN groupid = 0 THEN 'on_coordinator' ELSE 'on_worker' END AS on_node_col,
           shardid,
           (CASE WHEN result = '' THEN '{}' ELSE result END)::text[] AS fkey_names_col
    FROM run_command_on_placements(
        qualified_table_name,
        $$SELECT array_agg(conname ORDER BY conname) FROM pg_constraint WHERE '%s'::regclass IN (conrelid, confrelid) AND contype = 'f'$$
    )
    JOIN pg_dist_node USING (nodename, nodeport);
END;
$func$ LANGUAGE plpgsql;

-- Returns true if all shard placements of given table have given number of partitions.
CREATE OR REPLACE FUNCTION verify_partition_count_on_placements(
    qualified_table_name text,
    n_expected_partitions int)
RETURNS BOOLEAN
AS $func$
DECLARE
    all_match boolean;
BEGIN
    -- Each placement reports how many pg_inherits rows have it as parent;
    -- every reported count has to equal the expected number.
    all_match := (
        n_expected_partitions = ALL(
            SELECT result::int
            FROM run_command_on_placements(
                qualified_table_name,
                $$SELECT COUNT(*) FROM pg_inherits WHERE inhparent = '%s'::regclass;$$
            )
        )
    );

    RETURN all_match;
END;
$func$ LANGUAGE plpgsql;

-- Verifies the placement of the given shard of the given table.
--
-- Returns false (with a NOTICE) when no placement of the shard is found on an
-- active primary node that should have shards, or when the placement being on
-- the coordinator group (groupid 0) does not match expect_placement_on_coord.
-- Otherwise asks every worker, via run_command_on_workers(), whether it has
-- exactly one matching pg_dist_shard / pg_dist_placement / pg_dist_node record
-- for that node, and returns whether all of them do.
CREATE OR REPLACE FUNCTION verify_shard_placement_for_single_shard_table(
    qualified_table_name text,
    expected_shard_id bigint,
    expect_placement_on_coord boolean)
RETURNS BOOLEAN
AS $func$
DECLARE
    placement_node record;
    worker_check_sql text;
    workers_agree boolean;
BEGIN
    -- locate the node that holds the placement according to local metadata
    SELECT nodename, nodeport, groupid
    INTO placement_node
    FROM pg_dist_shard
    JOIN pg_dist_placement USING (shardid)
    JOIN pg_dist_node USING (groupid)
    WHERE noderole = 'primary'
      AND shouldhaveshards
      AND isactive
      AND logicalrelid = qualified_table_name::regclass
      AND shardid = expected_shard_id;

    IF placement_node IS NULL THEN
        RAISE NOTICE 'Shard placement is not on a primary worker node';
        RETURN false;
    END IF;

    IF (placement_node.groupid = 0) <> expect_placement_on_coord THEN
        RAISE NOTICE 'Shard placement is on an unexpected node';
        RETURN false;
    END IF;

    -- verify that metadata on workers is correct too
    worker_check_sql := format(
        'SELECT true = ALL(
            SELECT result::boolean FROM run_command_on_workers($$
                SELECT COUNT(*) = 1
                FROM pg_dist_shard
                JOIN pg_dist_placement USING (shardid)
                JOIN pg_dist_node USING (groupid)
                WHERE logicalrelid = ''%s''::regclass AND
                      shardid = %s AND
                      nodename = ''%s'' AND
                      nodeport = %s AND
                      groupid = %s
            $$)
        );',
        qualified_table_name,
        expected_shard_id,
        placement_node.nodename,
        placement_node.nodeport,
        placement_node.groupid
    );

    EXECUTE worker_check_sql INTO workers_agree;

    RETURN workers_agree;
END;
$func$ LANGUAGE plpgsql;

-- Verifies the placements of the given shard of the given table on every node.
--
-- The check is sent to all nodes via run_command_on_all_nodes() and passes on
-- a node when both of the following hold there:
--   * the number of active primary nodes equals the number of placements of
--     the shard on active primary nodes, and
--   * exactly one placement of the shard with the given placementid exists in
--     group 0 (the coordinator group).
-- Returns whether the check passes on all nodes.
CREATE OR REPLACE FUNCTION verify_shard_placements_for_reference_table(
    qualified_table_name text,
    expected_shard_id bigint,
    expected_coord_placement_id bigint)
RETURNS BOOLEAN
AS $func$
DECLARE
    node_check_sql text;
    nodes_agree boolean;
BEGIN
    node_check_sql := format(
        'SELECT true = ALL(
            SELECT result::boolean FROM run_command_on_all_nodes($$
                SELECT
                    (SELECT COUNT(*) FROM pg_dist_node WHERE noderole = ''primary'' AND isactive) =
                    (SELECT COUNT(*)
                     FROM pg_dist_shard
                     JOIN pg_dist_placement USING (shardid)
                     JOIN pg_dist_node USING (groupid)
                     WHERE noderole = ''primary'' AND isactive AND
                           logicalrelid = ''%s''::regclass AND shardid = %s)
                    AND
                    (SELECT COUNT(*) = 1
                     FROM pg_dist_shard
                     JOIN pg_dist_placement USING (shardid)
                     JOIN pg_dist_node USING (groupid)
                     WHERE noderole = ''primary'' AND isactive AND
                           logicalrelid = ''%s''::regclass AND shardid = %s AND
                           placementid = %s AND groupid = 0)
            $$)
        );',
        qualified_table_name,
        expected_shard_id,
        qualified_table_name,
        expected_shard_id,
        expected_coord_placement_id
    );

    EXECUTE node_check_sql INTO nodes_agree;

    RETURN nodes_agree;
END;
$func$ LANGUAGE plpgsql;

-- This function checks pg_dist_partition on all nodes and returns true if the metadata
-- record for given single-shard table is correct.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_single_shard_table(
    qualified_table_name text)
RETURNS BOOLEAN
AS $func$
DECLARE
    node_check_sql text;
    nodes_agree boolean;
BEGIN
    -- A node passes when it has exactly one pg_dist_partition record for the
    -- table with partmethod 'n', no partkey, a positive colocationid,
    -- repmodel 's' and autoconverted = false.
    node_check_sql := format(
        'SELECT true = ALL(
            SELECT result::boolean FROM run_command_on_all_nodes($$
                SELECT COUNT(*) = 1
                FROM pg_dist_partition
                WHERE logicalrelid = ''%s''::regclass AND
                      partmethod = ''n'' AND
                      partkey IS NULL AND
                      colocationid > 0 AND
                      repmodel = ''s'' AND
                      autoconverted = false
            $$)
        );',
        qualified_table_name
    );

    EXECUTE node_check_sql INTO nodes_agree;

    RETURN nodes_agree;
END;
$func$ LANGUAGE plpgsql;

-- This function checks pg_dist_partition on all nodes and returns true if the metadata
-- record for given reference table is correct.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_reference_table(
    qualified_table_name text)
RETURNS BOOLEAN
AS $func$
DECLARE
    node_check_sql text;
    nodes_agree boolean;
BEGIN
    -- A node passes when it has exactly one pg_dist_partition record for the
    -- table with partmethod 'n', no partkey, a positive colocationid,
    -- repmodel 't' and autoconverted = false.
    node_check_sql := format(
        'SELECT true = ALL(
            SELECT result::boolean FROM run_command_on_all_nodes($$
                SELECT COUNT(*) = 1
                FROM pg_dist_partition
                WHERE logicalrelid = ''%s''::regclass AND
                      partmethod = ''n'' AND
                      partkey IS NULL AND
                      colocationid > 0 AND
                      repmodel = ''t'' AND
                      autoconverted = false
            $$)
        );',
        qualified_table_name
    );

    EXECUTE node_check_sql INTO nodes_agree;

    RETURN nodes_agree;
END;
$func$ LANGUAGE plpgsql;

-- Returns pg_seclabels entries from all nodes in the cluster for which
-- the object name is the input.
-- Runs a pg_seclabels lookup for object_name on every node through
-- run_command_on_all_nodes() and returns one row per node, ordered by
-- node_type.
--
-- node_type is derived from the node's port: 'coordinator', 'worker_1' or
-- 'worker_2' when it equals master_port, worker_1_port or worker_2_port
-- respectively, and 'unexpected_node' otherwise. result is the value reported
-- by run_command_on_all_nodes() for the lookup (provider, objtype and label
-- converted with to_jsonb).
--
-- object_name is embedded with quote_literal() so that names containing
-- single quotes do not break the remote command.
CREATE OR REPLACE FUNCTION get_citus_tests_label_provider_labels(
    object_name text,
    master_port INTEGER DEFAULT 57636,
    worker_1_port INTEGER DEFAULT 57637,
    worker_2_port INTEGER DEFAULT 57638)
RETURNS TABLE (
    node_type text,
    result text
)
AS $func$
DECLARE
    pg_seclabels_cmd TEXT := 'SELECT to_jsonb(q.*) FROM (' ||
                             'SELECT provider, objtype, label FROM pg_seclabels ' ||
                             'WHERE objname = ' || quote_literal(object_name) || ') q';
BEGIN
    RETURN QUERY
    SELECT
        CASE
            WHEN nodeport = master_port THEN 'coordinator'
            WHEN nodeport = worker_1_port THEN 'worker_1'
            WHEN nodeport = worker_2_port THEN 'worker_2'
            ELSE 'unexpected_node'
        END AS node_type,
        a.result
    FROM run_command_on_all_nodes(pg_seclabels_cmd) a
    JOIN pg_dist_node USING (nodeid)
    ORDER BY node_type;
END;
$func$ LANGUAGE plpgsql;

-- For all nodes, returns database properties of given database, except
-- oid, datfrozenxid and datminmxid.
--
-- Also returns whether the node has a pg_dist_object record for the database
-- and whether there are any stale pg_dist_object records for a database.
-- Runs a database inspection query on every node through
-- run_command_on_all_nodes() and returns one row per node.
--
-- node_type tells whether the node is in group 0 (coordinator) or not
-- (worker node), and whether its group is the local group according to
-- pg_dist_local_group. result is the jsonb text reported by the node with:
--   * database_properties: selected pg_database columns of p_database_name,
--     together with owner name, encoding name and tablespace name;
--   * pg_dist_object_record_for_db_exists: whether exactly one pg_dist_object
--     record with classid 1262 points at the database;
--   * stale_pg_dist_object_record_for_a_db_exists: whether any pg_dist_object
--     record with classid 1262 points at an OID that is not in pg_database.
--
-- Columns that only exist in newer PostgreSQL versions are selected when
-- present in pg_database and replaced by constants otherwise. The database
-- name is interpolated as a quoted literal (%L).
CREATE OR REPLACE FUNCTION check_database_on_all_nodes(p_database_name text)
RETURNS TABLE (node_type text, result text)
AS $func$
DECLARE
    pg_ge_15_options text := '';
    pg_ge_16_options text := '';
BEGIN
    IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_database'::regclass AND attname = 'datlocprovider') THEN
        pg_ge_15_options := ', daticulocale, datcollversion, datlocprovider';
    ELSE
        pg_ge_15_options := $$, null as daticulocale, null as datcollversion, 'c' as datlocprovider$$;
    END IF;

    IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_database'::regclass AND attname = 'daticurules') THEN
        pg_ge_16_options := ', daticurules';
    ELSE
        pg_ge_16_options := ', null as daticurules';
    END IF;

    RETURN QUERY
    SELECT
        CASE
            WHEN (groupid = 0 AND groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'coordinator (local)'
            WHEN (groupid = 0) THEN 'coordinator (remote)'
            WHEN (groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'worker node (local)'
            ELSE 'worker node (remote)'
        END AS node_type,
        q2.result
    FROM run_command_on_all_nodes(
        format(
            -- The "--" comments inside the query text must stay on their own
            -- lines, otherwise they would comment out the rest of the query.
            $$
            SELECT to_jsonb(q.*)
            FROM (
                SELECT
                (
                    SELECT to_jsonb(database_properties.*)
                    FROM (
                        SELECT datname, pa.rolname as database_owner,
                               pg_encoding_to_char(pd.encoding) as encoding,
                               datistemplate, datallowconn, datconnlimit, datacl,
                               pt.spcname AS tablespace, datcollate, datctype
                               %2$s -- >= pg15 options
                               %3$s -- >= pg16 options
                        FROM pg_database pd
                        JOIN pg_authid pa ON pd.datdba = pa.oid
                        JOIN pg_tablespace pt ON pd.dattablespace = pt.oid
                        WHERE datname = %1$L
                    ) database_properties
                ) AS database_properties,
                (
                    SELECT COUNT(*)=1
                    FROM pg_dist_object
                    WHERE classid = 1262 AND
                          objid = (SELECT oid FROM pg_database WHERE datname = %1$L)
                ) AS pg_dist_object_record_for_db_exists,
                (
                    SELECT COUNT(*) > 0
                    FROM pg_dist_object
                    WHERE classid = 1262 AND objid NOT IN (SELECT oid FROM pg_database)
                ) AS stale_pg_dist_object_record_for_a_db_exists
            ) q
            $$,
            p_database_name, pg_ge_15_options, pg_ge_16_options
        )
    ) q2
    JOIN pg_dist_node USING (nodeid);
END;
$func$ LANGUAGE plpgsql;

-- For each entry of permissions, runs has_database_privilege(role_name,
-- db_name, <permission>) on every node through run_command_on_all_nodes() and
-- returns one row per node holding the permission and the node's result.
CREATE OR REPLACE FUNCTION check_database_privileges(role_name text, db_name text, permissions text[])
RETURNS TABLE(permission text, result text)
AS $func$
DECLARE
    -- Loop variable; deliberately not named like the "permission" output column.
    current_permission text;
BEGIN
    FOREACH current_permission IN ARRAY permissions
    LOOP
        RETURN QUERY EXECUTE format(
            $inner$SELECT %s, result FROM run_command_on_all_nodes($$select has_database_privilege(%s,%s,%s); $$)$inner$,
            quote_literal(current_permission),
            quote_literal(role_name),
            quote_literal(db_name),
            quote_literal(current_permission));
    END LOOP;
END;
$func$ LANGUAGE plpgsql;