diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index f7bc29c34..e3b93b809 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -67,6 +67,7 @@ (strncmp(arg, prefix, strlen(prefix)) == 0) /* forward declaration for helper functions*/ +static void ErrorIfAnyNodeDoesNotHaveMetadata(void); static char * GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace); static char * GetFunctionAlterOwnerCommand(const RegProcedure funcOid); static int GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName, @@ -77,7 +78,6 @@ static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnType, Oid sourceRelationId); static void EnsureSequentialModeForFunctionDDL(void); -static void TriggerSyncMetadataToPrimaryNodes(void); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldAddFunctionSignature(FunctionParameterMode mode); @@ -194,6 +194,13 @@ create_distributed_function(PG_FUNCTION_ARGS) if (distributionArgumentName != NULL) { + /* + * Prior to Citus 11, this code was triggering metadata + * syncing. However, with Citus 11+, we expect the metadata + * has already been synced. + */ + ErrorIfAnyNodeDoesNotHaveMetadata(); + DistributeFunctionWithDistributionArgument(funcOid, distributionArgumentName, distributionArgumentOid, colocateWithTableName, @@ -206,6 +213,13 @@ create_distributed_function(PG_FUNCTION_ARGS) } else if (colocatedWithReferenceTable) { + /* + * Prior to Citus 11, this code was triggering metadata + * syncing. However, with Citus 11+, we expect the metadata + * has already been synced. + */ + ErrorIfAnyNodeDoesNotHaveMetadata(); + DistributeFunctionColocatedWithReferenceTable(&functionAddress); } @@ -213,6 +227,34 @@ create_distributed_function(PG_FUNCTION_ARGS) } +/* + * ErrorIfAnyNodeDoesNotHaveMetadata throws error if any + * of the worker nodes does not have the metadata. + */ +static void +ErrorIfAnyNodeDoesNotHaveMetadata(void) +{ + List *workerNodeList = + ActivePrimaryNonCoordinatorNodeList(ShareLock); + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + if (!workerNode->hasMetadata) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot process the distributed function " + "since the node %s:%d does not have metadata " + "synced and this command requires all the nodes " + "have the metadata sycned", workerNode->workerName, + workerNode->workerPort), + errhint("To sync the metadata execute: " + "SELECT enable_citus_mx_for_pre_citus11();"))); + } + } +} + + /* * DistributeFunctionWithDistributionArgument updates pg_dist_object records for * a function/procedure that has a distribution argument, and triggers metadata @@ -238,13 +280,6 @@ DistributeFunctionWithDistributionArgument(RegProcedure funcOid, /* record the distribution argument and colocationId */ UpdateFunctionDistributionInfo(functionAddress, &distributionArgumentIndex, &colocationId); - - /* - * Once we have at least one distributed function/procedure with distribution - * argument, we sync the metadata to nodes so that the function/procedure - * delegation can be handled locally on the nodes. - */ - TriggerSyncMetadataToPrimaryNodes(); } @@ -293,13 +328,6 @@ DistributeFunctionColocatedWithReferenceTable(const ObjectAddress *functionAddre int *distributionArgumentIndex = NULL; UpdateFunctionDistributionInfo(functionAddress, distributionArgumentIndex, &colocationId); - - /* - * Once we have at least one distributed function/procedure that reads - * from a reference table, we sync the metadata to nodes so that the - * function/procedure delegation can be handled locally on the nodes. - */ - TriggerSyncMetadataToPrimaryNodes(); } @@ -1119,47 +1147,6 @@ EnsureSequentialModeForFunctionDDL(void) } -/* - * TriggerSyncMetadataToPrimaryNodes iterates over the active primary nodes, - * and triggers the metadata syncs if the node has not the metadata. Later, - * maintenance daemon will sync the metadata to nodes. - */ -void -TriggerSyncMetadataToPrimaryNodes(void) -{ - List *workerList = ActivePrimaryNonCoordinatorNodeList(ShareLock); - bool triggerMetadataSync = false; - - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerList) - { - /* if already has metadata, no need to do it again */ - if (!workerNode->hasMetadata) - { - /* - * Let the maintanince deamon do the hard work of syncing the metadata. We prefer - * this because otherwise node activation might fail withing transaction blocks. - */ - LockRelationOid(DistNodeRelationId(), ExclusiveLock); - SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata, - BoolGetDatum(true)); - - triggerMetadataSync = true; - } - else if (!workerNode->metadataSynced) - { - triggerMetadataSync = true; - } - } - - /* let the maintanince deamon know about the metadata sync */ - if (triggerMetadataSync) - { - TriggerMetadataSyncOnCommit(); - } -} - - /* * ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION * statement. We only propagate replace's of distributed functions to keep the function on diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index 6a9adf5b8..92b72d64d 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -316,53 +316,6 @@ IsObjectDistributed(const ObjectAddress *address) } -/* - * ClusterHasDistributedFunctionWithDistArgument returns true if there - * is at least one distributed function in the cluster with distribution - * argument index set. - */ -bool -ClusterHasDistributedFunctionWithDistArgument(void) -{ - bool foundDistributedFunction = false; - - HeapTuple pgDistObjectTup = NULL; - - Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock); - - TupleDesc tupleDescriptor = RelationGetDescr(pgDistObjectRel); - - SysScanDesc pgDistObjectScan = - systable_beginscan(pgDistObjectRel, InvalidOid, false, NULL, 0, NULL); - while (HeapTupleIsValid(pgDistObjectTup = systable_getnext(pgDistObjectScan))) - { - Form_pg_dist_object pg_dist_object = - (Form_pg_dist_object) GETSTRUCT(pgDistObjectTup); - - if (pg_dist_object->classid == ProcedureRelationId) - { - bool distArgumentIsNull = - heap_attisnull(pgDistObjectTup, - Anum_pg_dist_object_distribution_argument_index, - tupleDescriptor); - - /* we found one distributed function that has an distribution argument */ - if (!distArgumentIsNull) - { - foundDistributedFunction = true; - - break; - } - } - } - - systable_endscan(pgDistObjectScan); - relation_close(pgDistObjectRel, AccessShareLock); - - return foundDistributedFunction; -} - - /* * GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all * distributed objects as marked in pg_dist_object diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 6c8c94137..b56f094b0 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -614,18 +614,6 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode) ReplicateAllReferenceTablesToNode(newWorkerNode->workerName, newWorkerNode->workerPort); } - - /* - * Let the maintenance daemon do the hard work of syncing the metadata. - * We prefer this because otherwise node activation might fail within - * transaction blocks. - */ - if (ClusterHasDistributedFunctionWithDistArgument()) - { - SetWorkerColumnLocalOnly(newWorkerNode, Anum_pg_dist_node_hasmetadata, - BoolGetDatum(true)); - TriggerMetadataSyncOnCommit(); - } } } @@ -893,7 +881,7 @@ ActivateNode(char *nodeName, int nodePort) * not fail just because the current metadata is not synced. */ SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, - BoolGetDatum(isActive)); + BoolGetDatum(true)); } SetUpDistributedTableDependencies(workerNode); diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index d448cd774..d4520231c 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -243,12 +243,6 @@ SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab') localhost | 57638 | t | (01:23:45:67:89:ab,"01:23:45:67:89:ab is text") (2 rows) -SELECT public.wait_until_metadata_sync(30000); - wait_until_metadata_sync ---------------------------------------------------------------------- - -(1 row) - SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table'); create_distributed_function --------------------------------------------------------------------- @@ -586,13 +580,6 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', dist (1 row) --- make sure that the primary nodes are now metadata synced -select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; - bool_and ---------------------------------------------------------------------- - t -(1 row) - -- make sure that both of the nodes have the function because we've succeeded SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$); run_command_on_workers @@ -610,7 +597,7 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', dist -- show that we are able to propagate objects with multiple item on address arrays SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%'; - metadata_command + metadata_command --------------------------------------------------------------------- WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('server', ARRAY['fake_fdw_server']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data; (1 row) @@ -635,12 +622,6 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1' ERROR: cannot colocate function "eq_with_param_names" and table "replicated_table_func_test" DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model. HINT: When distributing tables make sure that citus.shard_replication_factor = 1 -SELECT public.wait_until_metadata_sync(30000); - wait_until_metadata_sync ---------------------------------------------------------------------- - -(1 row) - -- a function can be colocated with a different distribution argument type -- as long as there is a coercion path SET citus.shard_replication_factor TO 1; @@ -767,13 +748,6 @@ SET citus.shard_count TO 55; SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1'); ERROR: cannot distribute the function "eq_with_param_names" since there is no table to colocate with HINT: Provide a distributed table via "colocate_with" option to create_distributed_function() --- sync metadata to workers for consistent results when clearing objects -SELECT public.wait_until_metadata_sync(30000); - wait_until_metadata_sync ---------------------------------------------------------------------- - -(1 row) - SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 4; CREATE TABLE test (id int, name text); @@ -1123,3 +1097,11 @@ SELECT run_command_on_workers($$DROP USER functionuser$$); (localhost,57638,t,"DROP ROLE") (2 rows) +-- sync metadata again +SELECT start_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; + start_metadata_sync_to_node +--------------------------------------------------------------------- + + +(2 rows) + diff --git a/src/test/regress/expected/distributed_procedure.out b/src/test/regress/expected/distributed_procedure.out index d339840c8..4345aab61 100644 --- a/src/test/regress/expected/distributed_procedure.out +++ b/src/test/regress/expected/distributed_procedure.out @@ -20,19 +20,6 @@ BEGIN RAISE INFO 'information message %', $1; END; $proc$; --- set sync intervals to less than 15s so wait_until_metadata_sync never times out -ALTER SYSTEM SET citus.metadata_sync_interval TO 3000; -ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500; -SELECT pg_reload_conf(); - pg_reload_conf ---------------------------------------------------------------------- - t -(1 row) - -CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) - RETURNS void - LANGUAGE C STRICT - AS 'citus'; -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. CREATE TABLE colocation_table(id text); SET citus.shard_replication_factor TO 1; @@ -48,12 +35,6 @@ SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'c (1 row) -SELECT wait_until_metadata_sync(30000); - wait_until_metadata_sync ---------------------------------------------------------------------- - -(1 row) - SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; nodename | nodeport | success | result --------------------------------------------------------------------- @@ -215,15 +196,3 @@ SELECT run_command_on_workers($$DROP USER procedureuser;$$); (localhost,57638,t,"DROP ROLE") (2 rows) -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); - stop_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - -SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); - stop_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - diff --git a/src/test/regress/expected/upgrade_distributed_function_before.out b/src/test/regress/expected/upgrade_distributed_function_before.out index 02b34e764..7cc40f2dd 100644 --- a/src/test/regress/expected/upgrade_distributed_function_before.out +++ b/src/test/regress/expected/upgrade_distributed_function_before.out @@ -25,19 +25,6 @@ SELECT create_distributed_function('count_values(int)', '$1', colocate_with:='t1 (1 row) --- make sure that the metadata synced before running the queries -SELECT wait_until_metadata_sync(5000); - wait_until_metadata_sync ---------------------------------------------------------------------- - -(1 row) - -SELECT bool_and(metadatasynced) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; - bool_and ---------------------------------------------------------------------- - t -(1 row) - SET client_min_messages TO DEBUG1; SELECT count_values(11); DEBUG: pushing down the function call diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 0044ae00e..adebed4a2 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -326,8 +326,12 @@ test: ssl_by_default test: distributed_types distributed_types_conflict disable_object_propagation distributed_types_xact_add_enum_value test: check_mx test: distributed_functions distributed_functions_conflict -test: distributed_collations distributed_collations_conflict +test: distributed_collations test: distributed_procedure + +# blocked on #5583 +test: turn_mx_off +test: distributed_collations_conflict test: turn_mx_on # --------- diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index c33ae5893..349e8fe44 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -205,8 +205,6 @@ END; SELECT create_distributed_function('dup(macaddr)', '$1', colocate_with := 'streaming_table'); SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab');$$) ORDER BY 1,2; -SELECT public.wait_until_metadata_sync(30000); - SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table'); SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','0123456789ab');$$) ORDER BY 1,2; SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)'); @@ -352,9 +350,6 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_w -- valid distribution with distribution_arg_name SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='val1'); --- make sure that the primary nodes are now metadata synced -select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; - -- make sure that both of the nodes have the function because we've succeeded SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_with_param_names';$$); @@ -373,8 +368,6 @@ CREATE TABLE replicated_table_func_test (a macaddr); SELECT create_distributed_table('replicated_table_func_test', 'a'); SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', '$1', colocate_with:='replicated_table_func_test'); -SELECT public.wait_until_metadata_sync(30000); - -- a function can be colocated with a different distribution argument type -- as long as there is a coercion path SET citus.shard_replication_factor TO 1; @@ -441,10 +434,6 @@ SELECT create_distributed_function('add_polygons(polygon,polygon)', '$1', coloca SET citus.shard_count TO 55; SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1'); --- sync metadata to workers for consistent results when clearing objects -SELECT public.wait_until_metadata_sync(30000); - - SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 4; CREATE TABLE test (id int, name text); @@ -705,3 +694,6 @@ DROP SCHEMA function_tests2 CASCADE; DROP USER functionuser; SELECT run_command_on_workers($$DROP USER functionuser$$); + +-- sync metadata again +SELECT start_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; diff --git a/src/test/regress/sql/distributed_procedure.sql b/src/test/regress/sql/distributed_procedure.sql index 0e3d2d7d0..3f923d9f3 100644 --- a/src/test/regress/sql/distributed_procedure.sql +++ b/src/test/regress/sql/distributed_procedure.sql @@ -17,23 +17,12 @@ BEGIN END; $proc$; --- set sync intervals to less than 15s so wait_until_metadata_sync never times out -ALTER SYSTEM SET citus.metadata_sync_interval TO 3000; -ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500; -SELECT pg_reload_conf(); - -CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) - RETURNS void - LANGUAGE C STRICT - AS 'citus'; - -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. CREATE TABLE colocation_table(id text); SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('colocation_table','id'); SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table'); -SELECT wait_until_metadata_sync(30000); SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); @@ -94,5 +83,3 @@ SELECT run_command_on_workers($$DROP SCHEMA procedure_tests2 CASCADE;$$); DROP USER procedureuser; SELECT run_command_on_workers($$DROP USER procedureuser;$$); -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); diff --git a/src/test/regress/sql/upgrade_distributed_function_before.sql b/src/test/regress/sql/upgrade_distributed_function_before.sql index f3682dc9a..6459e0212 100644 --- a/src/test/regress/sql/upgrade_distributed_function_before.sql +++ b/src/test/regress/sql/upgrade_distributed_function_before.sql @@ -18,9 +18,6 @@ $$ $$ LANGUAGE plpgsql; SELECT create_distributed_function('count_values(int)', '$1', colocate_with:='t1'); --- make sure that the metadata synced before running the queries -SELECT wait_until_metadata_sync(5000); -SELECT bool_and(metadatasynced) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; SET client_min_messages TO DEBUG1; SELECT count_values(11);