Address reviews

velioglu/tmpfuncprop
Burak Velioglu 2022-02-15 00:48:18 +03:00
parent afe8ce50c3
commit 110aff95c8
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
2 changed files with 35 additions and 24 deletions

View File

@ -83,7 +83,8 @@ static void EnsureSequentialModeForFunctionDDL(void);
static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt);
static bool ShouldPropagateAlterFunction(const ObjectAddress *address);
static bool ShouldAddFunctionSignature(FunctionParameterMode mode);
static bool DependentRelationsOfFunctionCanBeDistributed(ObjectAddress *functionAddress);
static ObjectAddress * UndistributableRelationDependencyOfFunction(
ObjectAddress *functionAddress);
static ObjectAddress FunctionToObjectAddress(ObjectType objectType,
ObjectWithArgs *objectWithArgs,
bool missing_ok);
@ -1227,11 +1228,11 @@ ShouldPropagateCreateFunction(CreateFunctionStmt *stmt)
}
/*
* by not propagating in a transaction block we allow for parallelism to be used when
* this function will be used as a column in a table that will be created and distributed
* in this same transaction.
* If the create command is a part of a multi-statement transaction that is not in
* sequential mode, don't propagate. Instead we will rely on back filling.
*/
if (IsMultiStatementTransaction())
if (IsMultiStatementTransaction() &&
MultiShardConnectionType != SEQUENTIAL_CONNECTION)
{
return false;
}
@ -1330,21 +1331,29 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
}
ObjectAddress functionAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
if (!DependentRelationsOfFunctionCanBeDistributed(&functionAddress))
{
ereport(WARNING, (errmsg("Citus can't distribute functions having dependency on"
" non-distributed relations"),
errdetail("Function will be created only locally"),
errhint("To distribute function, distribute dependent relations"
" first")));
return NIL;
}
if (IsObjectAddressOwnedByExtension(&functionAddress, NULL))
{
return NIL;
}
ObjectAddress *undistributableDependency =
UndistributableRelationDependencyOfFunction(&functionAddress);
if (undistributableDependency != NULL)
{
RangeVar *functionRangeVar = makeRangeVarFromNameList(stmt->funcname);
char *functionName = functionRangeVar->relname;
char *dependentRelationName = get_rel_name(undistributableDependency->objectId);
ereport(WARNING, (errmsg("Citus can't distribute function %s having dependency on"
" non-distributed relation %s", functionName,
dependentRelationName),
errdetail("Function will be created only locally"),
errhint("To distribute function, distribute dependent relations"
" first. Then, re-create the function")));
return NIL;
}
EnsureDependenciesExistOnAllNodes(&functionAddress);
List *commands = list_make1(DISABLE_DDL_PROPAGATION);
@ -1357,11 +1366,12 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
/*
* DependentRelationsOfFunctionCanBeDistributed checks whether Citus can distribute
* dependent relations of the given function.
* UndistributableRelationDependencyOfFunction checks whether Citus can distribute
* dependent relations of the given function. If any non-distributable one found, it
* will be returned.
*/
static bool
DependentRelationsOfFunctionCanBeDistributed(ObjectAddress *functionAddress)
static ObjectAddress *
UndistributableRelationDependencyOfFunction(ObjectAddress *functionAddress)
{
Assert(getObjectClass(functionAddress) == OCLASS_PROC);
@ -1387,10 +1397,10 @@ DependentRelationsOfFunctionCanBeDistributed(ObjectAddress *functionAddress)
continue;
}
return false;
return dependency;
}
return true;
return NULL;
}

View File

@ -4,9 +4,9 @@ SET search_path TO 'function_propagation_schema';
-- Check whether supported dependencies can be distributed while propagating functions
-- Check types
BEGIN;
SET citus.enable_metadata_sync TO OFF;
CREATE TYPE function_prop_type AS (a int, b int);
COMMIT;
RESET citus.enable_metadata_sync;
CREATE OR REPLACE FUNCTION func_1(param_1 function_prop_type)
RETURNS int
@ -25,9 +25,9 @@ SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(clas
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.function_prop_type'::regtype::oid;$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_1'::regproc::oid;$$) ORDER BY 1,2;
BEGIN;
SET citus.enable_metadata_sync TO OFF;
CREATE TYPE function_prop_type_2 AS (a int, b int);
COMMIT;
RESET citus.enable_metadata_sync;
CREATE OR REPLACE FUNCTION func_2(param_1 int)
RETURNS function_prop_type_2
@ -43,6 +43,7 @@ SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dis
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.function_prop_type_2'::regtype::oid;$$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'function_propagation_schema.func_2'::regproc::oid;$$) ORDER BY 1,2;
-- Have a separate check for type created in transaction
BEGIN;
CREATE TYPE function_prop_type_3 AS (a int, b int);
COMMIT;