Do not allow distributed functions on non-metadata synced nodes (#5586)

Before this commit, Citus was triggering metadata syncing
in the background when a function is distributed. However,
with Citus 11, we expect all clusters to have metadata synced
enabled. So, we do not expect any nodes not to have the metadata.

This change:
	(a) pro: simplifies the code and opens up possibilities
		 to simplify futher by reducing the scope of
		 bg worker to only sync node metadata
        (b) pro: explicitly asks users to sync the metadata such that
  	    any unforseen impact can be easily detected
        (c) con: For distributed functions without distribution
		 argument, we do not necessarily require the metadata
		 sycned. However, for completeness and simplicity, we
		 do so.
pull/5591/head
Önder Kalacı 2022-01-04 13:12:57 +01:00 committed by GitHub
parent 30eb24009c
commit 0a8b0b06c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 61 additions and 215 deletions

View File

@ -67,6 +67,7 @@
(strncmp(arg, prefix, strlen(prefix)) == 0) (strncmp(arg, prefix, strlen(prefix)) == 0)
/* forward declaration for helper functions*/ /* forward declaration for helper functions*/
static void ErrorIfAnyNodeDoesNotHaveMetadata(void);
static char * GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace); static char * GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace);
static char * GetFunctionAlterOwnerCommand(const RegProcedure funcOid); static char * GetFunctionAlterOwnerCommand(const RegProcedure funcOid);
static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName, static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName,
@ -77,7 +78,6 @@ static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid
distributionColumnType, Oid distributionColumnType, Oid
sourceRelationId); sourceRelationId);
static void EnsureSequentialModeForFunctionDDL(void); static void EnsureSequentialModeForFunctionDDL(void);
static void TriggerSyncMetadataToPrimaryNodes(void);
static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt);
static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldPropagateAlterFunction(const ObjectAddress *address);
static bool ShouldAddFunctionSignature(FunctionParameterMode mode); static bool ShouldAddFunctionSignature(FunctionParameterMode mode);
@ -194,6 +194,13 @@ create_distributed_function(PG_FUNCTION_ARGS)
if (distributionArgumentName != NULL) if (distributionArgumentName != NULL)
{ {
/*
* Prior to Citus 11, this code was triggering metadata
* syncing. However, with Citus 11+, we expect the metadata
* has already been synced.
*/
ErrorIfAnyNodeDoesNotHaveMetadata();
DistributeFunctionWithDistributionArgument(funcOid, distributionArgumentName, DistributeFunctionWithDistributionArgument(funcOid, distributionArgumentName,
distributionArgumentOid, distributionArgumentOid,
colocateWithTableName, colocateWithTableName,
@ -206,6 +213,13 @@ create_distributed_function(PG_FUNCTION_ARGS)
} }
else if (colocatedWithReferenceTable) else if (colocatedWithReferenceTable)
{ {
/*
* Prior to Citus 11, this code was triggering metadata
* syncing. However, with Citus 11+, we expect the metadata
* has already been synced.
*/
ErrorIfAnyNodeDoesNotHaveMetadata();
DistributeFunctionColocatedWithReferenceTable(&functionAddress); DistributeFunctionColocatedWithReferenceTable(&functionAddress);
} }
@ -213,6 +227,34 @@ create_distributed_function(PG_FUNCTION_ARGS)
} }
/*
* ErrorIfAnyNodeDoesNotHaveMetadata throws error if any
* of the worker nodes does not have the metadata.
*/
static void
ErrorIfAnyNodeDoesNotHaveMetadata(void)
{
List *workerNodeList =
ActivePrimaryNonCoordinatorNodeList(ShareLock);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
if (!workerNode->hasMetadata)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot process the distributed function "
"since the node %s:%d does not have metadata "
"synced and this command requires all the nodes "
"have the metadata sycned", workerNode->workerName,
workerNode->workerPort),
errhint("To sync the metadata execute: "
"SELECT enable_citus_mx_for_pre_citus11();")));
}
}
}
/* /*
* DistributeFunctionWithDistributionArgument updates pg_dist_object records for * DistributeFunctionWithDistributionArgument updates pg_dist_object records for
* a function/procedure that has a distribution argument, and triggers metadata * a function/procedure that has a distribution argument, and triggers metadata
@ -238,13 +280,6 @@ DistributeFunctionWithDistributionArgument(RegProcedure funcOid,
/* record the distribution argument and colocationId */ /* record the distribution argument and colocationId */
UpdateFunctionDistributionInfo(functionAddress, &distributionArgumentIndex, UpdateFunctionDistributionInfo(functionAddress, &distributionArgumentIndex,
&colocationId); &colocationId);
/*
* Once we have at least one distributed function/procedure with distribution
* argument, we sync the metadata to nodes so that the function/procedure
* delegation can be handled locally on the nodes.
*/
TriggerSyncMetadataToPrimaryNodes();
} }
@ -293,13 +328,6 @@ DistributeFunctionColocatedWithReferenceTable(const ObjectAddress *functionAddre
int *distributionArgumentIndex = NULL; int *distributionArgumentIndex = NULL;
UpdateFunctionDistributionInfo(functionAddress, distributionArgumentIndex, UpdateFunctionDistributionInfo(functionAddress, distributionArgumentIndex,
&colocationId); &colocationId);
/*
* Once we have at least one distributed function/procedure that reads
* from a reference table, we sync the metadata to nodes so that the
* function/procedure delegation can be handled locally on the nodes.
*/
TriggerSyncMetadataToPrimaryNodes();
} }
@ -1119,47 +1147,6 @@ EnsureSequentialModeForFunctionDDL(void)
} }
/*
* TriggerSyncMetadataToPrimaryNodes iterates over the active primary nodes,
* and triggers the metadata syncs if the node has not the metadata. Later,
* maintenance daemon will sync the metadata to nodes.
*/
void
TriggerSyncMetadataToPrimaryNodes(void)
{
List *workerList = ActivePrimaryNonCoordinatorNodeList(ShareLock);
bool triggerMetadataSync = false;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList)
{
/* if already has metadata, no need to do it again */
if (!workerNode->hasMetadata)
{
/*
* Let the maintanince deamon do the hard work of syncing the metadata. We prefer
* this because otherwise node activation might fail withing transaction blocks.
*/
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
BoolGetDatum(true));
triggerMetadataSync = true;
}
else if (!workerNode->metadataSynced)
{
triggerMetadataSync = true;
}
}
/* let the maintanince deamon know about the metadata sync */
if (triggerMetadataSync)
{
TriggerMetadataSyncOnCommit();
}
}
/* /*
* ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION * ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION
* statement. We only propagate replace's of distributed functions to keep the function on * statement. We only propagate replace's of distributed functions to keep the function on

View File

@ -316,53 +316,6 @@ IsObjectDistributed(const ObjectAddress *address)
} }
/*
* ClusterHasDistributedFunctionWithDistArgument returns true if there
* is at least one distributed function in the cluster with distribution
* argument index set.
*/
bool
ClusterHasDistributedFunctionWithDistArgument(void)
{
bool foundDistributedFunction = false;
HeapTuple pgDistObjectTup = NULL;
Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistObjectRel);
SysScanDesc pgDistObjectScan =
systable_beginscan(pgDistObjectRel, InvalidOid, false, NULL, 0, NULL);
while (HeapTupleIsValid(pgDistObjectTup = systable_getnext(pgDistObjectScan)))
{
Form_pg_dist_object pg_dist_object =
(Form_pg_dist_object) GETSTRUCT(pgDistObjectTup);
if (pg_dist_object->classid == ProcedureRelationId)
{
bool distArgumentIsNull =
heap_attisnull(pgDistObjectTup,
Anum_pg_dist_object_distribution_argument_index,
tupleDescriptor);
/* we found one distributed function that has an distribution argument */
if (!distArgumentIsNull)
{
foundDistributedFunction = true;
break;
}
}
}
systable_endscan(pgDistObjectScan);
relation_close(pgDistObjectRel, AccessShareLock);
return foundDistributedFunction;
}
/* /*
* GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all * GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all
* distributed objects as marked in pg_dist_object * distributed objects as marked in pg_dist_object

View File

@ -614,18 +614,6 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
ReplicateAllReferenceTablesToNode(newWorkerNode->workerName, ReplicateAllReferenceTablesToNode(newWorkerNode->workerName,
newWorkerNode->workerPort); newWorkerNode->workerPort);
} }
/*
* Let the maintenance daemon do the hard work of syncing the metadata.
* We prefer this because otherwise node activation might fail within
* transaction blocks.
*/
if (ClusterHasDistributedFunctionWithDistArgument())
{
SetWorkerColumnLocalOnly(newWorkerNode, Anum_pg_dist_node_hasmetadata,
BoolGetDatum(true));
TriggerMetadataSyncOnCommit();
}
} }
} }
@ -893,7 +881,7 @@ ActivateNode(char *nodeName, int nodePort)
* not fail just because the current metadata is not synced. * not fail just because the current metadata is not synced.
*/ */
SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(isActive)); BoolGetDatum(true));
} }
SetUpDistributedTableDependencies(workerNode); SetUpDistributedTableDependencies(workerNode);

View File

@ -243,12 +243,6 @@ SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab')
localhost | 57638 | t | (01:23:45:67:89:ab,"01:23:45:67:89:ab is text") localhost | 57638 | t | (01:23:45:67:89:ab,"01:23:45:67:89:ab is text")
(2 rows) (2 rows)
SELECT public.wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table'); SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table');
create_distributed_function create_distributed_function
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -586,13 +580,6 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', dist
(1 row) (1 row)
-- make sure that the primary nodes are now metadata synced
select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
bool_and
---------------------------------------------------------------------
t
(1 row)
-- make sure that both of the nodes have the function because we've succeeded -- make sure that both of the nodes have the function because we've succeeded
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$); SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$);
run_command_on_workers run_command_on_workers
@ -610,7 +597,7 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', dist
-- show that we are able to propagate objects with multiple item on address arrays -- show that we are able to propagate objects with multiple item on address arrays
SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%'; SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%';
metadata_command metadata_command
--------------------------------------------------------------------- ---------------------------------------------------------------------
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('server', ARRAY['fake_fdw_server']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('server', ARRAY['fake_fdw_server']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
(1 row) (1 row)
@ -635,12 +622,6 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1'
ERROR: cannot colocate function "eq_with_param_names" and table "replicated_table_func_test" ERROR: cannot colocate function "eq_with_param_names" and table "replicated_table_func_test"
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model. DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
HINT: When distributing tables make sure that citus.shard_replication_factor = 1 HINT: When distributing tables make sure that citus.shard_replication_factor = 1
SELECT public.wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
-- a function can be colocated with a different distribution argument type -- a function can be colocated with a different distribution argument type
-- as long as there is a coercion path -- as long as there is a coercion path
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
@ -767,13 +748,6 @@ SET citus.shard_count TO 55;
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1'); SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1');
ERROR: cannot distribute the function "eq_with_param_names" since there is no table to colocate with ERROR: cannot distribute the function "eq_with_param_names" since there is no table to colocate with
HINT: Provide a distributed table via "colocate_with" option to create_distributed_function() HINT: Provide a distributed table via "colocate_with" option to create_distributed_function()
-- sync metadata to workers for consistent results when clearing objects
SELECT public.wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
CREATE TABLE test (id int, name text); CREATE TABLE test (id int, name text);
@ -1123,3 +1097,11 @@ SELECT run_command_on_workers($$DROP USER functionuser$$);
(localhost,57638,t,"DROP ROLE") (localhost,57638,t,"DROP ROLE")
(2 rows) (2 rows)
-- sync metadata again
SELECT start_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
start_metadata_sync_to_node
---------------------------------------------------------------------
(2 rows)

View File

@ -20,19 +20,6 @@ BEGIN
RAISE INFO 'information message %', $1; RAISE INFO 'information message %', $1;
END; END;
$proc$; $proc$;
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id text); CREATE TABLE colocation_table(id text);
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
@ -48,12 +35,6 @@ SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'c
(1 row) (1 row)
SELECT wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
nodename | nodeport | success | result nodename | nodeport | success | result
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -215,15 +196,3 @@ SELECT run_command_on_workers($$DROP USER procedureuser;$$);
(localhost,57638,t,"DROP ROLE") (localhost,57638,t,"DROP ROLE")
(2 rows) (2 rows)
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)

View File

@ -25,19 +25,6 @@ SELECT create_distributed_function('count_values(int)', '$1', colocate_with:='t1
(1 row) (1 row)
-- make sure that the metadata synced before running the queries
SELECT wait_until_metadata_sync(5000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
SELECT bool_and(metadatasynced) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
bool_and
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
SELECT count_values(11); SELECT count_values(11);
DEBUG: pushing down the function call DEBUG: pushing down the function call

View File

@ -326,8 +326,12 @@ test: ssl_by_default
test: distributed_types distributed_types_conflict disable_object_propagation distributed_types_xact_add_enum_value test: distributed_types distributed_types_conflict disable_object_propagation distributed_types_xact_add_enum_value
test: check_mx test: check_mx
test: distributed_functions distributed_functions_conflict test: distributed_functions distributed_functions_conflict
test: distributed_collations distributed_collations_conflict test: distributed_collations
test: distributed_procedure test: distributed_procedure
# blocked on #5583
test: turn_mx_off
test: distributed_collations_conflict
test: turn_mx_on test: turn_mx_on
# --------- # ---------

View File

@ -205,8 +205,6 @@ END;
SELECT create_distributed_function('dup(macaddr)', '$1', colocate_with := 'streaming_table'); SELECT create_distributed_function('dup(macaddr)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab');$$) ORDER BY 1,2; SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab');$$) ORDER BY 1,2;
SELECT public.wait_until_metadata_sync(30000);
SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table'); SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','0123456789ab');$$) ORDER BY 1,2; SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','0123456789ab');$$) ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');
@ -352,9 +350,6 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_w
-- valid distribution with distribution_arg_name -- valid distribution with distribution_arg_name
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1'); SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1');
-- make sure that the primary nodes are now metadata synced
select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
-- make sure that both of the nodes have the function because we've succeeded -- make sure that both of the nodes have the function because we've succeeded
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$); SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$);
@ -373,8 +368,6 @@ CREATE TABLE replicated_table_func_test (a macaddr);
SELECT create_distributed_table('replicated_table_func_test', 'a'); SELECT create_distributed_table('replicated_table_func_test', 'a');
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test'); SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test');
SELECT public.wait_until_metadata_sync(30000);
-- a function can be colocated with a different distribution argument type -- a function can be colocated with a different distribution argument type
-- as long as there is a coercion path -- as long as there is a coercion path
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
@ -441,10 +434,6 @@ SELECT create_distributed_function('add_polygons(polygon,polygon)', '$1', coloca
SET citus.shard_count TO 55; SET citus.shard_count TO 55;
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1'); SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1');
-- sync metadata to workers for consistent results when clearing objects
SELECT public.wait_until_metadata_sync(30000);
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
CREATE TABLE test (id int, name text); CREATE TABLE test (id int, name text);
@ -705,3 +694,6 @@ DROP SCHEMA function_tests2 CASCADE;
DROP USER functionuser; DROP USER functionuser;
SELECT run_command_on_workers($$DROP USER functionuser$$); SELECT run_command_on_workers($$DROP USER functionuser$$);
-- sync metadata again
SELECT start_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';

View File

@ -17,23 +17,12 @@ BEGIN
END; END;
$proc$; $proc$;
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
SELECT pg_reload_conf();
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id text); CREATE TABLE colocation_table(id text);
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('colocation_table','id'); SELECT create_distributed_table('colocation_table','id');
SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table'); SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table');
SELECT wait_until_metadata_sync(30000);
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');
@ -94,5 +83,3 @@ SELECT run_command_on_workers($$DROP SCHEMA procedure_tests2 CASCADE;$$);
DROP USER procedureuser; DROP USER procedureuser;
SELECT run_command_on_workers($$DROP USER procedureuser;$$); SELECT run_command_on_workers($$DROP USER procedureuser;$$);
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);

View File

@ -18,9 +18,6 @@ $$
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
SELECT create_distributed_function('count_values(int)', '$1', colocate_with:='t1'); SELECT create_distributed_function('count_values(int)', '$1', colocate_with:='t1');
-- make sure that the metadata synced before running the queries
SELECT wait_until_metadata_sync(5000);
SELECT bool_and(metadatasynced) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
SELECT count_values(11); SELECT count_values(11);