From c4fddf140694b9d0d09be3772bf6ae288c93f7a0 Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Wed, 9 Feb 2022 13:46:26 +0300 Subject: [PATCH] Distribute functions with CREATE FUNCTION command --- src/backend/distributed/commands/function.c | 86 +++++++++++---------- src/backend/distributed/commands/type.c | 9 +++ 2 files changed, 56 insertions(+), 39 deletions(-) diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 497a32dbb..8577957c4 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -25,6 +25,7 @@ #include "access/htup_details.h" #include "access/xact.h" #include "catalog/pg_aggregate.h" +#include "catalog/dependency.h" #include "catalog/namespace.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" @@ -38,6 +39,7 @@ #include "distributed/listutils.h" #include "distributed/maintenanced.h" #include "distributed/metadata_utility.h" +#include "distributed/metadata/dependency.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata/pg_dist_object.h" @@ -744,7 +746,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress, /* * GetFunctionDDLCommand returns the complete "CREATE OR REPLACE FUNCTION ..." statement for - * the specified function followed by "ALTER FUNCTION .. SET OWNER ..". + * the specified function. * * useCreateOrReplace is ignored for non-aggregate functions. */ @@ -1191,46 +1193,23 @@ EnsureSequentialModeForFunctionDDL(void) /* * ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION - * statement. We only propagate replace's of distributed functions to keep the function on - * the workers in sync with the one on the coordinator. + * statement. */ static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt) { - if (creating_extension) + if (!ShouldPropagate()) { - /* - * extensions should be created separately on the workers, functions cascading - * from an extension should therefore not be propagated. - */ - return false; - } - - if (!EnableMetadataSync) - { - /* - * we are configured to disable object propagation, should not propagate anything - */ - return false; - } - - if (!stmt->replace) - { - /* - * Since we only care for a replace of distributed functions if the statement is - * not a replace we are going to ignore. - */ return false; } /* - * Even though its a replace we should accept an non-existing function, it will just - * not be distributed + * by not propagating in a transaction block we allow for parallelism to be used when + * this function will be used as a column in a table that will be created and distributed + * in this same transaction. */ - ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, true); - if (!IsObjectDistributed(&address)) + if (IsMultiStatementTransaction()) { - /* do not propagate alter function for non-distributed functions */ return false; } @@ -1274,12 +1253,10 @@ ShouldPropagateAlterFunction(const ObjectAddress *address) /* * PreprocessCreateFunctionStmt is called during the planning phase for CREATE [OR REPLACE] - * FUNCTION. We primarily care for the replace variant of this statement to keep - * distributed functions in sync. We bail via a check on ShouldPropagateCreateFunction - * which checks for the OR REPLACE modifier. + * FUNCTION before it is created on the local node internally. * * Since we use pg_get_functiondef to get the ddl command we actually do not do any - * planning here, instead we defer the plan creation to the processing step. + * planning here, instead we defer the plan creation to the postprocessing step. * * Instead we do our basic housekeeping where we make sure we are on the coordinator and * can propagate the function in sequential mode. @@ -1300,7 +1277,7 @@ PreprocessCreateFunctionStmt(Node *node, const char *queryString, EnsureSequentialModeForFunctionDDL(); /* - * ddl jobs will be generated during the Processing phase as we need the function to + * ddl jobs will be generated during the postprocessing phase as we need the function to * be updated in the catalog to get its sql representation */ return NIL; @@ -1311,6 +1288,10 @@ PreprocessCreateFunctionStmt(Node *node, const char *queryString, * PostprocessCreateFunctionStmt actually creates the plan we need to execute for function * propagation. This is the downside of using pg_get_functiondef to get the sql statement. * + * If function depends on any non-distributed table or sequence, Citus can not distribute + * it. In order to not to prevent users from creating local functions on the coordinator + * WARNING message will be sent to the customer about the case instead of erroring out. + * * Besides creating the plan we also make sure all (new) dependencies of the function are * created on all nodes. */ @@ -1325,12 +1306,39 @@ PostprocessCreateFunctionStmt(Node *node, const char *queryString) } ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false); + + List *dependencies = GetDependenciesForObject(&address); + ObjectAddress *dependency = NULL; + foreach_ptr(dependency, dependencies) + { + if (getObjectClass(dependency) != OCLASS_CLASS) + { + continue; + } + + char relKind = get_rel_relkind(dependency->objectId); + + /* only distributed ones are allowed */ + if (relKind == RELKIND_RELATION || + relKind == RELKIND_PARTITIONED_TABLE || + relKind == RELKIND_FOREIGN_TABLE || + relKind == RELKIND_SEQUENCE) + { + /* TODO: Consider changing the check and log message */ + ereport(WARNING, (errmsg( + "Citus can't distribute functions having dependency on non-distributed relations or sequences"), + errdetail("Function will be created only locally"), + errhint( + "To distribute function, distribute dependent relations and sequences first"))); + return NIL; + } + } + EnsureDependenciesExistOnAllNodes(&address); - List *commands = list_make4(DISABLE_DDL_PROPAGATION, - GetFunctionDDLCommand(address.objectId, true), - GetFunctionAlterOwnerCommand(address.objectId), - ENABLE_DDL_PROPAGATION); + List *commands = list_make1(DISABLE_DDL_PROPAGATION); + commands = list_concat(commands, CreateFunctionDDLCommandsIdempotent(&address)); + commands = list_concat(commands, list_make1(ENABLE_DDL_PROPAGATION)); return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); } diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 78b56c2ff..ecce4cb9f 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -958,6 +958,15 @@ CreateTypeDDLCommandsIdempotent(const ObjectAddress *typeAddress) return NIL; } + char type = get_typtype(typeAddress->objectId); + char relKind = get_rel_relkind(typeAddress->objectId); + + /* Don't send anything if the type is a table's row type */ + if (type == TYPTYPE_COMPOSITE && relKind != RELKIND_COMPOSITE_TYPE) + { + return NIL; + } + Node *stmt = CreateTypeStmtByObjectAddress(typeAddress); /* capture ddl command for recreation and wrap in create if not exists construct */