Sync metadata to worker nodes after create_distributed_function

Since the distributed functions are useful when the workers have
metadata, we automatically sync it.

Also, after master_add_node(). We do it lazily and let the deamon
sync it. That's mainly because the metadata syncing cannot be done
in transaction blocks, and we don't want to add lots of transactional
limitations to master_add_node() and create_distributed_function().
pull/3005/head
Onder Kalaci 2019-09-19 18:54:52 +02:00
parent 59fe461d4a
commit d37745bfc7
17 changed files with 494 additions and 118 deletions

View File

@ -202,7 +202,9 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
/* /*
* ReplicateAllDependenciesToNode replicate all previously marked objects to a worker node * ReplicateAllDependenciesToNode replicate all previously marked objects to a worker
* node. The function also sets clusterHasDistributedFunction if there are any
* distributed functions.
*/ */
void void
ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)

View File

@ -16,6 +16,7 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h"
#include "funcapi.h" #include "funcapi.h"
#include "access/htup_details.h" #include "access/htup_details.h"
@ -26,12 +27,14 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h" #include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h" #include "distributed/metadata/pg_dist_object.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h" #include "distributed/relation_access_tracking.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "storage/lmgr.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgrprotos.h" #include "utils/fmgrprotos.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
@ -54,9 +57,12 @@ static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
int *distribution_argument_index, int *distribution_argument_index,
int *colocationId); int *colocationId);
static void EnsureSequentialModeForFunctionDDL(void); static void EnsureSequentialModeForFunctionDDL(void);
static void TriggerSyncMetadataToPrimaryNodes(void);
PG_FUNCTION_INFO_V1(create_distributed_function); PG_FUNCTION_INFO_V1(create_distributed_function);
/* /*
* create_distributed_function gets a function or procedure name with their list of * create_distributed_function gets a function or procedure name with their list of
* argument types in parantheses, then it creates a new distributed function. * argument types in parantheses, then it creates a new distributed function.
@ -165,6 +171,13 @@ create_distributed_function(PG_FUNCTION_ARGS)
/* if provided, make sure to record the distribution argument and colocationId */ /* if provided, make sure to record the distribution argument and colocationId */
UpdateFunctionDistributionInfo(&functionAddress, &distributionArgumentIndex, UpdateFunctionDistributionInfo(&functionAddress, &distributionArgumentIndex,
&colocationId); &colocationId);
/*
* Once we have at least one distributed function/procedure with distribution
* argument, we sync the metadata to nodes so that the function/procedure
* delegation can be handled locally on the nodes.
*/
TriggerSyncMetadataToPrimaryNodes();
} }
PG_RETURN_VOID(); PG_RETURN_VOID();
@ -561,3 +574,41 @@ EnsureSequentialModeForFunctionDDL(void)
"use only one connection for all future commands"))); "use only one connection for all future commands")));
SetLocalMultiShardModifyModeToSequential(); SetLocalMultiShardModifyModeToSequential();
} }
/*
* TriggerSyncMetadataToPrimaryNodes iterates over the active primary nodes,
* and triggers the metadata syncs if the node has not the metadata. Later,
* maintenance daemon will sync the metadata to nodes.
*/
static void
TriggerSyncMetadataToPrimaryNodes(void)
{
List *workerList = ActivePrimaryNodeList(ShareLock);
ListCell *workerCell = NULL;
bool triggerMetadataSync = false;
foreach(workerCell, workerList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerCell);
/* if already has metadata, no need to do it again */
if (!workerNode->hasMetadata)
{
/*
* Let the maintanince deamon do the hard work of syncing the metadata. We prefer
* this because otherwise node activation might fail withing transaction blocks.
*/
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
MarkNodeHasMetadata(workerNode->workerName, workerNode->workerPort, true);
triggerMetadataSync = true;
}
}
/* let the maintanince deamon know about the metadata sync */
if (triggerMetadataSync)
{
TriggerMetadataSync(MyDatabaseId);
}
}

View File

@ -20,6 +20,7 @@
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/objectaddress.h" #include "catalog/objectaddress.h"
#include "catalog/pg_namespace.h" #include "catalog/pg_namespace.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/metadata/distobject.h" #include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h" #include "distributed/metadata/pg_dist_object.h"
@ -260,6 +261,63 @@ IsObjectDistributed(const ObjectAddress *address)
} }
/*
* ClusterHasDistributedFunctionWithDistArgument returns true if there
* is at least one distributed function in the cluster with distribution
* argument index set.
*/
bool
ClusterHasDistributedFunctionWithDistArgument(void)
{
bool foundDistributedFunction = false;
SysScanDesc pgDistObjectScan = NULL;
HeapTuple pgDistObjectTup = NULL;
Relation pgDistObjectRel = heap_open(DistObjectRelationId(), AccessShareLock);
#if (PG_VERSION_NUM >= 110000)
TupleDesc tupleDescriptor = RelationGetDescr(pgDistObjectRel);
#endif
pgDistObjectScan =
systable_beginscan(pgDistObjectRel, InvalidOid, false, NULL, 0, NULL);
while (HeapTupleIsValid(pgDistObjectTup = systable_getnext(pgDistObjectScan)))
{
Form_pg_dist_object pg_dist_object =
(Form_pg_dist_object) GETSTRUCT(pgDistObjectTup);
if (pg_dist_object->classid == ProcedureRelationId)
{
bool distArgumentIsNull = false;
#if (PG_VERSION_NUM >= 110000)
distArgumentIsNull =
heap_attisnull(pgDistObjectTup,
Anum_pg_dist_object_distribution_argument_index,
tupleDescriptor);
#else
distArgumentIsNull =
heap_attisnull(pgDistObjectTup,
Anum_pg_dist_object_distribution_argument_index);
#endif
/* we found one distributed function that has an distribution argument */
if (!distArgumentIsNull)
{
foundDistributedFunction = true;
break;
}
}
}
systable_endscan(pgDistObjectScan);
relation_close(pgDistObjectRel, AccessShareLock);
return foundDistributedFunction;
}
/* /*
* GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all * GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all
* distributed objects as marked in pg_dist_object * distributed objects as marked in pg_dist_object

View File

@ -53,7 +53,6 @@
static char * LocalGroupIdUpdateCommand(int32 groupId); static char * LocalGroupIdUpdateCommand(int32 groupId);
static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
static void UpdateDistNodeBoolAttr(char *nodeName, int32 nodePort, int attrNum, static void UpdateDistNodeBoolAttr(char *nodeName, int32 nodePort, int attrNum,
bool value); bool value);
static List * SequenceDDLCommandsForTable(Oid relationId); static List * SequenceDDLCommandsForTable(Oid relationId);
@ -76,10 +75,24 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
{ {
text *nodeName = PG_GETARG_TEXT_P(0); text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1); int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
char *escapedNodeName = quote_literal_cstr(nodeNameString);
char *nodeNameString = text_to_cstring(nodeName);
StartMetadatSyncToNode(nodeNameString, nodePort);
PG_RETURN_VOID();
}
/*
* StartMetadatSyncToNode is the internal API for
* start_metadata_sync_to_node().
*/
void
StartMetadatSyncToNode(char *nodeNameString, int32 nodePort)
{
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
char *escapedNodeName = quote_literal_cstr(nodeNameString);
/* fail if metadata synchronization doesn't succeed */ /* fail if metadata synchronization doesn't succeed */
bool raiseInterrupts = true; bool raiseInterrupts = true;
@ -119,13 +132,11 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
* If this is a secondary node we can't actually sync metadata to it; we assume * If this is a secondary node we can't actually sync metadata to it; we assume
* the primary node is receiving metadata. * the primary node is receiving metadata.
*/ */
PG_RETURN_VOID(); return;
} }
SyncMetadataSnapshotToNode(workerNode, raiseInterrupts); SyncMetadataSnapshotToNode(workerNode, raiseInterrupts);
MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true); MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true);
PG_RETURN_VOID();
} }
@ -945,7 +956,7 @@ LocalGroupIdUpdateCommand(int32 groupId)
* MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in * MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in
* pg_dist_node to hasMetadata. * pg_dist_node to hasMetadata.
*/ */
static void void
MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata) MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata)
{ {
UpdateDistNodeBoolAttr(nodeName, nodePort, UpdateDistNodeBoolAttr(nodeName, nodePort,

View File

@ -80,8 +80,36 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
uint32 timeout = PG_GETARG_UINT32(0); uint32 timeout = PG_GETARG_UINT32(0);
int waitResult = 0; int waitResult = 0;
MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION, List *workerList = ActivePrimaryNodeList(NoLock);
"localhost", PostPortNumber); ListCell *workerCell = NULL;
bool waitNotifications = false;
MultiConnection *connection = NULL;
foreach(workerCell, workerList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerCell);
/* if already has metadata, no need to do it again */
if (workerNode->hasMetadata && !workerNode->metadataSynced)
{
waitNotifications = true;
break;
}
}
/*
* If all the metadata nodes have already been synced, we should not wait.
* That's primarily because the maintenance deamon might have already sent
* the notification and we'd wait unnecessarily here. Worse, the test outputs
* might be inconsistent across executions due to the warning.
*/
if (!waitNotifications)
{
PG_RETURN_VOID();
}
connection = GetNodeConnection(FORCE_NEW_CONNECTION,
"localhost", PostPortNumber);
ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL); ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);
waitResult = WaitLatchOrSocket(NULL, WL_SOCKET_READABLE | WL_TIMEOUT, waitResult = WaitLatchOrSocket(NULL, WL_SOCKET_READABLE | WL_TIMEOUT,

View File

@ -26,6 +26,7 @@
#include "distributed/maintenanced.h" #include "distributed/maintenanced.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
@ -475,6 +476,16 @@ ActivateNode(char *nodeName, int nodePort)
EnsureNoModificationsHaveBeenDone(); EnsureNoModificationsHaveBeenDone();
ReplicateAllDependenciesToNode(nodeName, nodePort); ReplicateAllDependenciesToNode(nodeName, nodePort);
ReplicateAllReferenceTablesToNode(nodeName, nodePort); ReplicateAllReferenceTablesToNode(nodeName, nodePort);
/*
* Let the maintanince deamon do the hard work of syncing the metadata. We prefer
* this because otherwise node activation might fail withing transaction blocks.
*/
if (ClusterHasDistributedFunctionWithDistArgument())
{
MarkNodeHasMetadata(nodeName, nodePort, true);
TriggerMetadataSync(MyDatabaseId);
}
} }
return workerNode->nodeId; return workerNode->nodeId;
@ -1073,6 +1084,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
{ {
List *workerNodeList = list_make1(workerNode); List *workerNodeList = list_make1(workerNode);
char *nodeInsertCommand = NodeListInsertCommand(workerNodeList); char *nodeInsertCommand = NodeListInsertCommand(workerNodeList);
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand); SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand);
} }

View File

@ -18,6 +18,7 @@
extern bool ObjectExists(const ObjectAddress *address); extern bool ObjectExists(const ObjectAddress *address);
extern bool IsObjectDistributed(const ObjectAddress *address); extern bool IsObjectDistributed(const ObjectAddress *address);
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address); extern void UnmarkObjectDistributed(const ObjectAddress *address);

View File

@ -28,13 +28,13 @@ typedef struct FormData_pg_dist_object
Oid objid; /* object id of the distributed object */ Oid objid; /* object id of the distributed object */
int32 objsubid; /* object sub id of the distributed object, eg. attnum */ int32 objsubid; /* object sub id of the distributed object, eg. attnum */
uint32 distribution_argument_index; /* only valid for distributed functions/procedures */
uint32 colocationid; /* only valid for distributed functions/procedures */
#ifdef CATALOG_VARLEN /* variable-length fields start here */ #ifdef CATALOG_VARLEN /* variable-length fields start here */
text type; text type;
text[] object_names; text[] object_names;
text[] object_arguments; text[] object_arguments;
uint32 distribution_argument_index; /* only valid for distributed functions/procedures */
uint32 colocationid; /* only valid for distributed functions/procedures */
#endif #endif
} FormData_pg_dist_object; } FormData_pg_dist_object;

View File

@ -21,6 +21,7 @@ extern int MetadataSyncInterval;
extern int MetadataSyncRetryInterval; extern int MetadataSyncRetryInterval;
/* Functions declarations for metadata syncing */ /* Functions declarations for metadata syncing */
extern void StartMetadatSyncToNode(char *nodeNameString, int32 nodePort);
extern bool ClusterHasKnownMetadataWorkers(void); extern bool ClusterHasKnownMetadataWorkers(void);
extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadata(Oid relationId);
extern List * MetadataCreateCommands(void); extern List * MetadataCreateCommands(void);
@ -40,6 +41,7 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, int32 groupId); uint64 shardLength, int32 groupId);
extern void CreateTableMetadataOnWorkers(Oid relationId); extern void CreateTableMetadataOnWorkers(Oid relationId);
extern void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced); extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced);
extern bool SyncMetadataToNodes(void); extern bool SyncMetadataToNodes(void);
extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort, extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort,

View File

@ -18,19 +18,6 @@ CREATE FUNCTION add(integer, integer) RETURNS integer
LANGUAGE SQL LANGUAGE SQL
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
SELECT create_distributed_function('add(int,int)', '$1');
create_distributed_function
-----------------------------
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
(2 rows)
-- Test some combination of functions without ddl propagation -- Test some combination of functions without ddl propagation
-- This will prevent the workers from having those types created. They are -- This will prevent the workers from having those types created. They are
-- created just-in-time on function distribution -- created just-in-time on function distribution
@ -39,19 +26,6 @@ CREATE TYPE dup_result AS (f1 int, f2 text);
CREATE FUNCTION dup(int) RETURNS dup_result CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL; LANGUAGE SQL;
SELECT create_distributed_function('dup(int)', '$1');
create_distributed_function
-----------------------------
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+-------------------
localhost | 57637 | t | (42,"42 is text")
localhost | 57638 | t | (42,"42 is text")
(2 rows)
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;' AS 'select $1 + $2;'
LANGUAGE SQL LANGUAGE SQL
@ -67,6 +41,71 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
LANGUAGE SQL LANGUAGE SQL
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
-- make sure that none of the active and primary nodes hasmetadata
-- at the start of the test
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
bool_or
---------
f
(1 row)
-- if not paremeters are supplied, we'd see that function doesn't have
-- distribution_argument_index and colocationid
SELECT create_distributed_function('add_mixed_param_names(int, int)');
create_distributed_function
-----------------------------
(1 row)
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure;
?column? | ?column?
----------+----------
t | t
(1 row)
-- also show that we can use the function
SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
(2 rows)
-- make sure that none of the active and primary nodes hasmetadata
-- since the function doesn't have a parameter
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
bool_or
---------
f
(1 row)
SELECT create_distributed_function('dup(int)', '$1');
create_distributed_function
-----------------------------
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+-------------------
localhost | 57637 | t | (42,"42 is text")
localhost | 57638 | t | (42,"42 is text")
(2 rows)
SELECT create_distributed_function('add(int,int)', '$1');
create_distributed_function
-----------------------------
(1 row)
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
(2 rows)
-- postgres doesn't accept parameter names in the regprocedure input -- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
ERROR: syntax error at or near "int" ERROR: syntax error at or near "int"
@ -111,6 +150,31 @@ HINT: To use the default value, set colocate_with option to "default"
SELECT create_distributed_function('add_with_param_names(int, int)', ''); SELECT create_distributed_function('add_with_param_names(int, int)', '');
ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
-- The first distributed function syncs the metadata to nodes
-- and metadata syncing is not supported within transaction blocks
BEGIN;
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
create_distributed_function
-----------------------------
(1 row)
ROLLBACK;
-- make sure that none of the nodes have the function because we've rollbacked
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$);
run_command_on_workers
------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
-- make sure that none of the active and primary nodes hasmetadata
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
bool_or
---------
t
(1 row)
-- valid distribution with distribution_arg_name -- valid distribution with distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
create_distributed_function create_distributed_function
@ -118,6 +182,21 @@ SELECT create_distributed_function('add_with_param_names(int, int)', distributio
(1 row) (1 row)
-- make sure that the primary nodes are now metadata synced
select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
bool_and
----------
t
(1 row)
-- make sure that both of the nodes have the function because we've succeeded
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$);
run_command_on_workers
------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
-- valid distribution with distribution_arg_name -- case insensitive -- valid distribution with distribution_arg_name -- case insensitive
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1'); SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1');
create_distributed_function create_distributed_function
@ -218,39 +297,44 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass
t t
(1 row) (1 row)
-- if not paremeters are supplied, we'd see that function doesn't have -- clear objects
-- distribution_argument_index and colocationid SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
SELECT create_distributed_function('add_mixed_param_names(int, int)'); stop_metadata_sync_to_node
create_distributed_function ----------------------------
-----------------------------
(1 row)
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure;
?column? | ?column?
----------+----------
t | t
(1 row)
-- also show that we can use the function
SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2;
nodename | nodeport | success | result
-----------+----------+---------+--------
localhost | 57637 | t | 5
localhost | 57638 | t | 5
(2 rows) (2 rows)
-- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$); -- This is hacky, but we should clean-up the resources as below
run_command_on_workers \c - - - :worker_1_port
----------------------------------- SET client_min_messages TO error; -- suppress cascading objects dropping
(localhost,57637,t,"DROP SCHEMA") UPDATE pg_dist_local_group SET groupid = 0;
(localhost,57638,t,"DROP SCHEMA") SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
(2 rows) worker_drop_distributed_table
-------------------------------
(3 rows)
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
\c - - - :worker_2_port
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
worker_drop_distributed_table
-------------------------------
(3 rows)
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
\c - - - :master_port
DROP USER functionuser; DROP USER functionuser;
SELECT run_command_on_workers($$DROP USER functionuser;$$); SELECT run_command_on_workers($$DROP USER functionuser;$$);
run_command_on_workers run_command_on_workers

View File

@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
281 280 f 282 281 f
transactionnumberwaitingtransactionnumbers transactionnumberwaitingtransactionnumbers
280 281
281 280 282 281
step s1-abort: step s1-abort:
ABORT; ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
285 284 f 286 285 f
286 284 f 287 285 f
286 285 t 287 286 t
transactionnumberwaitingtransactionnumbers transactionnumberwaitingtransactionnumbers
284 285
285 284 286 285
286 284,285 287 285,286
step s1-abort: step s1-abort:
ABORT; ABORT;

View File

@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
282 281 f 283 282 f
transactionnumberwaitingtransactionnumbers transactionnumberwaitingtransactionnumbers
281 282
282 281 283 282
step s1-abort: step s1-abort:
ABORT; ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
286 285 f 287 286 f
287 285 f 288 286 f
287 286 t 288 287 t
transactionnumberwaitingtransactionnumbers transactionnumberwaitingtransactionnumbers
285 286
286 285 287 286
287 285,286 288 286,287
step s1-abort: step s1-abort:
ABORT; ABORT;

View File

@ -1739,7 +1739,7 @@ master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-print-distributed-objects starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-distribute-function s1-commit s2-begin s2-commit s3-wait-for-metadata-sync s1-print-distributed-objects
?column? ?column?
1 1
@ -1815,7 +1815,21 @@ step s2-distribute-function: <... completed>
create_distributed_function create_distributed_function
step s2-print-distributed-objects: step s2-begin:
BEGIN;
step s2-commit:
COMMIT;
step s3-wait-for-metadata-sync:
SELECT public.wait_until_metadata_sync(5000);
wait_until_metadata_sync
step s1-print-distributed-objects:
SELECT 1 FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects -- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1;
@ -1831,6 +1845,11 @@ step s2-print-distributed-objects:
SELECT count(*) FROM pg_proc WHERE proname='add'; SELECT count(*) FROM pg_proc WHERE proname='add';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$); SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$);
SELECT master_remove_node('localhost', 57638);
?column?
1
pg_identify_object_as_address pg_identify_object_as_address
(function,"{public,add}","{integer,integer}") (function,"{public,add}","{integer,integer}")
@ -1858,9 +1877,12 @@ run_command_on_workers
master_remove_node master_remove_node
master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s2-public-schema s2-distribute-function s1-add-worker s1-commit s2-print-distributed-objects starting permutation: s1-print-distributed-objects s1-begin s2-public-schema s2-distribute-function s2-begin s2-commit s3-wait-for-metadata-sync s1-add-worker s1-commit s3-wait-for-metadata-sync s2-print-distributed-objects
?column? ?column?
1 1
@ -1926,6 +1948,18 @@ step s2-distribute-function:
create_distributed_function create_distributed_function
step s2-begin:
BEGIN;
step s2-commit:
COMMIT;
step s3-wait-for-metadata-sync:
SELECT public.wait_until_metadata_sync(5000);
wait_until_metadata_sync
step s1-add-worker: step s1-add-worker:
SELECT 1 FROM master_add_node('localhost', 57638); SELECT 1 FROM master_add_node('localhost', 57638);
@ -1935,6 +1969,12 @@ step s1-add-worker:
step s1-commit: step s1-commit:
COMMIT; COMMIT;
step s3-wait-for-metadata-sync:
SELECT public.wait_until_metadata_sync(5000);
wait_until_metadata_sync
step s2-print-distributed-objects: step s2-print-distributed-objects:
-- print an overview of all distributed objects -- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1;
@ -1980,7 +2020,7 @@ master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s2-begin s2-create-schema s2-distribute-function s1-add-worker s2-commit s1-commit s2-print-distributed-objects starting permutation: s1-print-distributed-objects s2-begin s2-create-schema s2-distribute-function s2-commit s3-wait-for-metadata-sync s1-begin s1-add-worker s1-commit s3-wait-for-metadata-sync s2-print-distributed-objects
?column? ?column?
1 1
@ -2033,9 +2073,6 @@ run_command_on_workers
master_remove_node master_remove_node
step s1-begin:
BEGIN;
step s2-begin: step s2-begin:
BEGIN; BEGIN;
@ -2050,19 +2087,33 @@ step s2-distribute-function:
create_distributed_function create_distributed_function
step s1-add-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit: step s2-commit:
COMMIT; COMMIT;
step s1-add-worker: <... completed> step s3-wait-for-metadata-sync:
SELECT public.wait_until_metadata_sync(5000);
wait_until_metadata_sync
step s1-begin:
BEGIN;
step s1-add-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
?column? ?column?
1 1
step s1-commit: step s1-commit:
COMMIT; COMMIT;
step s3-wait-for-metadata-sync:
SELECT public.wait_until_metadata_sync(5000);
wait_until_metadata_sync
step s2-print-distributed-objects: step s2-print-distributed-objects:
-- print an overview of all distributed objects -- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1; SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object ORDER BY 1;

View File

@ -376,6 +376,8 @@ if($isolationtester)
push(@pgOptions, '-c', "citus.log_distributed_deadlock_detection=on"); push(@pgOptions, '-c', "citus.log_distributed_deadlock_detection=on");
push(@pgOptions, '-c', "citus.distributed_deadlock_detection_factor=-1"); push(@pgOptions, '-c', "citus.distributed_deadlock_detection_factor=-1");
push(@pgOptions, '-c', "citus.shard_count=4"); push(@pgOptions, '-c', "citus.shard_count=4");
push(@pgOptions, '-c', "citus.metadata_sync_interval=1000");
push(@pgOptions, '-c', "citus.metadata_sync_retry_interval=100");
} }
# Add externally added options last, so they overwrite the default ones above # Add externally added options last, so they overwrite the default ones above

View File

@ -2,13 +2,21 @@
# add single one of the nodes for the purpose of the test # add single one of the nodes for the purpose of the test
setup setup
{ {
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER)
RETURNS void
LANGUAGE C STRICT VOLATILE
AS 'citus', $$wait_until_metadata_sync$$;
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
SELECT 1 FROM master_add_node('localhost', 57637); SELECT 1 FROM master_add_node('localhost', 57637);
} }
# ensure that both nodes exists for the remaining of the isolation tests # ensure that both nodes exists for the remaining of the isolation tests
teardown teardown
{ {
SELECT 1 FROM master_add_node('localhost', 57637);
SELECT 1 FROM master_add_node('localhost', 57638);
-- schema drops are not cascaded -- schema drops are not cascaded
SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS myschema CASCADE;$$); SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS myschema CASCADE;$$);
DROP SCHEMA IF EXISTS myschema CASCADE; DROP SCHEMA IF EXISTS myschema CASCADE;
@ -36,7 +44,7 @@ teardown
-- similarly drop the function in the workers manually -- similarly drop the function in the workers manually
SELECT run_command_on_workers($$DROP FUNCTION IF EXISTS add(INT,INT) CASCADE;$$); SELECT run_command_on_workers($$DROP FUNCTION IF EXISTS add(INT,INT) CASCADE;$$);
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
} }
session "s1" session "s1"
@ -150,6 +158,7 @@ step "s2-print-distributed-objects"
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$); SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add';$$);
} }
session "s3" session "s3"
step "s3-public-schema" step "s3-public-schema"
@ -180,6 +189,17 @@ step "s3-commit"
COMMIT; COMMIT;
} }
step "s3-wait-for-metadata-sync"
{
SELECT public.wait_until_metadata_sync(5000);
}
step "s3-listen-channel"
{
LISTEN metadata_sync;
}
session "s4" session "s4"
step "s4-public-schema" step "s4-public-schema"
@ -216,6 +236,7 @@ step "s4-commit"
COMMIT; COMMIT;
} }
# schema only tests # schema only tests
permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-print-distributed-objects" permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-commit" "s2-print-distributed-objects" permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-commit" "s2-print-distributed-objects"
@ -236,6 +257,14 @@ permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-cre
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-schema" "s2-create-type" "s2-create-table-with-type" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects" permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-schema" "s2-create-type" "s2-create-table-with-type" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects"
# distributed function tests # distributed function tests
permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-print-distributed-objects" # isolation tests are not very simple psql, so trigger NOTIFY reliably for
permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-distribute-function" "s1-add-worker" "s1-commit" "s2-print-distributed-objects" # s3-wait-for-metadata-sync step, we do "s2-begin" followed directly by
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-schema" "s2-distribute-function" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects" # "s2-commit", because "COMMIT" syncs the messages
permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-distribute-function" "s1-commit" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-public-schema" "s2-distribute-function" "s2-begin" "s2-commit" "s3-wait-for-metadata-sync" "s1-add-worker" "s1-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects"
# we cannot run the following operations concurrently
# the problem is that NOTIFY event doesn't (reliably) happen before COMMIT
# so we have to commit s2 before s1 starts
permutation "s1-print-distributed-objects" "s2-begin" "s2-create-schema" "s2-distribute-function" "s2-commit" "s3-wait-for-metadata-sync" "s1-begin" "s1-add-worker" "s1-commit" "s3-wait-for-metadata-sync" "s2-print-distributed-objects"

View File

@ -8,16 +8,12 @@ CREATE SCHEMA function_tests AUTHORIZATION functionuser;
SET search_path TO function_tests; SET search_path TO function_tests;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
-- Create and distribute a simple function -- Create and distribute a simple function
CREATE FUNCTION add(integer, integer) RETURNS integer CREATE FUNCTION add(integer, integer) RETURNS integer
AS 'select $1 + $2;' AS 'select $1 + $2;'
LANGUAGE SQL LANGUAGE SQL
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
SELECT create_distributed_function('add(int,int)', '$1');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
-- Test some combination of functions without ddl propagation -- Test some combination of functions without ddl propagation
-- This will prevent the workers from having those types created. They are -- This will prevent the workers from having those types created. They are
@ -30,9 +26,6 @@ CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$ AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL; LANGUAGE SQL;
SELECT create_distributed_function('dup(int)', '$1');
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;' AS 'select $1 + $2;'
LANGUAGE SQL LANGUAGE SQL
@ -51,6 +44,29 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
IMMUTABLE IMMUTABLE
RETURNS NULL ON NULL INPUT; RETURNS NULL ON NULL INPUT;
-- make sure that none of the active and primary nodes hasmetadata
-- at the start of the test
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
-- if not paremeters are supplied, we'd see that function doesn't have
-- distribution_argument_index and colocationid
SELECT create_distributed_function('add_mixed_param_names(int, int)');
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure;
-- also show that we can use the function
SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2;
-- make sure that none of the active and primary nodes hasmetadata
-- since the function doesn't have a parameter
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
SELECT create_distributed_function('dup(int)', '$1');
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
SELECT create_distributed_function('add(int,int)', '$1');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
-- postgres doesn't accept parameter names in the regprocedure input -- postgres doesn't accept parameter names in the regprocedure input
SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1'); SELECT create_distributed_function('add_with_param_names(val1 int, int)', 'val1');
@ -77,9 +93,27 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', NULL)
-- empty string distribution_arg_index -- empty string distribution_arg_index
SELECT create_distributed_function('add_with_param_names(int, int)', ''); SELECT create_distributed_function('add_with_param_names(int, int)', '');
-- The first distributed function syncs the metadata to nodes
-- and metadata syncing is not supported within transaction blocks
BEGIN;
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
ROLLBACK;
-- make sure that none of the nodes have the function because we've rollbacked
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$);
-- make sure that none of the active and primary nodes hasmetadata
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
-- valid distribution with distribution_arg_name -- valid distribution with distribution_arg_name
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1'); SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='val1');
-- make sure that the primary nodes are now metadata synced
select bool_and(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
-- make sure that both of the nodes have the function because we've succeeded
SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='add_with_param_names';$$);
-- valid distribution with distribution_arg_name -- case insensitive -- valid distribution with distribution_arg_name -- case insensitive
SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1'); SELECT create_distributed_function('add_with_param_names(int, int)', distribution_arg_name:='VaL1');
@ -132,18 +166,29 @@ FROM pg_dist_partition, citus.pg_dist_object as objects
WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND
objects.objid = 'add_with_param_names(int, int)'::regprocedure; objects.objid = 'add_with_param_names(int, int)'::regprocedure;
-- if not paremeters are supplied, we'd see that function doesn't have
-- distribution_argument_index and colocationid
SELECT create_distributed_function('add_mixed_param_names(int, int)');
SELECT distribution_argument_index is NULL, colocationid is NULL from citus.pg_dist_object
WHERE objid = 'add_mixed_param_names(int, int)'::regprocedure;
-- also show that we can use the function
SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_names(2,3);') ORDER BY 1,2;
-- clear objects -- clear objects
SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
SET client_min_messages TO error; -- suppress cascading objects dropping SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA function_tests CASCADE;$$);
-- This is hacky, but we should clean-up the resources as below
\c - - - :worker_1_port
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
\c - - - :worker_2_port
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
\c - - - :master_port
DROP USER functionuser; DROP USER functionuser;
SELECT run_command_on_workers($$DROP USER functionuser;$$); SELECT run_command_on_workers($$DROP USER functionuser;$$);

View File

@ -110,4 +110,4 @@ SELECT modify_fast_path_plpsql(6,6);
RESET client_min_messages; RESET client_min_messages;
DROP SCHEMA fast_path_router_modify CASCADE; DROP SCHEMA fast_path_router_modify CASCADE;