diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 41226d041..40558fe8e 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -202,7 +202,9 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) /* - * ReplicateAllDependenciesToNode replicate all previously marked objects to a worker node + * ReplicateAllDependenciesToNode replicate all previously marked objects to a worker + * node. The function also sets clusterHasDistributedFunction if there are any + * distributed functions. */ void ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 26dc5528e..7621838a1 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -16,6 +16,7 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "funcapi.h" #include "access/htup_details.h" @@ -26,12 +27,14 @@ #include "catalog/pg_type.h" #include "distributed/colocation_utils.h" #include "distributed/master_protocol.h" +#include "distributed/maintenanced.h" #include "distributed/metadata_sync.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata/pg_dist_object.h" #include "distributed/multi_executor.h" #include "distributed/relation_access_tracking.h" #include "distributed/worker_transaction.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgrprotos.h" #include "utils/fmgroids.h" @@ -54,9 +57,12 @@ static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, int *distribution_argument_index, int *colocationId); static void EnsureSequentialModeForFunctionDDL(void); +static void TriggerSyncMetadataToPrimaryNodes(void); + PG_FUNCTION_INFO_V1(create_distributed_function); + /* * create_distributed_function gets a function or procedure name with their list of * argument types in parantheses, then it creates a new distributed function. @@ -165,6 +171,13 @@ create_distributed_function(PG_FUNCTION_ARGS) /* if provided, make sure to record the distribution argument and colocationId */ UpdateFunctionDistributionInfo(&functionAddress, &distributionArgumentIndex, &colocationId); + + /* + * Once we have at least one distributed function/procedure with distribution + * argument, we sync the metadata to nodes so that the function/procedure + * delegation can be handled locally on the nodes. + */ + TriggerSyncMetadataToPrimaryNodes(); } PG_RETURN_VOID(); @@ -561,3 +574,41 @@ EnsureSequentialModeForFunctionDDL(void) "use only one connection for all future commands"))); SetLocalMultiShardModifyModeToSequential(); } + + +/* + * TriggerSyncMetadataToPrimaryNodes iterates over the active primary nodes, + * and triggers the metadata syncs if the node has not the metadata. Later, + * maintenance daemon will sync the metadata to nodes. + */ +static void +TriggerSyncMetadataToPrimaryNodes(void) +{ + List *workerList = ActivePrimaryNodeList(ShareLock); + ListCell *workerCell = NULL; + bool triggerMetadataSync = false; + + foreach(workerCell, workerList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerCell); + + /* if already has metadata, no need to do it again */ + if (!workerNode->hasMetadata) + { + /* + * Let the maintanince deamon do the hard work of syncing the metadata. We prefer + * this because otherwise node activation might fail withing transaction blocks. + */ + LockRelationOid(DistNodeRelationId(), ExclusiveLock); + MarkNodeHasMetadata(workerNode->workerName, workerNode->workerPort, true); + + triggerMetadataSync = true; + } + } + + /* let the maintanince deamon know about the metadata sync */ + if (triggerMetadataSync) + { + TriggerMetadataSync(MyDatabaseId); + } +} diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index b9d23f950..ae0423300 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -20,6 +20,7 @@ #include "catalog/namespace.h" #include "catalog/objectaddress.h" #include "catalog/pg_namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata/pg_dist_object.h" @@ -260,6 +261,63 @@ IsObjectDistributed(const ObjectAddress *address) } +/* + * ClusterHasDistributedFunctionWithDistArgument returns true if there + * is at least one distributed function in the cluster with distribution + * argument index set. + */ +bool +ClusterHasDistributedFunctionWithDistArgument(void) +{ + bool foundDistributedFunction = false; + + SysScanDesc pgDistObjectScan = NULL; + HeapTuple pgDistObjectTup = NULL; + + Relation pgDistObjectRel = heap_open(DistObjectRelationId(), AccessShareLock); + +#if (PG_VERSION_NUM >= 110000) + TupleDesc tupleDescriptor = RelationGetDescr(pgDistObjectRel); +#endif + + pgDistObjectScan = + systable_beginscan(pgDistObjectRel, InvalidOid, false, NULL, 0, NULL); + while (HeapTupleIsValid(pgDistObjectTup = systable_getnext(pgDistObjectScan))) + { + Form_pg_dist_object pg_dist_object = + (Form_pg_dist_object) GETSTRUCT(pgDistObjectTup); + + if (pg_dist_object->classid == ProcedureRelationId) + { + bool distArgumentIsNull = false; +#if (PG_VERSION_NUM >= 110000) + distArgumentIsNull = + heap_attisnull(pgDistObjectTup, + Anum_pg_dist_object_distribution_argument_index, + tupleDescriptor); +#else + distArgumentIsNull = + heap_attisnull(pgDistObjectTup, + Anum_pg_dist_object_distribution_argument_index); +#endif + + /* we found one distributed function that has an distribution argument */ + if (!distArgumentIsNull) + { + foundDistributedFunction = true; + + break; + } + } + } + + systable_endscan(pgDistObjectScan); + relation_close(pgDistObjectRel, AccessShareLock); + + return foundDistributedFunction; +} + + /* * GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all * distributed objects as marked in pg_dist_object diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 6d3ab9ab4..1c0920841 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -53,7 +53,6 @@ static char * LocalGroupIdUpdateCommand(int32 groupId); -static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata); static void UpdateDistNodeBoolAttr(char *nodeName, int32 nodePort, int attrNum, bool value); static List * SequenceDDLCommandsForTable(Oid relationId); @@ -76,10 +75,24 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) { text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - char *nodeNameString = text_to_cstring(nodeName); - char *escapedNodeName = quote_literal_cstr(nodeNameString); + char *nodeNameString = text_to_cstring(nodeName); + + StartMetadatSyncToNode(nodeNameString, nodePort); + + PG_RETURN_VOID(); +} + + +/* + * StartMetadatSyncToNode is the internal API for + * start_metadata_sync_to_node(). + */ +void +StartMetadatSyncToNode(char *nodeNameString, int32 nodePort) +{ WorkerNode *workerNode = NULL; + char *escapedNodeName = quote_literal_cstr(nodeNameString); /* fail if metadata synchronization doesn't succeed */ bool raiseInterrupts = true; @@ -119,13 +132,11 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) * If this is a secondary node we can't actually sync metadata to it; we assume * the primary node is receiving metadata. */ - PG_RETURN_VOID(); + return; } SyncMetadataSnapshotToNode(workerNode, raiseInterrupts); MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true); - - PG_RETURN_VOID(); } @@ -945,7 +956,7 @@ LocalGroupIdUpdateCommand(int32 groupId) * MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in * pg_dist_node to hasMetadata. */ -static void +void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata) { UpdateDistNodeBoolAttr(nodeName, nodePort, diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index f14735c0d..123a5492b 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -80,8 +80,36 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS) uint32 timeout = PG_GETARG_UINT32(0); int waitResult = 0; - MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION, - "localhost", PostPortNumber); + List *workerList = ActivePrimaryNodeList(NoLock); + ListCell *workerCell = NULL; + bool waitNotifications = false; + MultiConnection *connection = NULL; + + foreach(workerCell, workerList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerCell); + + /* if already has metadata, no need to do it again */ + if (workerNode->hasMetadata && !workerNode->metadataSynced) + { + waitNotifications = true; + break; + } + } + + /* + * If all the metadata nodes have already been synced, we should not wait. + * That's primarily because the maintenance deamon might have already sent + * the notification and we'd wait unnecessarily here. Worse, the test outputs + * might be inconsistent across executions due to the warning. + */ + if (!waitNotifications) + { + PG_RETURN_VOID(); + } + + connection = GetNodeConnection(FORCE_NEW_CONNECTION, + "localhost", PostPortNumber); ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL); waitResult = WaitLatchOrSocket(NULL, WL_SOCKET_READABLE | WL_TIMEOUT, diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 7c202d550..325fef9df 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -26,6 +26,7 @@ #include "distributed/maintenanced.h" #include "distributed/master_protocol.h" #include "distributed/master_metadata_utility.h" +#include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" @@ -475,6 +476,16 @@ ActivateNode(char *nodeName, int nodePort) EnsureNoModificationsHaveBeenDone(); ReplicateAllDependenciesToNode(nodeName, nodePort); ReplicateAllReferenceTablesToNode(nodeName, nodePort); + + /* + * Let the maintanince deamon do the hard work of syncing the metadata. We prefer + * this because otherwise node activation might fail withing transaction blocks. + */ + if (ClusterHasDistributedFunctionWithDistArgument()) + { + MarkNodeHasMetadata(nodeName, nodePort, true); + TriggerMetadataSync(MyDatabaseId); + } } return workerNode->nodeId; @@ -1073,6 +1084,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, { List *workerNodeList = list_make1(workerNode); char *nodeInsertCommand = NodeListInsertCommand(workerNodeList); + SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand); } diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index d2892a238..c609ca795 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -18,6 +18,7 @@ extern bool ObjectExists(const ObjectAddress *address); extern bool IsObjectDistributed(const ObjectAddress *address); +extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); diff --git a/src/include/distributed/metadata/pg_dist_object.h b/src/include/distributed/metadata/pg_dist_object.h index 2eea47a0b..442bb408b 100644 --- a/src/include/distributed/metadata/pg_dist_object.h +++ b/src/include/distributed/metadata/pg_dist_object.h @@ -28,13 +28,13 @@ typedef struct FormData_pg_dist_object Oid objid; /* object id of the distributed object */ int32 objsubid; /* object sub id of the distributed object, eg. attnum */ - uint32 distribution_argument_index; /* only valid for distributed functions/procedures */ - uint32 colocationid; /* only valid for distributed functions/procedures */ - #ifdef CATALOG_VARLEN /* variable-length fields start here */ text type; text[] object_names; text[] object_arguments; + + uint32 distribution_argument_index; /* only valid for distributed functions/procedures */ + uint32 colocationid; /* only valid for distributed functions/procedures */ #endif } FormData_pg_dist_object; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 05cc00b7b..aaf79d8b1 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -21,6 +21,7 @@ extern int MetadataSyncInterval; extern int MetadataSyncRetryInterval; /* Functions declarations for metadata syncing */ +extern void StartMetadatSyncToNode(char *nodeNameString, int32 nodePort); extern bool ClusterHasKnownMetadataWorkers(void); extern bool ShouldSyncTableMetadata(Oid relationId); extern List * MetadataCreateCommands(void); @@ -40,6 +41,7 @@ extern char * CreateSchemaDDLCommand(Oid schemaId); extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, uint64 shardLength, int32 groupId); extern void CreateTableMetadataOnWorkers(Oid relationId); +extern void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata); extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced); extern bool SyncMetadataToNodes(void); extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort, diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 4c2c08125..63a4cbc3e 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -18,19 +18,6 @@ CREATE FUNCTION add(integer, integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -SELECT create_distributed_function('add(int,int)', '$1'); - create_distributed_function ------------------------------ - -(1 row) - -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; - nodename | nodeport | success | result ------------+----------+---------+-------- - localhost | 57637 | t | 5 - localhost | 57638 | t | 5 -(2 rows) - -- Test some combination of functions without ddl propagation -- This will prevent the workers from having those types created. They are -- created just-in-time on function distribution @@ -39,19 +26,6 @@ CREATE TYPE dup_result AS (f1 int, f2 text); CREATE FUNCTION dup(int) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; -SELECT create_distributed_function('dup(int)', '$1'); - create_distributed_function ------------------------------ - -(1 row) - -SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; - nodename | nodeport | success | result ------------+----------+---------+------------------- - localhost | 57637 | t | (42,"42 is text") - localhost | 57638 | t | (42,"42 is text") -(2 rows) - CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer AS 'select $1 + $2;' LANGUAGE SQL @@ -67,6 +41,71 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; +-- make sure that none of the active and primary nodes hasmetadata +-- at the start of the test +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + bool_or +--------- + f +(1 row) + +-- if not paremeters are supplied, we'd see that function doesn't have +-- distribution_argument_index and colocationid +SELECT create_distributed_function('add_mixed_param_names(int, int)'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object +WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; + ?column? | ?column? +----------+---------- + t | t +(1 row) + +-- also show that we can use the function +SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 5 + localhost | 57638 | t | 5 +(2 rows) + +-- make sure that none of the active and primary nodes hasmetadata +-- since the function doesn't have a parameter +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + bool_or +--------- + f +(1 row) + +SELECT create_distributed_function('dup(int)', '$1'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+------------------- + localhost | 57637 | t | (42,"42 is text") + localhost | 57638 | t | (42,"42 is text") +(2 rows) + +SELECT create_distributed_function('add(int,int)', '$1'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 5 + localhost | 57638 | t | 5 +(2 rows) + -- postgres doesn't accept parameter names in the regprocedure input SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); ERROR: syntax error at or near "int" @@ -111,6 +150,31 @@ HINT: To use the default value, set colocate_with option to "default" SELECT create_distributed_function('add_with_param_names(int, int)', ''); ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() +-- The first distributed function syncs the metadata to nodes +-- and metadata syncing is not supported within transaction blocks +BEGIN; + SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); + create_distributed_function +----------------------------- + +(1 row) + +ROLLBACK; +-- make sure that none of the nodes have the function because we've rollbacked +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); + run_command_on_workers +------------------------ + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +-- make sure that none of the active and primary nodes hasmetadata +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + bool_or +--------- + t +(1 row) + -- valid distribution with distribution_arg_name SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); create_distributed_function @@ -118,6 +182,21 @@ SELECT create_distributed_function('add_with_param_names(int, int)', distributio (1 row) +-- make sure that the primary nodes are now metadata synced +select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + bool_and +---------- + t +(1 row) + +-- make sure that both of the nodes have the function because we've succeeded +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); + run_command_on_workers +------------------------ + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + -- valid distribution with distribution_arg_name -- case insensitive SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1'); create_distributed_function @@ -218,39 +297,44 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass t (1 row) --- if not paremeters are supplied, we'd see that function doesn't have --- distribution_argument_index and colocationid -SELECT create_distributed_function('add_mixed_param_names(int, int)'); - create_distributed_function ------------------------------ +-- clear objects +SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; + stop_metadata_sync_to_node +---------------------------- + -(1 row) - -SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object -WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; - ?column? | ?column? -----------+---------- - t | t -(1 row) - --- also show that we can use the function -SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; - nodename | nodeport | success | result ------------+----------+---------+-------- - localhost | 57637 | t | 5 - localhost | 57638 | t | 5 (2 rows) --- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; -SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$); - run_command_on_workers ------------------------------------ - (localhost,57637,t,"DROP SCHEMA") - (localhost,57638,t,"DROP SCHEMA") -(2 rows) +-- This is hacky, but we should clean-up the resources as below +\c - - - :worker_1_port +SET client_min_messages TO error; -- suppress cascading objects dropping +UPDATE pg_dist_local_group SET groupid = 0; +SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; + worker_drop_distributed_table +------------------------------- + + + +(3 rows) +TRUNCATE pg_dist_node; +DROP SCHEMA function_tests CASCADE; +\c - - - :worker_2_port +SET client_min_messages TO error; -- suppress cascading objects dropping +UPDATE pg_dist_local_group SET groupid = 0; +SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; + worker_drop_distributed_table +------------------------------- + + + +(3 rows) + +TRUNCATE pg_dist_node; +DROP SCHEMA function_tests CASCADE; +\c - - - :master_port DROP USER functionuser; SELECT run_command_on_workers($$DROP USER functionuser;$$); run_command_on_workers diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 85a92131d..6976f987d 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -281 280 f +282 281 f transactionnumberwaitingtransactionnumbers -280 -281 280 +281 +282 281 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -285 284 f -286 284 f -286 285 t +286 285 f +287 285 f +287 286 t transactionnumberwaitingtransactionnumbers -284 -285 284 -286 284,285 +285 +286 285 +287 285,286 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges_0.out b/src/test/regress/expected/isolation_dump_global_wait_edges_0.out index 6976f987d..2153c8ec5 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges_0.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges_0.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -282 281 f +283 282 f transactionnumberwaitingtransactionnumbers -281 -282 281 +282 +283 282 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -286 285 f -287 285 f -287 286 t +287 286 f +288 286 f +288 287 t transactionnumberwaitingtransactionnumbers -285 -286 285 -287 285,286 +286 +287 286 +288 286,287 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out index 1180a3f69..25b778060 100644 --- a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out +++ b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out @@ -1739,7 +1739,7 @@ master_remove_node -starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-print-distributed-objects +starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-begin s2-commit s3-wait-for-metadata-sync s1-print-distributed-objects ?column? 1 @@ -1815,7 +1815,21 @@ step s2-distribute-function: <... completed> create_distributed_function -step s2-print-distributed-objects: +step s2-begin: + BEGIN; + +step s2-commit: + COMMIT; + +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + +step s1-print-distributed-objects: + SELECT 1 FROM master_add_node('localhost', 57638); + -- print an overview of all distributed objects SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; @@ -1831,6 +1845,11 @@ step s2-print-distributed-objects: SELECT count(*) FROM pg_proc WHERE proname='add'; SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$); + SELECT master_remove_node('localhost', 57638); + +?column? + +1 pg_identify_object_as_address (function,"{public,add}","{integer,integer}") @@ -1858,9 +1877,12 @@ run_command_on_workers master_remove_node +master_remove_node + + -starting permutation: s1-print-distributed-objects s1-begin s2-public-schema s2-distribute-function s1-add-worker s1-commit s2-print-distributed-objects +starting permutation: s1-print-distributed-objects s1-begin s2-public-schema s2-distribute-function s2-begin s2-commit s3-wait-for-metadata-sync s1-add-worker s1-commit s3-wait-for-metadata-sync s2-print-distributed-objects ?column? 1 @@ -1926,6 +1948,18 @@ step s2-distribute-function: create_distributed_function +step s2-begin: + BEGIN; + +step s2-commit: + COMMIT; + +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + step s1-add-worker: SELECT 1 FROM master_add_node('localhost', 57638); @@ -1935,6 +1969,12 @@ step s1-add-worker: step s1-commit: COMMIT; +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + step s2-print-distributed-objects: -- print an overview of all distributed objects SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; @@ -1980,7 +2020,7 @@ master_remove_node -starting permutation: s1-print-distributed-objects s1-begin s2-begin s2-create-schema s2-distribute-function s1-add-worker s2-commit s1-commit s2-print-distributed-objects +starting permutation: s1-print-distributed-objects s2-begin s2-create-schema s2-distribute-function s2-commit s3-wait-for-metadata-sync s1-begin s1-add-worker s1-commit s3-wait-for-metadata-sync s2-print-distributed-objects ?column? 1 @@ -2033,9 +2073,6 @@ run_command_on_workers master_remove_node -step s1-begin: - BEGIN; - step s2-begin: BEGIN; @@ -2050,19 +2087,33 @@ step s2-distribute-function: create_distributed_function -step s1-add-worker: - SELECT 1 FROM master_add_node('localhost', 57638); - step s2-commit: COMMIT; -step s1-add-worker: <... completed> +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + +step s1-begin: + BEGIN; + +step s1-add-worker: + SELECT 1 FROM master_add_node('localhost', 57638); + ?column? 1 step s1-commit: COMMIT; +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + step s2-print-distributed-objects: -- print an overview of all distributed objects SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 210852645..3b325431c 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -376,6 +376,8 @@ if($isolationtester) push(@pgOptions, '-c', "citus.log_distributed_deadlock_detection=on"); push(@pgOptions, '-c', "citus.distributed_deadlock_detection_factor=-1"); push(@pgOptions, '-c', "citus.shard_count=4"); + push(@pgOptions, '-c', "citus.metadata_sync_interval=1000"); + push(@pgOptions, '-c', "citus.metadata_sync_retry_interval=100"); } # Add externally added options last, so they overwrite the default ones above diff --git a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec index 7c6d00eef..d249ed363 100644 --- a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec +++ b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec @@ -2,13 +2,21 @@ # add single one of the nodes for the purpose of the test setup { + CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER) + RETURNS void + LANGUAGE C STRICT VOLATILE + AS 'citus', $$wait_until_metadata_sync$$; + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; - SELECT 1 FROM master_add_node('localhost', 57637); + SELECT 1 FROM master_add_node('localhost', 57637); } # ensure that both nodes exists for the remaining of the isolation tests teardown { + SELECT 1 FROM master_add_node('localhost', 57637); + SELECT 1 FROM master_add_node('localhost', 57638); + -- schema drops are not cascaded SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS myschema CASCADE;$$); DROP SCHEMA IF EXISTS myschema CASCADE; @@ -36,7 +44,7 @@ teardown -- similarly drop the function in the workers manually SELECT run_command_on_workers($$DROP FUNCTION IF EXISTS add(INT,INT) CASCADE;$$); - SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; } session "s1" @@ -150,6 +158,7 @@ step "s2-print-distributed-objects" SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$); } + session "s3" step "s3-public-schema" @@ -180,6 +189,17 @@ step "s3-commit" COMMIT; } + +step "s3-wait-for-metadata-sync" +{ + SELECT public.wait_until_metadata_sync(5000); +} + +step "s3-listen-channel" +{ + LISTEN metadata_sync; +} + session "s4" step "s4-public-schema" @@ -216,6 +236,7 @@ step "s4-commit" COMMIT; } + # schema only tests permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-print-distributed-objects" permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-commit" "s2-print-distributed-objects" @@ -236,6 +257,14 @@ permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-cre permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-schema" "s2-create-type" "s2-create-table-with-type" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects" # distributed function tests -permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-print-distributed-objects" -permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-distribute-function" "s1-add-worker" "s1-commit" "s2-print-distributed-objects" -permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-schema" "s2-distribute-function" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects" +# isolation tests are not very simple psql, so trigger NOTIFY reliably for +# s3-wait-for-metadata-sync step, we do "s2-begin" followed directly by +# "s2-commit", because "COMMIT" syncs the messages + +permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-distribute-function" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-add-worker" "s1-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects" + +# we cannot run the following operations concurrently +# the problem is that NOTIFY event doesn't (reliably) happen before COMMIT +# so we have to commit s2 before s1 starts +permutation "s1-print-distributed-objects" "s2-begin" "s2-create-schema" "s2-distribute-function" "s2-commit" "s3-wait-for-metadata-sync" "s1-begin" "s1-add-worker" "s1-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects" diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index 2dc6f7a42..35a23782a 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -8,16 +8,12 @@ CREATE SCHEMA function_tests AUTHORIZATION functionuser; SET search_path TO function_tests; SET citus.shard_count TO 4; - -- Create and distribute a simple function CREATE FUNCTION add(integer, integer) RETURNS integer AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -SELECT create_distributed_function('add(int,int)', '$1'); -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; - -- Test some combination of functions without ddl propagation -- This will prevent the workers from having those types created. They are @@ -30,9 +26,6 @@ CREATE FUNCTION dup(int) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; -SELECT create_distributed_function('dup(int)', '$1'); -SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; - CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer AS 'select $1 + $2;' LANGUAGE SQL @@ -51,6 +44,29 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer IMMUTABLE RETURNS NULL ON NULL INPUT; +-- make sure that none of the active and primary nodes hasmetadata +-- at the start of the test +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + +-- if not paremeters are supplied, we'd see that function doesn't have +-- distribution_argument_index and colocationid +SELECT create_distributed_function('add_mixed_param_names(int, int)'); +SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object +WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; + +-- also show that we can use the function +SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; + +-- make sure that none of the active and primary nodes hasmetadata +-- since the function doesn't have a parameter +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + +SELECT create_distributed_function('dup(int)', '$1'); +SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; + +SELECT create_distributed_function('add(int,int)', '$1'); +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + -- postgres doesn't accept parameter names in the regprocedure input SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); @@ -77,9 +93,27 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL) -- empty string distribution_arg_index SELECT create_distributed_function('add_with_param_names(int, int)', ''); +-- The first distributed function syncs the metadata to nodes +-- and metadata syncing is not supported within transaction blocks +BEGIN; + SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); +ROLLBACK; + +-- make sure that none of the nodes have the function because we've rollbacked +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); + +-- make sure that none of the active and primary nodes hasmetadata +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + -- valid distribution with distribution_arg_name SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); +-- make sure that the primary nodes are now metadata synced +select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + +-- make sure that both of the nodes have the function because we've succeeded +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); + -- valid distribution with distribution_arg_name -- case insensitive SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1'); @@ -132,18 +166,29 @@ FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'add_with_param_names(int, int)'::regprocedure; --- if not paremeters are supplied, we'd see that function doesn't have --- distribution_argument_index and colocationid -SELECT create_distributed_function('add_mixed_param_names(int, int)'); -SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object -WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; - --- also show that we can use the function -SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; - -- clear objects +SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; + SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; -SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$); + +-- This is hacky, but we should clean-up the resources as below + +\c - - - :worker_1_port +SET client_min_messages TO error; -- suppress cascading objects dropping +UPDATE pg_dist_local_group SET groupid = 0; +SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; +TRUNCATE pg_dist_node; +DROP SCHEMA function_tests CASCADE; + +\c - - - :worker_2_port +SET client_min_messages TO error; -- suppress cascading objects dropping +UPDATE pg_dist_local_group SET groupid = 0; +SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; +TRUNCATE pg_dist_node; +DROP SCHEMA function_tests CASCADE; + +\c - - - :master_port + DROP USER functionuser; SELECT run_command_on_workers($$DROP USER functionuser;$$); diff --git a/src/test/regress/sql/fast_path_router_modify.sql b/src/test/regress/sql/fast_path_router_modify.sql index c3f6f9cd2..ee55284b6 100644 --- a/src/test/regress/sql/fast_path_router_modify.sql +++ b/src/test/regress/sql/fast_path_router_modify.sql @@ -110,4 +110,4 @@ SELECT modify_fast_path_plpsql(6,6); RESET client_min_messages; -DROP SCHEMA fast_path_router_modify CASCADE; \ No newline at end of file +DROP SCHEMA fast_path_router_modify CASCADE;