citus/src/backend/distributed/commands/dependencies.c

374 lines
11 KiB
C

/*-------------------------------------------------------------------------
*
* dependencies.c
* Commands to create dependencies of an object on all workers.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/dependency.h"
#include "catalog/objectaddress.h"
#include "commands/extension.h"
#include "distributed/commands.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
typedef bool (*AddressPredicate)(const ObjectAddress *);
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
static List * FilterObjectAddressListByPredicate(List *objectAddressList,
AddressPredicate predicate);
bool EnableDependencyCreation = true;
/*
* EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all workers. If not available they will be created on the
* workers via a separate session that will be committed directly so that the objects are
* visible to potentially multiple sessions creating the shards.
*
* Note; only the actual objects are created via a separate session, the local records to
* pg_dist_object are created in this session. As a side effect the objects could be
* created on the workers without a catalog entry on the coordinator. Updates to the
* objects on the coordinator are not propagated to the workers until the record is
* visible on the coordinator.
*
* This is solved by creating the dependencies in an idempotent manner, either via
* postgres native CREATE IF NOT EXISTS, or citus helper functions.
*/
void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
{
List *dependenciesWithCommands = NIL;
List *ddlCommands = NULL;
/* collect all dependencies in creation order and get their ddl commands */
List *dependencies = GetDependenciesForObject(target);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
{
List *dependencyCommands = GetDependencyCreateDDLCommands(dependency);
ddlCommands = list_concat(ddlCommands, dependencyCommands);
/* create a new list with dependencies that actually created commands */
if (list_length(dependencyCommands) > 0)
{
dependenciesWithCommands = lappend(dependenciesWithCommands, dependency);
}
}
if (list_length(ddlCommands) <= 0)
{
/* no ddl commands to be executed */
return;
}
/* since we are executing ddl commands lets disable propagation, primarily for mx */
ddlCommands = list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands);
/*
* Make sure that no new nodes are added after this point until the end of the
* transaction by taking a RowShareLock on pg_dist_node, which conflicts with the
* ExclusiveLock taken by master_add_node.
* This guarantees that all active nodes will have the object, because they will
* either get it now, or get it in master_add_node after this transaction finishes and
* the pg_dist_object record becomes visible.
*/
List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock);
/*
* right after we acquired the lock we mark our objects as distributed, these changes
* will not become visible before we have successfully created all the objects on our
* workers.
*
* It is possible to create distributed tables which depend on other dependencies
* before any node is in the cluster. If we would wait till we actually had connected
* to the nodes before marking the objects as distributed these objects would never be
* created on the workers when they get added, causing shards to fail to create.
*/
foreach_ptr(dependency, dependenciesWithCommands)
{
MarkObjectDistributed(dependency);
}
/*
* collect and connect to all applicable nodes
*/
if (list_length(workerNodeList) <= 0)
{
/* no nodes to execute on */
return;
}
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
const char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort,
CitusExtensionOwnerName(),
ddlCommands);
}
}
/*
* GetDistributableDependenciesForObject finds all the dependencies that Citus
* can distribute and returns those dependencies regardless of their existency
* on nodes.
*/
List *
GetDistributableDependenciesForObject(const ObjectAddress *target)
{
/* local variables to work with dependencies */
List *distributableDependencies = NIL;
/* collect all dependencies in creation order */
List *dependencies = GetDependenciesForObject(target);
/* filter the ones that can be distributed */
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
{
/*
* TODO: maybe we can optimize the logic applied in below line. Actually we
* do not need to create ddl commands as we are not ensuring their existence
* in nodes, but we utilize logic it follows to choose the objects that could
* be distributed
*/
List *dependencyCommands = GetDependencyCreateDDLCommands(dependency);
/* create a new list with dependencies that actually created commands */
if (list_length(dependencyCommands) > 0)
{
distributableDependencies = lappend(distributableDependencies, dependency);
}
}
return distributableDependencies;
}
/*
* GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl
* commands to execute on a worker to create the object.
*/
static List *
GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
{
switch (getObjectClass(dependency))
{
case OCLASS_CLASS:
{
/*
* types have an intermediate dependency on a relation (aka class), so we do
* support classes when the relkind is composite
*/
if (get_rel_relkind(dependency->objectId) == RELKIND_COMPOSITE_TYPE)
{
return NIL;
}
/* if this relation is not supported, break to the error at the end */
break;
}
case OCLASS_COLLATION:
{
return CreateCollationDDLsIdempotent(dependency->objectId);
}
case OCLASS_PROC:
{
return CreateFunctionDDLCommandsIdempotent(dependency);
}
case OCLASS_ROLE:
{
return GenerateCreateOrAlterRoleCommand(dependency->objectId);
}
case OCLASS_SCHEMA:
{
char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId);
List *DDLCommands = list_make1(schemaDDLCommand);
List *grantDDLCommands = GrantOnSchemaDDLCommands(dependency->objectId);
DDLCommands = list_concat(DDLCommands, grantDDLCommands);
return DDLCommands;
}
case OCLASS_TYPE:
{
return CreateTypeDDLCommandsIdempotent(dependency);
}
case OCLASS_EXTENSION:
{
return CreateExtensionDDLCommand(dependency);
}
default:
{
break;
}
}
/*
* make sure it fails hard when in debug mode, leave a hint for the user if this ever
* happens in production
*/
Assert(false);
ereport(ERROR, (errmsg("unsupported object %s for distribution by citus",
getObjectTypeDescription(dependency)),
errdetail(
"citus tries to recreate an unsupported object on its workers"),
errhint("please report a bug as this should not be happening")));
}
/*
* ReplicateAllDependenciesToNode replicate all previously marked objects to a worker
* node. The function also sets clusterHasDistributedFunction if there are any
* distributed functions.
*/
void
ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)
{
List *ddlCommands = NIL;
/*
* collect all dependencies in creation order and get their ddl commands
*/
List *dependencies = GetDistributedObjectAddressList();
/*
* Depending on changes in the environment, such as the enable_object_propagation guc
* there might be objects in the distributed object address list that should currently
* not be propagated by citus as the are 'not supported'.
*/
dependencies = FilterObjectAddressListByPredicate(dependencies,
&SupportedDependencyByCitus);
/*
* When dependency lists are getting longer we see a delay in the creation time on the
* workers. We would like to inform the user. Currently we warn for lists greater than
* 100 items, where 100 is an arbitrarily chosen number. If we find it too high or too
* low we can adjust this based on experience.
*/
if (list_length(dependencies) > 100)
{
ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", nodeName,
nodePort),
errdetail("There are %d objects to replicate, depending on your "
"environment this might take a while",
list_length(dependencies))));
}
dependencies = OrderObjectAddressListInDependencyOrder(dependencies);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
{
ddlCommands = list_concat(ddlCommands,
GetDependencyCreateDDLCommands(dependency));
}
if (list_length(ddlCommands) <= 0)
{
/* no commands to replicate dependencies to the new worker */
return;
}
/* since we are executing ddl commands lets disable propagation, primarily for mx */
ddlCommands = list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands);
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort,
CitusExtensionOwnerName(), ddlCommands);
}
/*
* ShouldPropagate determines if we should be propagating anything
*/
bool
ShouldPropagate(void)
{
if (creating_extension)
{
/*
* extensions should be created separately on the workers, types cascading from an
* extension should therefore not be propagated.
*/
return false;
}
if (!EnableDependencyCreation)
{
/*
* we are configured to disable object propagation, should not propagate anything
*/
return false;
}
return true;
}
/*
* ShouldPropagateObject determines if we should be propagating DDLs based
* on their object address.
*/
bool
ShouldPropagateObject(const ObjectAddress *address)
{
if (!ShouldPropagate())
{
return false;
}
if (!IsObjectDistributed(address))
{
/* do not propagate for non-distributed types */
return false;
}
return true;
}
/*
* FilterObjectAddressListByPredicate takes a list of ObjectAddress *'s and returns a list
* only containing the ObjectAddress *'s for which the predicate returned true.
*/
static List *
FilterObjectAddressListByPredicate(List *objectAddressList, AddressPredicate predicate)
{
List *result = NIL;
ObjectAddress *address = NULL;
foreach_ptr(address, objectAddressList)
{
if (predicate(address))
{
result = lappend(result, address);
}
}
return result;
}