Use superuser connection while syncing dependent objects' pg_dist_object tuples

velioglu/sup_reb
Burak Velioglu 2022-02-04 15:37:49 +03:00
parent d7858709b4
commit 8ae7577581
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
5 changed files with 104 additions and 23 deletions

View File

@ -120,7 +120,15 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
*/ */
foreach_ptr(dependency, dependenciesWithCommands) foreach_ptr(dependency, dependenciesWithCommands)
{ {
MarkObjectDistributed(dependency); /*
* pg_dist_object entries must be propagated with the super user, since
* the owner of the target object may not own dependencies but we must
* propagate as we send objects itself with the superuser.
*
* Only dependent object's metadata should be propagated with super user.
* Metadata of the table itself must be propagated with the current user.
*/
MarkObjectDistributedViaSuperUser(dependency);
} }
} }

View File

@ -46,6 +46,8 @@
#include "utils/rel.h" #include "utils/rel.h"
static void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
Datum *paramValues); Datum *paramValues);
@ -141,14 +143,60 @@ ObjectExists(const ObjectAddress *address)
/* /*
* MarkObjectDistributed marks an object as a distributed object by citus. Marking is done * MarkObjectDistributed marks an object as a distributed object. Marking is done
* by adding appropriate entries to citus.pg_dist_object. * by adding appropriate entries to citus.pg_dist_object and also marking the object
* as distributed by opening a connection using current user to all of the workers
* with metadata if object propagation is on.
* *
* This also marks the object as distributed on all of the workers with metadata * This function should be used if the user creating the given object. If you want
* if object propagation is on. * to mark dependent objects as distributed check MarkObjectDistributedViaSuperUser.
*/ */
void void
MarkObjectDistributed(const ObjectAddress *distAddress) MarkObjectDistributed(const ObjectAddress *distAddress)
{
MarkObjectDistributedLocally(distAddress);
if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand);
}
}
/*
* MarkObjectDistributedViaSuperUser marks an object as a distributed object. Marking
* is done by adding appropriate entries to citus.pg_dist_object and also marking the
* object as distributed by opening a connection using super user to all of the workers
* with metadata if object propagation is on.
*
* This function should be used to mark dependent object as distributed. If you want
* to mark the object you are creating please check MarkObjectDistributed.
*/
void
MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
{
MarkObjectDistributedLocally(distAddress);
if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
SendCommandToWorkersWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
}
}
/*
* MarkObjectDistributedLocally marks an object as a distributed object by citus.
* Marking is done by adding appropriate entries to citus.pg_dist_object.
*
* This function should never be called alone, MarkObjectDistributed() or
* MarkObjectDistributedViaSuperUser() should be called.
*/
static void
MarkObjectDistributedLocally(const ObjectAddress *distAddress)
{ {
int paramCount = 3; int paramCount = 3;
Oid paramTypes[3] = { Oid paramTypes[3] = {
@ -161,32 +209,38 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
ObjectIdGetDatum(distAddress->objectId), ObjectIdGetDatum(distAddress->objectId),
Int32GetDatum(distAddress->objectSubId) Int32GetDatum(distAddress->objectSubId)
}; };
char *insertQuery = "INSERT INTO citus.pg_dist_object (classid, objid, objsubid) " char *insertQuery = "INSERT INTO citus.pg_dist_object (classid, objid, objsubid) "
"VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"; "VALUES ($1, $2, $3) ON CONFLICT DO NOTHING";
int spiStatus = ExecuteCommandAsSuperuser(insertQuery, paramCount, paramTypes, int spiStatus = ExecuteCommandAsSuperuser(insertQuery, paramCount, paramTypes,
paramValues); paramValues);
if (spiStatus < 0) if (spiStatus < 0)
{ {
ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object"))); ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object")));
} }
}
if (EnableMetadataSync)
{
/* create a list by adding the address of value to not to have warning */
List *objectAddressList = list_make1((ObjectAddress *) distAddress);
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
char *workerPgDistObjectUpdateCommand = /*
MarkObjectsDistributedCreateCommand(objectAddressList, * CreatePgDistObjectEntryCommand creates command to insert pg_dist_object tuple
distArgumetIndexList, * for the given object address.
colocationIdList, */
forceDelegationList); static char *
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand); CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress)
} {
/* create a list by adding the address of value to not to have warning */
List *objectAddressList =
list_make1((ObjectAddress *) objectAddress);
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
distArgumetIndexList,
colocationIdList,
forceDelegationList);
return workerPgDistObjectUpdateCommand;
} }

View File

@ -112,8 +112,7 @@ SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *node
/* /*
* SendCommandToWorkers sends a command to all workers in * SendCommandToWorkers sends a command to all workers in
* parallel. Commands are committed on the workers when the local * parallel. Commands are committed on the workers when the local
* transaction commits. The connection are made as the extension * transaction commits.
* owner to ensure write access to the Citus metadata tables.
*/ */
void void
SendCommandToWorkersWithMetadata(const char *command) SendCommandToWorkersWithMetadata(const char *command)
@ -123,6 +122,24 @@ SendCommandToWorkersWithMetadata(const char *command)
} }
/*
* SendCommandToWorkersWithMetadataViaSuperUser sends a command to all workers in
* parallel by opening a super user connection. Commands are committed on the workers
* when the local transaction commits. The connection are made as the extension
* owner to ensure write access to the Citus metadata tables.
*
* Since we prevent to open superuser connections for metadata tables, it is
* discourated to use it. Consider using it only for propagating pg_dist_object
* tuples for dependent objects.
*/
void
SendCommandToWorkersWithMetadataViaSuperUser(const char *command)
{
SendCommandToMetadataWorkersParams(command, CitusExtensionOwnerName(),
0, NULL, NULL);
}
/* /*
* TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
* TargetWorkerSet. * TargetWorkerSet.

View File

@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
extern bool IsObjectDistributed(const ObjectAddress *address); extern bool IsObjectDistributed(const ObjectAddress *address);
extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address); extern void UnmarkObjectDistributed(const ObjectAddress *address);
extern bool IsTableOwnedByExtension(Oid relationId); extern bool IsTableOwnedByExtension(Oid relationId);
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target, extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,

View File

@ -49,6 +49,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons
List * List *
commandList); commandList);
extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendCommandToWorkersWithMetadata(const char *command);
extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command);
extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void SendBareCommandListToMetadataWorkers(List *commandList);
extern void EnsureNoModificationsHaveBeenDone(void); extern void EnsureNoModificationsHaveBeenDone(void);
extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,