Fixes create user queries from Citus non-main databases with other users (#7442)

This PR makes the connections to other nodes for
`mark_object_distributed` use the same user as
`execute_command_on_remote_nodes_as_user` so they'll use the same
connection.
pull/7432/head^2
Halil Ozan Akgül 2024-01-24 12:57:54 +03:00 committed by GitHub
parent 3ffb831beb
commit 1cb2e1e4e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 32 additions and 22 deletions

View File

@ -92,7 +92,7 @@
#define START_MANAGEMENT_TRANSACTION \ #define START_MANAGEMENT_TRANSACTION \
"SELECT citus_internal.start_management_transaction('%lu')" "SELECT citus_internal.start_management_transaction('%lu')"
#define MARK_OBJECT_DISTRIBUTED \ #define MARK_OBJECT_DISTRIBUTED \
"SELECT citus_internal.mark_object_distributed(%d, %s, %d)" "SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */ bool EnableDDLPropagation = true; /* ddl propagation is enabled */
@ -1636,7 +1636,8 @@ RunPostprocessMainDBCommand(Node *parsetree)
MARK_OBJECT_DISTRIBUTED, MARK_OBJECT_DISTRIBUTED,
AuthIdRelationId, AuthIdRelationId,
quote_literal_cstr(createRoleStmt->role), quote_literal_cstr(createRoleStmt->role),
roleOid); roleOid,
quote_literal_cstr(CurrentUserName()));
RunCitusMainDBQuery(mainDBQuery->data); RunCitusMainDBQuery(mainDBQuery->data);
} }
} }

View File

@ -67,7 +67,8 @@ PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
/* /*
* mark_object_distributed adds an object to pg_dist_object * mark_object_distributed adds an object to pg_dist_object
* in all of the nodes. * in all of the nodes, for the connections to the other nodes this function
* uses the user passed.
*/ */
Datum Datum
mark_object_distributed(PG_FUNCTION_ARGS) mark_object_distributed(PG_FUNCTION_ARGS)
@ -81,6 +82,8 @@ mark_object_distributed(PG_FUNCTION_ARGS)
Oid objectId = PG_GETARG_OID(2); Oid objectId = PG_GETARG_OID(2);
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*objectAddress, classId, objectId); ObjectAddressSet(*objectAddress, classId, objectId);
text *connectionUserText = PG_GETARG_TEXT_P(3);
char *connectionUser = text_to_cstring(connectionUserText);
/* /*
* This function is called when a query is run from a Citus non-main database. * This function is called when a query is run from a Citus non-main database.
@ -88,7 +91,8 @@ mark_object_distributed(PG_FUNCTION_ARGS)
* 2PC still works. * 2PC still works.
*/ */
bool useConnectionForLocalQuery = true; bool useConnectionForLocalQuery = true;
MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery); MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery,
connectionUser);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -193,7 +197,8 @@ void
MarkObjectDistributed(const ObjectAddress *distAddress) MarkObjectDistributed(const ObjectAddress *distAddress)
{ {
bool useConnectionForLocalQuery = false; bool useConnectionForLocalQuery = false;
MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery); MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery,
CurrentUserName());
} }
@ -204,7 +209,7 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
*/ */
void void
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName, MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName,
bool useConnectionForLocalQuery) bool useConnectionForLocalQuery, char *connectionUser)
{ {
if (!CitusHasBeenLoaded()) if (!CitusHasBeenLoaded())
{ {
@ -234,7 +239,8 @@ MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName
{ {
char *workerPgDistObjectUpdateCommand = char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress, objectName); CreatePgDistObjectEntryCommand(distAddress, objectName);
SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand); SendCommandToRemoteMetadataNodesParams(workerPgDistObjectUpdateCommand,
connectionUser, 0, NULL, NULL);
} }
} }

View File

@ -15,7 +15,7 @@ DROP FUNCTION citus_internal.execute_command_on_remote_nodes_as_user(
); );
DROP FUNCTION citus_internal.mark_object_distributed( DROP FUNCTION citus_internal.mark_object_distributed(
classId Oid, objectName text, objectId Oid classId Oid, objectName text, objectId Oid, connectionUser text
); );
DROP FUNCTION citus_internal.commit_management_command_2pc(); DROP FUNCTION citus_internal.commit_management_command_2pc();

View File

@ -1,7 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text)
RETURNS VOID RETURNS VOID
LANGUAGE C LANGUAGE C
AS 'MODULE_PATHNAME', $$mark_object_distributed$$; AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text)
IS 'adds an object to pg_dist_object on all nodes'; IS 'adds an object to pg_dist_object on all nodes';

View File

@ -1,7 +1,7 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) CREATE OR REPLACE FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text)
RETURNS VOID RETURNS VOID
LANGUAGE C LANGUAGE C
AS 'MODULE_PATHNAME', $$mark_object_distributed$$; AS 'MODULE_PATHNAME', $$mark_object_distributed$$;
COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid) COMMENT ON FUNCTION citus_internal.mark_object_distributed(classId Oid, objectName text, objectId Oid, connectionUser text)
IS 'adds an object to pg_dist_object on all nodes'; IS 'adds an object to pg_dist_object on all nodes';

View File

@ -36,10 +36,6 @@
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
static void SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues);
static void SendBareCommandListToMetadataNodesInternal(List *commandList, static void SendBareCommandListToMetadataNodesInternal(List *commandList,
TargetWorkerSet targetWorkerSet); TargetWorkerSet targetWorkerSet);
static void SendCommandToMetadataWorkersParams(const char *command, static void SendCommandToMetadataWorkersParams(const char *command,
@ -209,7 +205,7 @@ SendCommandListToRemoteNodesWithMetadata(List *commands)
* SendCommandToWorkersParamsInternal() that can be used to send commands * SendCommandToWorkersParamsInternal() that can be used to send commands
* to remote metadata nodes. * to remote metadata nodes.
*/ */
static void void
SendCommandToRemoteMetadataNodesParams(const char *command, SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount, const char *user, int parameterCount,
const Oid *parameterTypes, const Oid *parameterTypes,

View File

@ -24,7 +24,8 @@ extern bool IsAnyObjectDistributed(const List *addresses);
extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name, extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name,
bool useConnectionForLocalQuery); bool useConnectionForLocalQuery,
char *connectionUser);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address); extern void UnmarkObjectDistributed(const ObjectAddress *address);

View File

@ -68,6 +68,10 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet,
const char *nodeUser, const char *command); const char *nodeUser, const char *command);
extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort,
const char *nodeUser, const char *command); const char *nodeUser, const char *command);
extern void SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues);
extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName,
int32 nodePort, int32 nodePort,
const char *nodeUser, const char *nodeUser,

View File

@ -22,7 +22,7 @@ def test_main_commited_outer_not_yet(cluster):
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')" "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u1;', 'postgres')"
) )
cur2.execute( cur2.execute(
"SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123)" "SELECT citus_internal.mark_object_distributed(1260, 'u1', 123123, 'postgres')"
) )
cur2.execute("COMMIT") cur2.execute("COMMIT")
@ -133,7 +133,7 @@ def test_main_commited_outer_aborted(cluster):
"SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')" "SELECT citus_internal.execute_command_on_remote_nodes_as_user('CREATE USER u2;', 'postgres')"
) )
cur2.execute( cur2.execute(
"SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321)" "SELECT citus_internal.mark_object_distributed(1260, 'u2', 321321, 'postgres')"
) )
cur2.execute("COMMIT") cur2.execute("COMMIT")

View File

@ -1424,7 +1424,7 @@ SELECT * FROM multi_extension.print_extension_changes();
--------------------------------------------------------------------- ---------------------------------------------------------------------
| function citus_internal.commit_management_command_2pc() void | function citus_internal.commit_management_command_2pc() void
| function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void | function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void
| function citus_internal.mark_object_distributed(oid,text,oid) void | function citus_internal.mark_object_distributed(oid,text,oid,text) void
| function citus_internal.start_management_transaction(xid8) void | function citus_internal.start_management_transaction(xid8) void
| function citus_internal_acquire_citus_advisory_object_class_lock(integer,cstring) void | function citus_internal_acquire_citus_advisory_object_class_lock(integer,cstring) void
| function citus_internal_database_command(text) void | function citus_internal_database_command(text) void

View File

@ -71,6 +71,7 @@ SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerou
ERROR: operation is not allowed ERROR: operation is not allowed
HINT: Run the command with a superuser. HINT: Run the command with a superuser.
\c other_db1 \c other_db1
SET ROLE nonsuperuser;
CREATE USER other_db_user9; CREATE USER other_db_user9;
RESET ROLE; RESET ROLE;
\c regression \c regression

View File

@ -59,7 +59,7 @@ ORDER BY 1;
function citus_internal.commit_management_command_2pc() function citus_internal.commit_management_command_2pc()
function citus_internal.execute_command_on_remote_nodes_as_user(text,text) function citus_internal.execute_command_on_remote_nodes_as_user(text,text)
function citus_internal.find_groupid_for_node(text,integer) function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.mark_object_distributed(oid,text,oid) function citus_internal.mark_object_distributed(oid,text,oid,text)
function citus_internal.pg_dist_node_trigger_func() function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_trigger_func() function citus_internal.pg_dist_rebalance_strategy_trigger_func()
function citus_internal.pg_dist_shard_placement_trigger_func() function citus_internal.pg_dist_shard_placement_trigger_func()

View File

@ -51,6 +51,7 @@ SET ROLE nonsuperuser;
SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres'); SELECT citus_internal.execute_command_on_remote_nodes_as_user($$SELECT 'dangerous query'$$, 'postgres');
\c other_db1 \c other_db1
SET ROLE nonsuperuser;
CREATE USER other_db_user9; CREATE USER other_db_user9;
RESET ROLE; RESET ROLE;