From 8ae7577581256289c6e47f32f829ad577bbac735 Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Fri, 4 Feb 2022 15:37:49 +0300 Subject: [PATCH] Use superuser connection while syncing dependent objects' pg_dist_object tuples --- .../distributed/commands/dependencies.c | 10 +- src/backend/distributed/metadata/distobject.c | 94 +++++++++++++++---- .../transaction/worker_transaction.c | 21 ++++- src/include/distributed/metadata/distobject.h | 1 + src/include/distributed/worker_transaction.h | 1 + 5 files changed, 104 insertions(+), 23 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index ea1c59064..f82ddf065 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -120,7 +120,15 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) */ foreach_ptr(dependency, dependenciesWithCommands) { - MarkObjectDistributed(dependency); + /* + * pg_dist_object entries must be propagated with the super user, since + * the owner of the target object may not own its dependencies, and the + * dependent objects themselves are sent to the workers as the superuser. + * + * Only the dependent objects' metadata should be propagated with super user. + * Metadata of the table itself must be propagated with the current user. 
+ */ + MarkObjectDistributedViaSuperUser(dependency); } } diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index 37aaa3aed..ba67a073b 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -46,6 +46,8 @@ #include "utils/rel.h" +static void MarkObjectDistributedLocally(const ObjectAddress *distAddress); +static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); @@ -141,14 +143,60 @@ ObjectExists(const ObjectAddress *address) /* - * MarkObjectDistributed marks an object as a distributed object by citus. Marking is done - * by adding appropriate entries to citus.pg_dist_object. + * MarkObjectDistributed marks an object as a distributed object. Marking is done + * by adding appropriate entries to citus.pg_dist_object and also marking the object + * as distributed by opening a connection using current user to all of the workers + * with metadata if object propagation is on. * - * This also marks the object as distributed on all of the workers with metadata - * if object propagation is on. + * This function should be used when the current user is creating the given object. + * To mark dependent objects as distributed, see MarkObjectDistributedViaSuperUser. */ void MarkObjectDistributed(const ObjectAddress *distAddress) +{ + MarkObjectDistributedLocally(distAddress); + + if (EnableMetadataSync) + { + char *workerPgDistObjectUpdateCommand = + CreatePgDistObjectEntryCommand(distAddress); + SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand); + } +} + + +/* + * MarkObjectDistributedViaSuperUser marks an object as a distributed object. 
Marking + * is done by adding appropriate entries to citus.pg_dist_object and also marking the + * object as distributed by opening a connection using super user to all of the workers + * with metadata if object propagation is on. + * + * This function should be used to mark dependent objects as distributed. If you want + * to mark the object you are creating, please check MarkObjectDistributed. + */ +void +MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress) +{ + MarkObjectDistributedLocally(distAddress); + + if (EnableMetadataSync) + { + char *workerPgDistObjectUpdateCommand = + CreatePgDistObjectEntryCommand(distAddress); + SendCommandToWorkersWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand); + } +} + + +/* + * MarkObjectDistributedLocally marks an object as a distributed object by citus. + * Marking is done by adding appropriate entries to citus.pg_dist_object. + * + * This function should never be called alone; MarkObjectDistributed() or + * MarkObjectDistributedViaSuperUser() should be called instead. 
+ */ +static void +MarkObjectDistributedLocally(const ObjectAddress *distAddress) { int paramCount = 3; Oid paramTypes[3] = { @@ -161,32 +209,38 @@ MarkObjectDistributed(const ObjectAddress *distAddress) ObjectIdGetDatum(distAddress->objectId), Int32GetDatum(distAddress->objectSubId) }; - char *insertQuery = "INSERT INTO citus.pg_dist_object (classid, objid, objsubid) " "VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"; - int spiStatus = ExecuteCommandAsSuperuser(insertQuery, paramCount, paramTypes, paramValues); if (spiStatus < 0) { ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object"))); } +} - if (EnableMetadataSync) - { - /* create a list by adding the address of value to not to have warning */ - List *objectAddressList = list_make1((ObjectAddress *) distAddress); - List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX); - List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID); - List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN); - char *workerPgDistObjectUpdateCommand = - MarkObjectsDistributedCreateCommand(objectAddressList, - distArgumetIndexList, - colocationIdList, - forceDelegationList); - SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand); - } +/* + * CreatePgDistObjectEntryCommand creates command to insert pg_dist_object tuple + * for the given object address. 
+ */ +static char * +CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress) +{ + /* create a list by adding the address of value to not to have warning */ + List *objectAddressList = + list_make1((ObjectAddress *) objectAddress); + List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX); + List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID); + List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN); + + char *workerPgDistObjectUpdateCommand = + MarkObjectsDistributedCreateCommand(objectAddressList, + distArgumetIndexList, + colocationIdList, + forceDelegationList); + + return workerPgDistObjectUpdateCommand; } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 61baff4fe..e94abed53 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -112,8 +112,7 @@ SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *node /* * SendCommandToWorkers sends a command to all workers in * parallel. Commands are committed on the workers when the local - * transaction commits. The connection are made as the extension - * owner to ensure write access to the Citus metadata tables. + * transaction commits. */ void SendCommandToWorkersWithMetadata(const char *command) @@ -123,6 +122,24 @@ SendCommandToWorkersWithMetadata(const char *command) } +/* + * SendCommandToWorkersWithMetadataViaSuperUser sends a command to all workers in + * parallel by opening a super user connection. Commands are committed on the workers + * when the local transaction commits. The connections are made as the extension + * owner to ensure write access to the Citus metadata tables. + * + * Since we avoid opening superuser connections for metadata tables, using this + * function is discouraged. Consider using it only for propagating pg_dist_object + * tuples for dependent objects. 
+ */ +void +SendCommandToWorkersWithMetadataViaSuperUser(const char *command) +{ + SendCommandToMetadataWorkersParams(command, CitusExtensionOwnerName(), + 0, NULL, NULL); +} + + /* * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the * TargetWorkerSet. diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index 659e8ab7f..472cd83e2 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress); extern bool IsObjectDistributed(const ObjectAddress *address); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); +extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); extern bool IsTableOwnedByExtension(Oid relationId); extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target, diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 63d419c66..c3748ee5b 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -49,6 +49,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons List * commandList); extern void SendCommandToWorkersWithMetadata(const char *command); +extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,