Allow create_distributed_function() on a function owned by an extension

Implement #5649
Allow create_distributed_function() on functions owned by extensions

1) Only update pg_dist_object, and do not propagate CREATE FUNCTION.
2) Ensure corresponding extension is in pg_dist_object.
3) Verify if dependencies exist on the function they should resolve to the extension.
4) Impact on node-scaling: We build a list of ddl commands based on all objects in
   pg_dist_object. We need to omit the ddl's for the extension-function, as it
   will get propagated by the virtue of the extension creation.
5) Extra checks for functions coming from extensions, to not propagate changes
   via ddl commands, even though the function is marked as distributed in pg_dist_object
pull/5697/head
Teja Mupparti 2022-01-26 22:01:02 -08:00 committed by Teja Mupparti
parent 474e36a405
commit 1e3c8e34c0
6 changed files with 593 additions and 44 deletions

View File

@ -396,6 +396,15 @@ ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort)
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
{
if (IsObjectAddressOwnedByExtension(dependency, NULL))
{
/*
* we expect extension-owned objects to be created as a result
* of the extension being created.
*/
continue;
}
ddlCommands = list_concat(ddlCommands,
GetDependencyCreateDDLCommands(dependency));
}

View File

@ -85,7 +85,6 @@ static ObjectAddress FunctionToObjectAddress(ObjectType objectType,
ObjectWithArgs *objectWithArgs,
bool missing_ok);
static void ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt);
static void ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress);
static char * quote_qualified_func_name(Oid funcOid);
static void DistributeFunctionWithDistributionArgument(RegProcedure funcOid,
char *distributionArgumentName,
@ -101,6 +100,9 @@ static void DistributeFunctionColocatedWithDistributedTable(RegProcedure funcOid
static void DistributeFunctionColocatedWithReferenceTable(const
ObjectAddress *functionAddress);
static void EnsureExtensionFunctionCanBeDistributed(const ObjectAddress functionAddress,
const ObjectAddress extensionAddress,
char *distributionArgumentName);
PG_FUNCTION_INFO_V1(create_distributed_function);
@ -127,6 +129,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
char *colocateWithTableName = NULL;
bool *forceDelegationAddress = NULL;
bool forceDelegation = false;
ObjectAddress extensionAddress = { 0 };
/* if called on NULL input, error out */
if (funcOid == InvalidOid)
@ -187,22 +190,35 @@ create_distributed_function(PG_FUNCTION_ARGS)
EnsureFunctionOwner(funcOid);
ObjectAddressSet(functionAddress, ProcedureRelationId, funcOid);
ErrorIfFunctionDependsOnExtension(&functionAddress);
/*
* when we allow propagation within a transaction block we should make sure to only
* allow this in sequential mode
* If the function is owned by an extension, only update the
* pg_dist_object, and not propagate the CREATE FUNCTION. Function
* will be created by the virtue of the extension creation.
*/
EnsureSequentialModeForFunctionDDL();
if (IsObjectAddressOwnedByExtension(&functionAddress, &extensionAddress))
{
EnsureExtensionFunctionCanBeDistributed(functionAddress, extensionAddress,
distributionArgumentName);
}
else
{
/*
* when we allow propagation within a transaction block we should make sure
* to only allow this in sequential mode.
*/
EnsureSequentialModeForFunctionDDL();
EnsureDependenciesExistOnAllNodes(&functionAddress);
EnsureDependenciesExistOnAllNodes(&functionAddress);
const char *createFunctionSQL = GetFunctionDDLCommand(funcOid, true);
const char *alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
initStringInfo(&ddlCommand);
appendStringInfo(&ddlCommand, "%s;%s;%s;%s", DISABLE_METADATA_SYNC,
createFunctionSQL, alterFunctionOwnerSQL, ENABLE_METADATA_SYNC);
SendCommandToWorkersAsUser(NON_COORDINATOR_NODES, CurrentUserName(), ddlCommand.data);
const char *createFunctionSQL = GetFunctionDDLCommand(funcOid, true);
const char *alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
initStringInfo(&ddlCommand);
appendStringInfo(&ddlCommand, "%s;%s;%s;%s", DISABLE_METADATA_SYNC,
createFunctionSQL, alterFunctionOwnerSQL, ENABLE_METADATA_SYNC);
SendCommandToWorkersAsUser(NON_COORDINATOR_NODES, CurrentUserName(),
ddlCommand.data);
}
MarkObjectDistributed(&functionAddress);
@ -2013,33 +2029,6 @@ ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt)
}
/*
* ErrorIfFunctionDependsOnExtension functions depending on extensions should raise an
* error informing the user why they can't be distributed.
*/
static void
ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress)
{
/* captures the extension address during lookup */
ObjectAddress extensionAddress = { 0 };
if (IsObjectAddressOwnedByExtension(functionAddress, &extensionAddress))
{
char *functionName =
getObjectIdentity_compat(functionAddress, /* missingOk: */ false);
char *extensionName =
getObjectIdentity_compat(&extensionAddress, /* missingOk: */ false);
ereport(ERROR, (errmsg("unable to create a distributed function from functions "
"owned by an extension"),
errdetail("Function \"%s\" has a dependency on extension \"%s\". "
"Functions depending on an extension cannot be "
"distributed. Create the function by creating the "
"extension on the workers.", functionName,
extensionName)));
}
}
/* returns the quoted qualified name of a given function oid */
static char *
quote_qualified_func_name(Oid funcOid)
@ -2048,3 +2037,54 @@ quote_qualified_func_name(Oid funcOid)
get_namespace_name(get_func_namespace(funcOid)),
get_func_name(funcOid));
}
/*
* EnsureExtensionFuncionCanBeCreated checks if the dependent objects
* (including extension) exists on all nodes, if not, creates them. In
* addition, it also checks if distribution argument is passed.
*/
static void
EnsureExtensionFunctionCanBeDistributed(const ObjectAddress functionAddress,
const ObjectAddress extensionAddress,
char *distributionArgumentName)
{
if (CitusExtensionObject(&extensionAddress))
{
/*
* Citus extension is a special case. It's the extension that
* provides the 'distributed capabilities' in the first place.
* Trying to distribute it's own function(s) doesn't make sense.
*/
ereport(ERROR, (errmsg("Citus extension functions(%s) "
"cannot be distributed.",
get_func_name(functionAddress.objectId))));
}
/*
* Distributing functions from extensions has the most benefit when
* distribution argument is specified.
*/
if (distributionArgumentName == NULL)
{
ereport(ERROR, (errmsg("Extension functions(%s) "
"without distribution argument "
"are not supported.",
get_func_name(functionAddress.objectId))));
}
/*
* Ensure corresponding extension is in pg_dist_object.
* Functions owned by an extension are depending internally on that extension,
* hence EnsureDependenciesExistOnAllNodes() creates the extension, which in
* turn creates the function, and thus we don't have to create it ourself like
* we do for non-extension functions.
*/
ereport(DEBUG1, (errmsg("Extension(%s) owning the "
"function(%s) is not distributed, "
"attempting to propogate the extension",
get_extension_name(extensionAddress.objectId),
get_func_name(functionAddress.objectId))));
EnsureDependenciesExistOnAllNodes(&functionAddress);
}

View File

@ -469,8 +469,7 @@ ALTER FUNCTION eq(macaddr,macaddr) DEPENDS ON EXTENSION citus;
ERROR: distrtibuted functions are not allowed to depend on an extension
DETAIL: Function "function_tests.eq(pg_catalog.macaddr,pg_catalog.macaddr)" is already distributed. Functions from extensions are expected to be created on the workers by the extension they depend on.
SELECT create_distributed_function('pg_catalog.citus_drop_trigger()');
ERROR: unable to create a distributed function from functions owned by an extension
DETAIL: Function "pg_catalog.citus_drop_trigger()" has a dependency on extension "citus". Functions depending on an extension cannot be distributed. Create the function by creating the extension on the workers.
ERROR: Citus extension functions(citus_drop_trigger) cannot be distributed.
DROP FUNCTION eq(macaddr,macaddr);
-- call should fail as function should have been dropped
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('0123456789ab','ba9876543210');$$) ORDER BY 1,2;

View File

@ -398,10 +398,34 @@ SELECT 1 from master_remove_node('localhost', :worker_2_port);
1
(1 row)
-- Test extension function incorrect distribution argument
CREATE TABLE test_extension_function(col varchar);
CREATE EXTENSION seg;
-- Missing distribution argument
SELECT create_distributed_function('seg_in(cstring)');
ERROR: Extension functions(seg_in) without distribution argument are not supported.
-- Missing colocation argument
SELECT create_distributed_function('seg_in(cstring)', '$1');
ERROR: cannot distribute the function "seg_in" since there is no table to colocate with
HINT: Provide a distributed table via "colocate_with" option to create_distributed_function()
-- Incorrect distribution argument
SELECT create_distributed_function('seg_in(cstring)', '$2', colocate_with:='test_extension_function');
ERROR: cannot distribute the function "seg_in" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
-- Colocated table is not distributed
SELECT create_distributed_function('seg_in(cstring)', '$1', 'test_extension_function');
ERROR: relation test_extension_function is not distributed
DROP EXTENSION seg;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test_extension_function', 'col', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- now, create a type that depends on another type, which
-- finally depends on an extension
BEGIN;
SET citus.shard_replication_factor TO 1;
CREATE EXTENSION seg;
CREATE EXTENSION isn;
CREATE TYPE test_type AS (a int, b seg);
@ -419,9 +443,39 @@ BEGIN;
create_reference_table
---------------------------------------------------------------------
(1 row)
-- Distribute an extension-function
SELECT create_distributed_function('seg_in(cstring)', '$1', 'test_extension_function');
create_distributed_function
---------------------------------------------------------------------
(1 row)
COMMIT;
-- Check the pg_dist_object
SELECT pg_proc.proname as DistributedFunction
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
distributedfunction
---------------------------------------------------------------------
seg_in
(1 row)
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(1 row)
-- add the node back
SELECT 1 from master_add_node('localhost', :worker_2_port);
?column?
@ -443,5 +497,145 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname
(localhost,57638,t,2)
(2 rows)
-- Check the pg_dist_object on the both nodes
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
DROP EXTENSION seg CASCADE;
-- Recheck the pg_dist_object
SELECT pg_proc.proname as DistributedFunction
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
distributedfunction
---------------------------------------------------------------------
(0 rows)
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
-- Distribute an extension-function where extension is not in pg_dist_object
SET citus.enable_ddl_propagation TO false;
CREATE EXTENSION seg;
SET citus.enable_ddl_propagation TO true;
-- Check the extension in pg_dist_object
SELECT count(*) FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
count
---------------------------------------------------------------------
0
(1 row)
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
SELECT create_distributed_function('seg_in(cstring)', '$1', 'test_extension_function');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- Recheck the extension in pg_dist_object
SELECT count(*) FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
count
---------------------------------------------------------------------
1
(1 row)
SELECT pg_proc.proname as DistributedFunction
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
distributedfunction
---------------------------------------------------------------------
seg_in
(1 row)
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
DROP EXTENSION seg;
DROP TABLE test_extension_function;
-- Test extension function altering distribution argument
BEGIN;
SET citus.shard_replication_factor = 1;
CREATE TABLE test_extension_function(col1 float8[], col2 float8[]);
SELECT create_distributed_table('test_extension_function', 'col1', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE EXTENSION cube;
SELECT create_distributed_function('cube(float8[], float8[])', '$1', 'test_extension_function');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT distribution_argument_index FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_proc'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
distribution_argument_index
---------------------------------------------------------------------
0
(1 row)
SELECT create_distributed_function('cube(float8[], float8[])', '$2', 'test_extension_function');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT distribution_argument_index FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_proc'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
distribution_argument_index
---------------------------------------------------------------------
1
(1 row)
ROLLBACK;
-- drop the schema and all the objects
DROP SCHEMA "extension'test" CASCADE;

View File

@ -397,10 +397,34 @@ SELECT 1 from master_remove_node('localhost', :worker_2_port);
1
(1 row)
-- Test extension function incorrect distribution argument
CREATE TABLE test_extension_function(col varchar);
CREATE EXTENSION seg;
-- Missing distribution argument
SELECT create_distributed_function('seg_in(cstring)');
ERROR: Extension functions(seg_in) without distribution argument are not supported.
-- Missing colocation argument
SELECT create_distributed_function('seg_in(cstring)', '$1');
ERROR: cannot distribute the function "seg_in" since there is no table to colocate with
HINT: Provide a distributed table via "colocate_with" option to create_distributed_function()
-- Incorrect distribution argument
SELECT create_distributed_function('seg_in(cstring)', '$2', colocate_with:='test_extension_function');
ERROR: cannot distribute the function "seg_in" since the distribution argument is not valid
HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function()
-- Colocated table is not distributed
SELECT create_distributed_function('seg_in(cstring)', '$1', 'test_extension_function');
ERROR: relation test_extension_function is not distributed
DROP EXTENSION seg;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test_extension_function', 'col', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- now, create a type that depends on another type, which
-- finally depends on an extension
BEGIN;
SET citus.shard_replication_factor TO 1;
CREATE EXTENSION seg;
CREATE EXTENSION isn;
CREATE TYPE test_type AS (a int, b seg);
@ -418,9 +442,39 @@ BEGIN;
create_reference_table
---------------------------------------------------------------------
(1 row)
-- Distribute an extension-function
SELECT create_distributed_function('seg_in(cstring)', '$1', 'test_extension_function');
create_distributed_function
---------------------------------------------------------------------
(1 row)
COMMIT;
-- Check the pg_dist_object
SELECT pg_proc.proname as DistributedFunction
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
distributedfunction
---------------------------------------------------------------------
seg_in
(1 row)
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(1 row)
-- add the node back
SELECT 1 from master_add_node('localhost', :worker_2_port);
?column?
@ -442,5 +496,145 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname
(localhost,57638,t,2)
(2 rows)
-- Check the pg_dist_object on the both nodes
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
DROP EXTENSION seg CASCADE;
-- Recheck the pg_dist_object
SELECT pg_proc.proname as DistributedFunction
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
distributedfunction
---------------------------------------------------------------------
(0 rows)
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
-- Distribute an extension-function where extension is not in pg_dist_object
SET citus.enable_ddl_propagation TO false;
CREATE EXTENSION seg;
SET citus.enable_ddl_propagation TO true;
-- Check the extension in pg_dist_object
SELECT count(*) FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
count
---------------------------------------------------------------------
0
(1 row)
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
SELECT create_distributed_function('seg_in(cstring)', '$1', 'test_extension_function');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- Recheck the extension in pg_dist_object
SELECT count(*) FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
count
---------------------------------------------------------------------
1
(1 row)
SELECT pg_proc.proname as DistributedFunction
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
distributedfunction
---------------------------------------------------------------------
seg_in
(1 row)
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(2 rows)
DROP EXTENSION seg;
DROP TABLE test_extension_function;
-- Test extension function altering distribution argument
BEGIN;
SET citus.shard_replication_factor = 1;
CREATE TABLE test_extension_function(col1 float8[], col2 float8[]);
SELECT create_distributed_table('test_extension_function', 'col1', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE EXTENSION cube;
SELECT create_distributed_function('cube(float8[], float8[])', '$1', 'test_extension_function');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT distribution_argument_index FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_proc'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
distribution_argument_index
---------------------------------------------------------------------
0
(1 row)
SELECT create_distributed_function('cube(float8[], float8[])', '$2', 'test_extension_function');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SELECT distribution_argument_index FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_proc'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
distribution_argument_index
---------------------------------------------------------------------
1
(1 row)
ROLLBACK;
-- drop the schema and all the objects
DROP SCHEMA "extension'test" CASCADE;

View File

@ -227,10 +227,25 @@ SET search_path TO "extension'test";
-- remove the node, we'll add back again
SELECT 1 from master_remove_node('localhost', :worker_2_port);
-- Test extension function incorrect distribution argument
CREATE TABLE test_extension_function(col varchar);
CREATE EXTENSION seg;
-- Missing distribution argument
SELECT create_distributed_function('seg_in(cstring)');
-- Missing colocation argument
SELECT create_distributed_function('seg_in(cstring)', '$1');
-- Incorrect distribution argument
SELECT create_distributed_function('seg_in(cstring)', '$2', colocate_with:='test_extension_function');
-- Colocated table is not distributed
SELECT create_distributed_function('seg_in(cstring)', '$1', 'test_extension_function');
DROP EXTENSION seg;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test_extension_function', 'col', colocate_with := 'none');
-- now, create a type that depends on another type, which
-- finally depends on an extension
BEGIN;
SET citus.shard_replication_factor TO 1;
CREATE EXTENSION seg;
CREATE EXTENSION isn;
CREATE TYPE test_type AS (a int, b seg);
@ -243,8 +258,25 @@ BEGIN;
CREATE TABLE t3 (a int, b test_type_3);
SELECT create_reference_table('t3');
-- Distribute an extension-function
SELECT create_distributed_function('seg_in(cstring)', '$1', 'test_extension_function');
COMMIT;
-- Check the pg_dist_object
SELECT pg_proc.proname as DistributedFunction
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
-- add the node back
SELECT 1 from master_add_node('localhost', :worker_2_port);
@ -252,5 +284,86 @@ SELECT 1 from master_add_node('localhost', :worker_2_port);
SELECT count(*) FROM citus.pg_dist_object WHERE objid IN (SELECT oid FROM pg_extension WHERE extname IN ('seg', 'isn'));
SELECT run_command_on_workers($$SELECT count(*) FROM pg_extension WHERE extname IN ('seg', 'isn')$$);
-- Check the pg_dist_object on the both nodes
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
DROP EXTENSION seg CASCADE;
-- Recheck the pg_dist_object
SELECT pg_proc.proname as DistributedFunction
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
-- Distribute an extension-function where extension is not in pg_dist_object
SET citus.enable_ddl_propagation TO false;
CREATE EXTENSION seg;
SET citus.enable_ddl_propagation TO true;
-- Check the extension in pg_dist_object
SELECT count(*) FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
SELECT create_distributed_function('seg_in(cstring)', '$1', 'test_extension_function');
-- Recheck the extension in pg_dist_object
SELECT count(*) FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_extension WHERE extname = 'seg');
SELECT pg_proc.proname as DistributedFunction
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
SELECT run_command_on_workers($$
SELECT count(*)
FROM citus.pg_dist_object, pg_proc
WHERE pg_proc.proname = 'seg_in' and
pg_proc.oid = citus.pg_dist_object.objid and
classid = 'pg_proc'::regclass;
$$);
DROP EXTENSION seg;
DROP TABLE test_extension_function;
-- Test extension function altering distribution argument
BEGIN;
SET citus.shard_replication_factor = 1;
CREATE TABLE test_extension_function(col1 float8[], col2 float8[]);
SELECT create_distributed_table('test_extension_function', 'col1', colocate_with := 'none');
CREATE EXTENSION cube;
SELECT create_distributed_function('cube(float8[], float8[])', '$1', 'test_extension_function');
SELECT distribution_argument_index FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_proc'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
SELECT create_distributed_function('cube(float8[], float8[])', '$2', 'test_extension_function');
SELECT distribution_argument_index FROM citus.pg_dist_object WHERE classid = 'pg_catalog.pg_proc'::pg_catalog.regclass AND
objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
ROLLBACK;
-- drop the schema and all the objects
DROP SCHEMA "extension'test" CASCADE;