diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 2415f598f..41db0cbb8 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -25,12 +25,15 @@ #include "postgres.h" #include "access/hash.h" +#include "miscadmin.h" +#include "utils/builtins.h" #include "distributed/hash_helpers.h" #include "distributed/directed_acylic_graph_execution.h" #include "distributed/multi_physical_planner.h" #include "distributed/adaptive_executor.h" #include "distributed/worker_manager.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_server_executor.h" #include "distributed/repartition_join_execution.h" #include "distributed/worker_transaction.h" @@ -46,7 +49,7 @@ static List * CreateTemporarySchemasForMergeTasks(Job *topLevelJob); static List * ExtractJobsInJobTree(Job *job); static void TraverseJobTree(Job *curJob, List **jobs); -static char * GenerateCreateSchemasCommand(List *jobIds); +static char * GenerateCreateSchemasCommand(List *jobIds, char *schemaOwner); static char * GenerateJobCommands(List *jobIds, char *templateCommand); static char * GenerateDeleteJobsCommand(List *jobIds); @@ -79,7 +82,7 @@ static List * CreateTemporarySchemasForMergeTasks(Job *topLeveLJob) { List *jobIds = ExtractJobsInJobTree(topLeveLJob); - char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds); + char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName()); SendCommandToAllWorkers(createSchemasCommand, CitusExtensionOwnerName()); return jobIds; } @@ -120,9 +123,18 @@ TraverseJobTree(Job *curJob, List **jobIds) * GenerateCreateSchemasCommand returns concatanated create schema commands. */ static char * -GenerateCreateSchemasCommand(List *jobIds) +GenerateCreateSchemasCommand(List *jobIds, char *ownerName) { - return GenerateJobCommands(jobIds, WORKER_CREATE_SCHEMA_QUERY); + StringInfo createSchemaCommand = makeStringInfo(); + ListCell *jobIdCell = NULL; + + foreach(jobIdCell, jobIds) + { + uint64 jobId = (uint64) lfirst(jobIdCell); + appendStringInfo(createSchemaCommand, WORKER_CREATE_SCHEMA_QUERY, + jobId, quote_literal_cstr(ownerName)); + } + return createSchemaCommand->data; } diff --git a/src/backend/distributed/sql/citus--9.2-1--9.2-2.sql b/src/backend/distributed/sql/citus--9.2-1--9.2-2.sql index 6576b11b5..7bc033d0b 100644 --- a/src/backend/distributed/sql/citus--9.2-1--9.2-2.sql +++ b/src/backend/distributed/sql/citus--9.2-1--9.2-2.sql @@ -1,2 +1,4 @@ +#include "udfs/worker_create_schema/9.2-2.sql" + -- reserve UINT32_MAX (4294967295) for a special node ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq MAXVALUE 4294967294; diff --git a/src/backend/distributed/sql/udfs/worker_create_schema/9.2-2.sql b/src/backend/distributed/sql/udfs/worker_create_schema/9.2-2.sql new file mode 100644 index 000000000..b521cb45a --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_create_schema/9.2-2.sql @@ -0,0 +1,10 @@ +DROP FUNCTION IF EXISTS pg_catalog.worker_create_schema(jobid bigint); + +CREATE FUNCTION pg_catalog.worker_create_schema(jobid bigint, username text) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_create_schema$$; +COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint, text) + IS 'create schema in remote node'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint, text) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/worker_create_schema/latest.sql b/src/backend/distributed/sql/udfs/worker_create_schema/latest.sql index c8d311823..b521cb45a 100644 --- a/src/backend/distributed/sql/udfs/worker_create_schema/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_create_schema/latest.sql @@ -1,8 +1,10 @@ -CREATE FUNCTION pg_catalog.worker_create_schema(bigint) +DROP FUNCTION IF EXISTS pg_catalog.worker_create_schema(jobid bigint); + +CREATE FUNCTION pg_catalog.worker_create_schema(jobid bigint, username text) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_create_schema$$; -COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint) +COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint, text) IS 'create schema in remote node'; -REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint) FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint, text) FROM PUBLIC; diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index 44ce69ae4..bb4e439ca 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -102,7 +102,7 @@ task_tracker_assign_task(PG_FUNCTION_ARGS) if (!schemaExists) { /* lock gets automatically released upon return from this function */ - CreateJobSchema(jobSchemaName); + CreateJobSchema(jobSchemaName, NULL); } else { @@ -306,9 +306,11 @@ TaskTrackerRunning(void) * this function ensures that our pg_ prefixed schema names can be created. * Further note that the created schema does not become visible to other * processes until the transaction commits. + * + * If schemaOwner is NULL, then current user is used. */ void -CreateJobSchema(StringInfo schemaName) +CreateJobSchema(StringInfo schemaName, char *schemaOwner) { const char *queryString = NULL; @@ -324,10 +326,15 @@ CreateJobSchema(StringInfo schemaName) GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + if (schemaOwner == NULL) + { + schemaOwner = GetUserNameFromId(savedUserId, false); + } + /* build a CREATE SCHEMA statement */ currentUserRole.type = T_RoleSpec; currentUserRole.roletype = ROLESPEC_CSTRING; - currentUserRole.rolename = GetUserNameFromId(savedUserId, false); + currentUserRole.rolename = schemaOwner; currentUserRole.location = -1; CreateSchemaStmt *createSchemaStmt = makeNode(CreateSchemaStmt); diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 98b1a3f86..2f84f545e 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -68,6 +68,9 @@ Datum worker_create_schema(PG_FUNCTION_ARGS) { uint64 jobId = PG_GETARG_INT64(0); + text *ownerText = PG_GETARG_TEXT_P(1); + char *ownerString = TextDatumGetCString(ownerText); + StringInfo jobSchemaName = JobSchemaName(jobId); CheckCitusVersion(ERROR); @@ -75,7 +78,7 @@ worker_create_schema(PG_FUNCTION_ARGS) bool schemaExists = JobSchemaExists(jobSchemaName); if (!schemaExists) { - CreateJobSchema(jobSchemaName); + CreateJobSchema(jobSchemaName, ownerString); } PG_RETURN_VOID(); diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index cf4fe30cb..d49f09d0c 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -37,7 +37,7 @@ #define JOB_CLEANUP_TASK_ID INT_MAX /* Adaptive executor repartioning related defines */ -#define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ");" +#define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ", %s);" #define WORKER_REPARTITION_CLEANUP_QUERY "SELECT worker_repartition_cleanup (" \ UINT64_FORMAT \ ");" diff --git a/src/include/distributed/task_tracker_protocol.h b/src/include/distributed/task_tracker_protocol.h index 38028cc00..7fd82ea94 100644 --- a/src/include/distributed/task_tracker_protocol.h +++ b/src/include/distributed/task_tracker_protocol.h @@ -17,7 +17,7 @@ #include "fmgr.h" -extern void CreateJobSchema(StringInfo schemaName); +extern void CreateJobSchema(StringInfo schemaName, char *schemaOwner); /* Function declarations for distributed task management */ extern Datum task_tracker_assign_task(PG_FUNCTION_ARGS); diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index d88340208..d1cee6f75 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -155,6 +155,14 @@ SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b. 0 (1 row) +SET citus.task_executor_type TO 'adaptive'; +SET citus.enable_repartition_joins TO true; +SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; + count +--------------------------------------------------------------------- + 0 +(1 row) + -- should not be able to transmit directly COPY "postgresql.conf" TO STDOUT WITH (format transmit); ERROR: operation is not allowed @@ -235,6 +243,14 @@ SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b. 0 (1 row) +SET citus.task_executor_type TO 'adaptive'; +SET citus.enable_repartition_joins TO true; +SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; + count +--------------------------------------------------------------------- + 0 +(1 row) + -- should not be able to transmit directly COPY "postgresql.conf" TO STDOUT WITH (format transmit); ERROR: operation is not allowed @@ -277,6 +293,10 @@ ERROR: permission denied for table test -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; ERROR: permission denied for table test +SET citus.task_executor_type TO 'adaptive'; +SET citus.enable_repartition_joins TO true; +SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; +ERROR: permission denied for table test -- should not be able to transmit directly COPY "postgresql.conf" TO STDOUT WITH (format transmit); ERROR: operation is not allowed diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index 651e489e5..2d3ed8019 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -112,6 +112,10 @@ SELECT count(*), min(current_user) FROM test; -- test re-partition query (needs to transmit intermediate results) SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; +SET citus.task_executor_type TO 'adaptive'; +SET citus.enable_repartition_joins TO true; +SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; + -- should not be able to transmit directly COPY "postgresql.conf" TO STDOUT WITH (format transmit); @@ -144,6 +148,10 @@ SELECT count(*), min(current_user) FROM test; -- test re-partition query (needs to transmit intermediate results) SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; +SET citus.task_executor_type TO 'adaptive'; +SET citus.enable_repartition_joins TO true; +SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; + -- should not be able to transmit directly COPY "postgresql.conf" TO STDOUT WITH (format transmit); @@ -176,6 +184,10 @@ SELECT count(*), min(current_user) FROM test; -- test re-partition query SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; +SET citus.task_executor_type TO 'adaptive'; +SET citus.enable_repartition_joins TO true; +SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2; + -- should not be able to transmit directly COPY "postgresql.conf" TO STDOUT WITH (format transmit);