Distribute functions with CREATE FUNCTION command

velioglu/tmpfuncprop
Burak Velioglu 2022-02-09 13:46:26 +03:00
parent 6376eaf0e0
commit c4fddf1406
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
2 changed files with 56 additions and 39 deletions

View File

@ -25,6 +25,7 @@
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_aggregate.h"
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
@ -38,6 +39,7 @@
#include "distributed/listutils.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata/dependency.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h"
@ -744,7 +746,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
/*
* GetFunctionDDLCommand returns the complete "CREATE OR REPLACE FUNCTION ..." statement for
* the specified function followed by "ALTER FUNCTION .. SET OWNER ..".
* the specified function.
*
* useCreateOrReplace is ignored for non-aggregate functions.
*/
@ -1191,46 +1193,23 @@ EnsureSequentialModeForFunctionDDL(void)
/*
* ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION
* statement. We only propagate replace's of distributed functions to keep the function on
* the workers in sync with the one on the coordinator.
* statement.
*/
static bool
ShouldPropagateCreateFunction(CreateFunctionStmt *stmt)
{
if (creating_extension)
if (!ShouldPropagate())
{
/*
* extensions should be created separately on the workers, functions cascading
* from an extension should therefore not be propagated.
*/
return false;
}
if (!EnableMetadataSync)
{
/*
* we are configured to disable object propagation, should not propagate anything
*/
return false;
}
if (!stmt->replace)
{
/*
* Since we only care for a replace of distributed functions if the statement is
* not a replace we are going to ignore.
*/
return false;
}
/*
* Even though its a replace we should accept an non-existing function, it will just
* not be distributed
* by not propagating in a transaction block we allow for parallelism to be used when
* this function will be used as a column in a table that will be created and distributed
* in this same transaction.
*/
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, true);
if (!IsObjectDistributed(&address))
if (IsMultiStatementTransaction())
{
/* do not propagate alter function for non-distributed functions */
return false;
}
@ -1274,12 +1253,10 @@ ShouldPropagateAlterFunction(const ObjectAddress *address)
/*
* PreprocessCreateFunctionStmt is called during the planning phase for CREATE [OR REPLACE]
* FUNCTION. We primarily care for the replace variant of this statement to keep
* distributed functions in sync. We bail via a check on ShouldPropagateCreateFunction
* which checks for the OR REPLACE modifier.
* FUNCTION before it is created on the local node internally.
*
* Since we use pg_get_functiondef to get the ddl command we actually do not do any
* planning here, instead we defer the plan creation to the processing step.
* planning here, instead we defer the plan creation to the postprocessing step.
*
* Instead we do our basic housekeeping where we make sure we are on the coordinator and
* can propagate the function in sequential mode.
@ -1300,7 +1277,7 @@ PreprocessCreateFunctionStmt(Node *node, const char *queryString,
EnsureSequentialModeForFunctionDDL();
/*
* ddl jobs will be generated during the Processing phase as we need the function to
* ddl jobs will be generated during the postprocessing phase as we need the function to
* be updated in the catalog to get its sql representation
*/
return NIL;
@ -1311,6 +1288,10 @@ PreprocessCreateFunctionStmt(Node *node, const char *queryString,
* PostprocessCreateFunctionStmt actually creates the plan we need to execute for function
* propagation. This is the downside of using pg_get_functiondef to get the sql statement.
*
* If function depends on any non-distributed table or sequence, Citus can not distribute
* it. In order to not to prevent users from creating local functions on the coordinator
* WARNING message will be sent to the customer about the case instead of erroring out.
*
* Besides creating the plan we also make sure all (new) dependencies of the function are
* created on all nodes.
*/
@ -1325,12 +1306,39 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString)
}
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
List *dependencies = GetDependenciesForObject(&address);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
{
if (getObjectClass(dependency) != OCLASS_CLASS)
{
continue;
}
char relKind = get_rel_relkind(dependency->objectId);
/* only distributed ones are allowed */
if (relKind == RELKIND_RELATION ||
relKind == RELKIND_PARTITIONED_TABLE ||
relKind == RELKIND_FOREIGN_TABLE ||
relKind == RELKIND_SEQUENCE)
{
/* TODO: Consider changing the check and log message */
ereport(WARNING, (errmsg(
"Citus can't distribute functions having dependency on non-distributed relations or sequences"),
errdetail("Function will be created only locally"),
errhint(
"To distribute function, distribute dependent relations and sequences first")));
return NIL;
}
}
EnsureDependenciesExistOnAllNodes(&address);
List *commands = list_make4(DISABLE_DDL_PROPAGATION,
GetFunctionDDLCommand(address.objectId, true),
GetFunctionAlterOwnerCommand(address.objectId),
ENABLE_DDL_PROPAGATION);
List *commands = list_make1(DISABLE_DDL_PROPAGATION);
commands = list_concat(commands, CreateFunctionDDLCommandsIdempotent(&address));
commands = list_concat(commands, list_make1(ENABLE_DDL_PROPAGATION));
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -958,6 +958,15 @@ CreateTypeDDLCommandsIdempotent(const ObjectAddress *typeAddress)
return NIL;
}
char type = get_typtype(typeAddress->objectId);
char relKind = get_rel_relkind(typeAddress->objectId);
/* Don't send anything if the type is a table's row type */
if (type == TYPTYPE_COMPOSITE && relKind != RELKIND_COMPOSITE_TYPE)
{
return NIL;
}
Node *stmt = CreateTypeStmtByObjectAddress(typeAddress);
/* capture ddl command for recreation and wrap in create if not exists construct */