From d37745bfc71347f56b71cab4a576759e3ace5616 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 19 Sep 2019 18:54:52 +0200 Subject: [PATCH] Sync metadata to worker nodes after create_distributed_function Since distributed functions are useful when the workers have metadata, we automatically sync the metadata, both after create_distributed_function() and after master_add_node(). We do it lazily and let the maintenance daemon sync it. That's mainly because the metadata syncing cannot be done in transaction blocks, and we don't want to add lots of transactional limitations to master_add_node() and create_distributed_function(). --- .../distributed/commands/dependencies.c | 4 +- src/backend/distributed/commands/function.c | 51 +++++ src/backend/distributed/metadata/distobject.c | 58 ++++++ .../distributed/metadata/metadata_sync.c | 25 ++- src/backend/distributed/test/metadata_sync.c | 32 ++- src/backend/distributed/utils/node_metadata.c | 12 ++ src/include/distributed/metadata/distobject.h | 1 + .../distributed/metadata/pg_dist_object.h | 6 +- src/include/distributed/metadata_sync.h | 2 + .../expected/distributed_functions.out | 190 +++++++++++++----- .../isolation_dump_global_wait_edges.out | 18 +- .../isolation_dump_global_wait_edges_0.out | 18 +- ...lation_ensure_dependency_activate_node.out | 73 ++++++- src/test/regress/pg_regress_multi.pl | 2 + ...ation_ensure_dependency_activate_node.spec | 39 +++- .../regress/sql/distributed_functions.sql | 79 ++++++-- .../regress/sql/fast_path_router_modify.sql | 2 +- 17 files changed, 494 insertions(+), 118 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 41226d041..40558fe8e 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -202,7 +202,9 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) /* - * ReplicateAllDependenciesToNode replicate all previously marked objects to a worker node + * ReplicateAllDependenciesToNode 
replicate all previously marked objects to a worker + * node. The function also sets clusterHasDistributedFunction if there are any + * distributed functions. */ void ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 26dc5528e..7621838a1 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -16,6 +16,7 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "funcapi.h" #include "access/htup_details.h" @@ -26,12 +27,14 @@ #include "catalog/pg_type.h" #include "distributed/colocation_utils.h" #include "distributed/master_protocol.h" +#include "distributed/maintenanced.h" #include "distributed/metadata_sync.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata/pg_dist_object.h" #include "distributed/multi_executor.h" #include "distributed/relation_access_tracking.h" #include "distributed/worker_transaction.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgrprotos.h" #include "utils/fmgroids.h" @@ -54,9 +57,12 @@ static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, int *distribution_argument_index, int *colocationId); static void EnsureSequentialModeForFunctionDDL(void); +static void TriggerSyncMetadataToPrimaryNodes(void); + PG_FUNCTION_INFO_V1(create_distributed_function); + /* * create_distributed_function gets a function or procedure name with their list of * argument types in parantheses, then it creates a new distributed function. 
@@ -165,6 +171,13 @@ create_distributed_function(PG_FUNCTION_ARGS) /* if provided, make sure to record the distribution argument and colocationId */ UpdateFunctionDistributionInfo(&functionAddress, &distributionArgumentIndex, &colocationId); + + /* + * Once we have at least one distributed function/procedure with distribution + * argument, we sync the metadata to nodes so that the function/procedure + * delegation can be handled locally on the nodes. + */ + TriggerSyncMetadataToPrimaryNodes(); } PG_RETURN_VOID(); @@ -561,3 +574,41 @@ EnsureSequentialModeForFunctionDDL(void) "use only one connection for all future commands"))); SetLocalMultiShardModifyModeToSequential(); } + + +/* + * TriggerSyncMetadataToPrimaryNodes iterates over the active primary nodes, + * and triggers a metadata sync for each node that does not have the metadata yet. + * Later, the maintenance daemon will sync the metadata to those nodes. + */ +static void +TriggerSyncMetadataToPrimaryNodes(void) +{ + List *workerList = ActivePrimaryNodeList(ShareLock); + ListCell *workerCell = NULL; + bool triggerMetadataSync = false; + + foreach(workerCell, workerList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerCell); + + /* if already has metadata, no need to do it again */ + if (!workerNode->hasMetadata) + { + /* + * Let the maintenance daemon do the hard work of syncing the metadata. We prefer + * this because otherwise node activation might fail within transaction blocks. 
+ */ + LockRelationOid(DistNodeRelationId(), ExclusiveLock); + MarkNodeHasMetadata(workerNode->workerName, workerNode->workerPort, true); + + triggerMetadataSync = true; + } + } + + /* let the maintenance daemon know about the metadata sync */ + if (triggerMetadataSync) + { + TriggerMetadataSync(MyDatabaseId); + } +} diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index b9d23f950..ae0423300 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -20,6 +20,7 @@ #include "catalog/namespace.h" #include "catalog/objectaddress.h" #include "catalog/pg_namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata/pg_dist_object.h" @@ -260,6 +261,63 @@ IsObjectDistributed(const ObjectAddress *address) } +/* + * ClusterHasDistributedFunctionWithDistArgument returns true if there + * is at least one distributed function in the cluster with distribution + * argument index set. 
+ */ +bool +ClusterHasDistributedFunctionWithDistArgument(void) +{ + bool foundDistributedFunction = false; + + SysScanDesc pgDistObjectScan = NULL; + HeapTuple pgDistObjectTup = NULL; + + Relation pgDistObjectRel = heap_open(DistObjectRelationId(), AccessShareLock); + +#if (PG_VERSION_NUM >= 110000) + TupleDesc tupleDescriptor = RelationGetDescr(pgDistObjectRel); +#endif + + pgDistObjectScan = + systable_beginscan(pgDistObjectRel, InvalidOid, false, NULL, 0, NULL); + while (HeapTupleIsValid(pgDistObjectTup = systable_getnext(pgDistObjectScan))) + { + Form_pg_dist_object pg_dist_object = + (Form_pg_dist_object) GETSTRUCT(pgDistObjectTup); + + if (pg_dist_object->classid == ProcedureRelationId) + { + bool distArgumentIsNull = false; +#if (PG_VERSION_NUM >= 110000) + distArgumentIsNull = + heap_attisnull(pgDistObjectTup, + Anum_pg_dist_object_distribution_argument_index, + tupleDescriptor); +#else + distArgumentIsNull = + heap_attisnull(pgDistObjectTup, + Anum_pg_dist_object_distribution_argument_index); +#endif + + /* we found one distributed function that has a distribution argument */ + if (!distArgumentIsNull) + { + foundDistributedFunction = true; + + break; + } + } + } + + systable_endscan(pgDistObjectScan); + relation_close(pgDistObjectRel, AccessShareLock); + + return foundDistributedFunction; +} + + /* * GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all * distributed objects as marked in pg_dist_object diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 6d3ab9ab4..1c0920841 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -53,7 +53,6 @@ static char * LocalGroupIdUpdateCommand(int32 groupId); -static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata); static void UpdateDistNodeBoolAttr(char *nodeName, int32 nodePort, int attrNum, bool value); static List * 
SequenceDDLCommandsForTable(Oid relationId); @@ -76,10 +75,24 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) { text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - char *nodeNameString = text_to_cstring(nodeName); - char *escapedNodeName = quote_literal_cstr(nodeNameString); + char *nodeNameString = text_to_cstring(nodeName); + + StartMetadatSyncToNode(nodeNameString, nodePort); + + PG_RETURN_VOID(); +} + + +/* + * StartMetadatSyncToNode is the internal API for + * start_metadata_sync_to_node(). + */ +void +StartMetadatSyncToNode(char *nodeNameString, int32 nodePort) +{ WorkerNode *workerNode = NULL; + char *escapedNodeName = quote_literal_cstr(nodeNameString); /* fail if metadata synchronization doesn't succeed */ bool raiseInterrupts = true; @@ -119,13 +132,11 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) * If this is a secondary node we can't actually sync metadata to it; we assume * the primary node is receiving metadata. */ - PG_RETURN_VOID(); + return; } SyncMetadataSnapshotToNode(workerNode, raiseInterrupts); MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true); - - PG_RETURN_VOID(); } @@ -945,7 +956,7 @@ LocalGroupIdUpdateCommand(int32 groupId) * MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in * pg_dist_node to hasMetadata. 
*/ -static void +void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata) { UpdateDistNodeBoolAttr(nodeName, nodePort, diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index f14735c0d..123a5492b 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -80,8 +80,36 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS) uint32 timeout = PG_GETARG_UINT32(0); int waitResult = 0; - MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION, - "localhost", PostPortNumber); + List *workerList = ActivePrimaryNodeList(NoLock); + ListCell *workerCell = NULL; + bool waitNotifications = false; + MultiConnection *connection = NULL; + + foreach(workerCell, workerList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerCell); + + /* a node that has metadata but is not synced yet means we need to wait */ + if (workerNode->hasMetadata && !workerNode->metadataSynced) + { + waitNotifications = true; + break; + } + } + + /* + * If all the metadata nodes have already been synced, we should not wait. + * That's primarily because the maintenance daemon might have already sent + * the notification and we'd wait unnecessarily here. Worse, the test outputs + * might be inconsistent across executions due to the warning. 
+ */ + if (!waitNotifications) + { + PG_RETURN_VOID(); + } + + connection = GetNodeConnection(FORCE_NEW_CONNECTION, + "localhost", PostPortNumber); ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL); waitResult = WaitLatchOrSocket(NULL, WL_SOCKET_READABLE | WL_TIMEOUT, diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 7c202d550..325fef9df 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -26,6 +26,7 @@ #include "distributed/maintenanced.h" #include "distributed/master_protocol.h" #include "distributed/master_metadata_utility.h" +#include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" @@ -475,6 +476,16 @@ ActivateNode(char *nodeName, int nodePort) EnsureNoModificationsHaveBeenDone(); ReplicateAllDependenciesToNode(nodeName, nodePort); ReplicateAllReferenceTablesToNode(nodeName, nodePort); + + /* + * Let the maintenance daemon do the hard work of syncing the metadata. We prefer + * this because otherwise node activation might fail within transaction blocks. 
+ */ + if (ClusterHasDistributedFunctionWithDistArgument()) + { + MarkNodeHasMetadata(nodeName, nodePort, true); + TriggerMetadataSync(MyDatabaseId); + } } return workerNode->nodeId; @@ -1073,6 +1084,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, { List *workerNodeList = list_make1(workerNode); char *nodeInsertCommand = NodeListInsertCommand(workerNodeList); + SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand); } diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index d2892a238..c609ca795 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -18,6 +18,7 @@ extern bool ObjectExists(const ObjectAddress *address); extern bool IsObjectDistributed(const ObjectAddress *address); +extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); diff --git a/src/include/distributed/metadata/pg_dist_object.h b/src/include/distributed/metadata/pg_dist_object.h index 2eea47a0b..442bb408b 100644 --- a/src/include/distributed/metadata/pg_dist_object.h +++ b/src/include/distributed/metadata/pg_dist_object.h @@ -28,13 +28,13 @@ typedef struct FormData_pg_dist_object Oid objid; /* object id of the distributed object */ int32 objsubid; /* object sub id of the distributed object, eg. 
attnum */ - uint32 distribution_argument_index; /* only valid for distributed functions/procedures */ - uint32 colocationid; /* only valid for distributed functions/procedures */ - #ifdef CATALOG_VARLEN /* variable-length fields start here */ text type; text[] object_names; text[] object_arguments; + + uint32 distribution_argument_index; /* only valid for distributed functions/procedures */ + uint32 colocationid; /* only valid for distributed functions/procedures */ #endif } FormData_pg_dist_object; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 05cc00b7b..aaf79d8b1 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -21,6 +21,7 @@ extern int MetadataSyncInterval; extern int MetadataSyncRetryInterval; /* Functions declarations for metadata syncing */ +extern void StartMetadatSyncToNode(char *nodeNameString, int32 nodePort); extern bool ClusterHasKnownMetadataWorkers(void); extern bool ShouldSyncTableMetadata(Oid relationId); extern List * MetadataCreateCommands(void); @@ -40,6 +41,7 @@ extern char * CreateSchemaDDLCommand(Oid schemaId); extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, uint64 shardLength, int32 groupId); extern void CreateTableMetadataOnWorkers(Oid relationId); +extern void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata); extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced); extern bool SyncMetadataToNodes(void); extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort, diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 4c2c08125..63a4cbc3e 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -18,19 +18,6 @@ CREATE FUNCTION add(integer, integer) RETURNS integer LANGUAGE SQL IMMUTABLE 
RETURNS NULL ON NULL INPUT; -SELECT create_distributed_function('add(int,int)', '$1'); - create_distributed_function ------------------------------ - -(1 row) - -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; - nodename | nodeport | success | result ------------+----------+---------+-------- - localhost | 57637 | t | 5 - localhost | 57638 | t | 5 -(2 rows) - -- Test some combination of functions without ddl propagation -- This will prevent the workers from having those types created. They are -- created just-in-time on function distribution @@ -39,19 +26,6 @@ CREATE TYPE dup_result AS (f1 int, f2 text); CREATE FUNCTION dup(int) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; -SELECT create_distributed_function('dup(int)', '$1'); - create_distributed_function ------------------------------ - -(1 row) - -SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; - nodename | nodeport | success | result ------------+----------+---------+------------------- - localhost | 57637 | t | (42,"42 is text") - localhost | 57638 | t | (42,"42 is text") -(2 rows) - CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer AS 'select $1 + $2;' LANGUAGE SQL @@ -67,6 +41,71 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; +-- make sure that none of the active and primary nodes hasmetadata +-- at the start of the test +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + bool_or +--------- + f +(1 row) + +-- if not paremeters are supplied, we'd see that function doesn't have +-- distribution_argument_index and colocationid +SELECT create_distributed_function('add_mixed_param_names(int, int)'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT distribution_argument_index is NULL, colocationid is NULL from 
citus.pg_dist_object +WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; + ?column? | ?column? +----------+---------- + t | t +(1 row) + +-- also show that we can use the function +SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 5 + localhost | 57638 | t | 5 +(2 rows) + +-- make sure that none of the active and primary nodes hasmetadata +-- since the function doesn't have a parameter +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + bool_or +--------- + f +(1 row) + +SELECT create_distributed_function('dup(int)', '$1'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+------------------- + localhost | 57637 | t | (42,"42 is text") + localhost | 57638 | t | (42,"42 is text") +(2 rows) + +SELECT create_distributed_function('add(int,int)', '$1'); + create_distributed_function +----------------------------- + +(1 row) + +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + nodename | nodeport | success | result +-----------+----------+---------+-------- + localhost | 57637 | t | 5 + localhost | 57638 | t | 5 +(2 rows) + -- postgres doesn't accept parameter names in the regprocedure input SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); ERROR: syntax error at or near "int" @@ -111,6 +150,31 @@ HINT: To use the default value, set colocate_with option to "default" SELECT create_distributed_function('add_with_param_names(int, int)', ''); ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid 
"$paramIndex" to create_distributed_function() +-- The first distributed function syncs the metadata to nodes +-- and metadata syncing is not supported within transaction blocks +BEGIN; + SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); + create_distributed_function +----------------------------- + +(1 row) + +ROLLBACK; +-- make sure that none of the nodes have the function because we've rollbacked +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); + run_command_on_workers +------------------------ + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +-- make sure that none of the active and primary nodes hasmetadata +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + bool_or +--------- + t +(1 row) + -- valid distribution with distribution_arg_name SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); create_distributed_function @@ -118,6 +182,21 @@ SELECT create_distributed_function('add_with_param_names(int, int)', distributio (1 row) +-- make sure that the primary nodes are now metadata synced +select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + bool_and +---------- + t +(1 row) + +-- make sure that both of the nodes have the function because we've succeeded +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); + run_command_on_workers +------------------------ + (localhost,57637,t,1) + (localhost,57638,t,1) +(2 rows) + -- valid distribution with distribution_arg_name -- case insensitive SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1'); create_distributed_function @@ -218,39 +297,44 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass t (1 row) --- if not paremeters are supplied, we'd see that function 
doesn't have --- distribution_argument_index and colocationid -SELECT create_distributed_function('add_mixed_param_names(int, int)'); - create_distributed_function ------------------------------ +-- clear objects +SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; + stop_metadata_sync_to_node +---------------------------- + -(1 row) - -SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object -WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; - ?column? | ?column? -----------+---------- - t | t -(1 row) - --- also show that we can use the function -SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; - nodename | nodeport | success | result ------------+----------+---------+-------- - localhost | 57637 | t | 5 - localhost | 57638 | t | 5 (2 rows) --- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; -SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$); - run_command_on_workers ------------------------------------ - (localhost,57637,t,"DROP SCHEMA") - (localhost,57638,t,"DROP SCHEMA") -(2 rows) +-- This is hacky, but we should clean-up the resources as below +\c - - - :worker_1_port +SET client_min_messages TO error; -- suppress cascading objects dropping +UPDATE pg_dist_local_group SET groupid = 0; +SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; + worker_drop_distributed_table +------------------------------- + + + +(3 rows) +TRUNCATE pg_dist_node; +DROP SCHEMA function_tests CASCADE; +\c - - - :worker_2_port +SET client_min_messages TO error; -- suppress cascading objects dropping +UPDATE pg_dist_local_group SET groupid = 0; +SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE 
logicalrelid::text ILIKE '%replicated_table_func_test%'; + worker_drop_distributed_table +------------------------------- + + + +(3 rows) + +TRUNCATE pg_dist_node; +DROP SCHEMA function_tests CASCADE; +\c - - - :master_port DROP USER functionuser; SELECT run_command_on_workers($$DROP USER functionuser;$$); run_command_on_workers diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 85a92131d..6976f987d 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -281 280 f +282 281 f transactionnumberwaitingtransactionnumbers -280 -281 280 +281 +282 281 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -285 284 f -286 284 f -286 285 t +286 285 f +287 285 f +287 286 t transactionnumberwaitingtransactionnumbers -284 -285 284 -286 284,285 +285 +286 285 +287 285,286 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges_0.out b/src/test/regress/expected/isolation_dump_global_wait_edges_0.out index 6976f987d..2153c8ec5 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges_0.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges_0.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -282 281 f +283 282 f transactionnumberwaitingtransactionnumbers -281 -282 281 +282 +283 282 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -286 285 f -287 285 f -287 286 t +287 286 f +288 286 f +288 287 t transactionnumberwaitingtransactionnumbers 
-285 -286 285 -287 285,286 +286 +287 286 +288 286,287 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out index 1180a3f69..25b778060 100644 --- a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out +++ b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out @@ -1739,7 +1739,7 @@ master_remove_node -starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-print-distributed-objects +starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-begin s2-commit s3-wait-for-metadata-sync s1-print-distributed-objects ?column? 1 @@ -1815,7 +1815,21 @@ step s2-distribute-function: <... completed> create_distributed_function -step s2-print-distributed-objects: +step s2-begin: + BEGIN; + +step s2-commit: + COMMIT; + +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + +step s1-print-distributed-objects: + SELECT 1 FROM master_add_node('localhost', 57638); + -- print an overview of all distributed objects SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; @@ -1831,6 +1845,11 @@ step s2-print-distributed-objects: SELECT count(*) FROM pg_proc WHERE proname='add'; SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$); + SELECT master_remove_node('localhost', 57638); + +?column? 
+ +1 pg_identify_object_as_address (function,"{public,add}","{integer,integer}") @@ -1858,9 +1877,12 @@ run_command_on_workers master_remove_node +master_remove_node + + -starting permutation: s1-print-distributed-objects s1-begin s2-public-schema s2-distribute-function s1-add-worker s1-commit s2-print-distributed-objects +starting permutation: s1-print-distributed-objects s1-begin s2-public-schema s2-distribute-function s2-begin s2-commit s3-wait-for-metadata-sync s1-add-worker s1-commit s3-wait-for-metadata-sync s2-print-distributed-objects ?column? 1 @@ -1926,6 +1948,18 @@ step s2-distribute-function: create_distributed_function +step s2-begin: + BEGIN; + +step s2-commit: + COMMIT; + +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + step s1-add-worker: SELECT 1 FROM master_add_node('localhost', 57638); @@ -1935,6 +1969,12 @@ step s1-add-worker: step s1-commit: COMMIT; +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + step s2-print-distributed-objects: -- print an overview of all distributed objects SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; @@ -1980,7 +2020,7 @@ master_remove_node -starting permutation: s1-print-distributed-objects s1-begin s2-begin s2-create-schema s2-distribute-function s1-add-worker s2-commit s1-commit s2-print-distributed-objects +starting permutation: s1-print-distributed-objects s2-begin s2-create-schema s2-distribute-function s2-commit s3-wait-for-metadata-sync s1-begin s1-add-worker s1-commit s3-wait-for-metadata-sync s2-print-distributed-objects ?column? 
1 @@ -2033,9 +2073,6 @@ run_command_on_workers master_remove_node -step s1-begin: - BEGIN; - step s2-begin: BEGIN; @@ -2050,19 +2087,33 @@ step s2-distribute-function: create_distributed_function -step s1-add-worker: - SELECT 1 FROM master_add_node('localhost', 57638); - step s2-commit: COMMIT; -step s1-add-worker: <... completed> +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + +step s1-begin: + BEGIN; + +step s1-add-worker: + SELECT 1 FROM master_add_node('localhost', 57638); + ?column? 1 step s1-commit: COMMIT; +step s3-wait-for-metadata-sync: + SELECT public.wait_until_metadata_sync(5000); + +wait_until_metadata_sync + + step s2-print-distributed-objects: -- print an overview of all distributed objects SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 210852645..3b325431c 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -376,6 +376,8 @@ if($isolationtester) push(@pgOptions, '-c', "citus.log_distributed_deadlock_detection=on"); push(@pgOptions, '-c', "citus.distributed_deadlock_detection_factor=-1"); push(@pgOptions, '-c', "citus.shard_count=4"); + push(@pgOptions, '-c', "citus.metadata_sync_interval=1000"); + push(@pgOptions, '-c', "citus.metadata_sync_retry_interval=100"); } # Add externally added options last, so they overwrite the default ones above diff --git a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec index 7c6d00eef..d249ed363 100644 --- a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec +++ b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec @@ -2,13 +2,21 @@ # add single one of the nodes for the purpose of the test setup { + CREATE OR REPLACE FUNCTION 
wait_until_metadata_sync(timeout INTEGER) + RETURNS void + LANGUAGE C STRICT VOLATILE + AS 'citus', $$wait_until_metadata_sync$$; + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; - SELECT 1 FROM master_add_node('localhost', 57637); + SELECT 1 FROM master_add_node('localhost', 57637); } # ensure that both nodes exists for the remaining of the isolation tests teardown { + SELECT 1 FROM master_add_node('localhost', 57637); + SELECT 1 FROM master_add_node('localhost', 57638); + -- schema drops are not cascaded SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS myschema CASCADE;$$); DROP SCHEMA IF EXISTS myschema CASCADE; @@ -36,7 +44,7 @@ teardown -- similarly drop the function in the workers manually SELECT run_command_on_workers($$DROP FUNCTION IF EXISTS add(INT,INT) CASCADE;$$); - SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; } session "s1" @@ -150,6 +158,7 @@ step "s2-print-distributed-objects" SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$); } + session "s3" step "s3-public-schema" @@ -180,6 +189,17 @@ step "s3-commit" COMMIT; } + +step "s3-wait-for-metadata-sync" +{ + SELECT public.wait_until_metadata_sync(5000); +} + +step "s3-listen-channel" +{ + LISTEN metadata_sync; +} + session "s4" step "s4-public-schema" @@ -216,6 +236,7 @@ step "s4-commit" COMMIT; } + # schema only tests permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-print-distributed-objects" permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-commit" "s2-print-distributed-objects" @@ -236,6 +257,14 @@ permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-cre permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-schema" "s2-create-type" "s2-create-table-with-type" 
"s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects" # distributed function tests -permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-print-distributed-objects" -permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-distribute-function" "s1-add-worker" "s1-commit" "s2-print-distributed-objects" -permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-schema" "s2-distribute-function" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects" +# isolation tests are not very simple psql, so trigger NOTIFY reliably for +# s3-wait-for-metadata-sync step, we do "s2-begin" followed directly by +# "s2-commit", because "COMMIT" syncs the messages + +permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-distribute-function" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-add-worker" "s1-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects" + +# we cannot run the following operations concurrently +# the problem is that NOTIFY event doesn't (reliably) happen before COMMIT +# so we have to commit s2 before s1 starts +permutation "s1-print-distributed-objects" "s2-begin" "s2-create-schema" "s2-distribute-function" "s2-commit" "s3-wait-for-metadata-sync" "s1-begin" "s1-add-worker" "s1-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects" diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index 2dc6f7a42..35a23782a 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -8,16 +8,12 @@ CREATE SCHEMA function_tests AUTHORIZATION functionuser; SET search_path 
TO function_tests; SET citus.shard_count TO 4; - -- Create and distribute a simple function CREATE FUNCTION add(integer, integer) RETURNS integer AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; -SELECT create_distributed_function('add(int,int)', '$1'); -SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; - -- Test some combination of functions without ddl propagation -- This will prevent the workers from having those types created. They are @@ -30,9 +26,6 @@ CREATE FUNCTION dup(int) RETURNS dup_result AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ LANGUAGE SQL; -SELECT create_distributed_function('dup(int)', '$1'); -SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; - CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer AS 'select $1 + $2;' LANGUAGE SQL @@ -51,6 +44,29 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer IMMUTABLE RETURNS NULL ON NULL INPUT; +-- make sure that none of the active and primary nodes hasmetadata +-- at the start of the test +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + +-- if not paremeters are supplied, we'd see that function doesn't have +-- distribution_argument_index and colocationid +SELECT create_distributed_function('add_mixed_param_names(int, int)'); +SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object +WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; + +-- also show that we can use the function +SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; + +-- make sure that none of the active and primary nodes hasmetadata +-- since the function doesn't have a parameter +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + +SELECT create_distributed_function('dup(int)', '$1'); +SELECT * FROM 
run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2; + +SELECT create_distributed_function('add(int,int)', '$1'); +SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2; + -- postgres doesn't accept parameter names in the regprocedure input SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); @@ -77,9 +93,27 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL) -- empty string distribution_arg_index SELECT create_distributed_function('add_with_param_names(int, int)', ''); +-- The first distributed function syncs the metadata to nodes +-- and metadata syncing is not supported within transaction blocks +BEGIN; + SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); +ROLLBACK; + +-- make sure that none of the nodes have the function because we've rollbacked +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); + +-- make sure that none of the active and primary nodes hasmetadata +select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + -- valid distribution with distribution_arg_name SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); +-- make sure that the primary nodes are now metadata synced +select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary'; + +-- make sure that both of the nodes have the function because we've succeeded +SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$); + -- valid distribution with distribution_arg_name -- case insensitive SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1'); @@ -132,18 +166,29 @@ FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass 
AND objects.objid = 'add_with_param_names(int, int)'::regprocedure; --- if not paremeters are supplied, we'd see that function doesn't have --- distribution_argument_index and colocationid -SELECT create_distributed_function('add_mixed_param_names(int, int)'); -SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object -WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure; - --- also show that we can use the function -SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2; - -- clear objects +SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; + SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; -SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$); + +-- This is hacky, but we should clean-up the resources as below + +\c - - - :worker_1_port +SET client_min_messages TO error; -- suppress cascading objects dropping +UPDATE pg_dist_local_group SET groupid = 0; +SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; +TRUNCATE pg_dist_node; +DROP SCHEMA function_tests CASCADE; + +\c - - - :worker_2_port +SET client_min_messages TO error; -- suppress cascading objects dropping +UPDATE pg_dist_local_group SET groupid = 0; +SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; +TRUNCATE pg_dist_node; +DROP SCHEMA function_tests CASCADE; + +\c - - - :master_port + DROP USER functionuser; SELECT run_command_on_workers($$DROP USER functionuser;$$); diff --git a/src/test/regress/sql/fast_path_router_modify.sql b/src/test/regress/sql/fast_path_router_modify.sql index c3f6f9cd2..ee55284b6 100644 --- a/src/test/regress/sql/fast_path_router_modify.sql +++ 
b/src/test/regress/sql/fast_path_router_modify.sql @@ -110,4 +110,4 @@ SELECT modify_fast_path_plpsql(6,6); RESET client_min_messages; -DROP SCHEMA fast_path_router_modify CASCADE; \ No newline at end of file +DROP SCHEMA fast_path_router_modify CASCADE;