mirror of https://github.com/citusdata/citus.git
Merge pull request #3460 from citusdata/fix_permissions
Create merge task temporary schemas with current userpull/3467/head
commit
3826e81056
|
@ -25,12 +25,15 @@
|
|||
|
||||
#include "postgres.h"
|
||||
#include "access/hash.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
|
||||
#include "distributed/directed_acylic_graph_execution.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/repartition_join_execution.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
|
@ -46,7 +49,7 @@
|
|||
static List * CreateTemporarySchemasForMergeTasks(Job *topLevelJob);
|
||||
static List * ExtractJobsInJobTree(Job *job);
|
||||
static void TraverseJobTree(Job *curJob, List **jobs);
|
||||
static char * GenerateCreateSchemasCommand(List *jobIds);
|
||||
static char * GenerateCreateSchemasCommand(List *jobIds, char *schemaOwner);
|
||||
static char * GenerateJobCommands(List *jobIds, char *templateCommand);
|
||||
static char * GenerateDeleteJobsCommand(List *jobIds);
|
||||
|
||||
|
@ -79,7 +82,7 @@ static List *
|
|||
CreateTemporarySchemasForMergeTasks(Job *topLeveLJob)
|
||||
{
|
||||
List *jobIds = ExtractJobsInJobTree(topLeveLJob);
|
||||
char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds);
|
||||
char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds, CurrentUserName());
|
||||
SendCommandToAllWorkers(createSchemasCommand, CitusExtensionOwnerName());
|
||||
return jobIds;
|
||||
}
|
||||
|
@ -120,9 +123,18 @@ TraverseJobTree(Job *curJob, List **jobIds)
|
|||
* GenerateCreateSchemasCommand returns concatanated create schema commands.
|
||||
*/
|
||||
static char *
|
||||
GenerateCreateSchemasCommand(List *jobIds)
|
||||
GenerateCreateSchemasCommand(List *jobIds, char *ownerName)
|
||||
{
|
||||
return GenerateJobCommands(jobIds, WORKER_CREATE_SCHEMA_QUERY);
|
||||
StringInfo createSchemaCommand = makeStringInfo();
|
||||
ListCell *jobIdCell = NULL;
|
||||
|
||||
foreach(jobIdCell, jobIds)
|
||||
{
|
||||
uint64 jobId = (uint64) lfirst(jobIdCell);
|
||||
appendStringInfo(createSchemaCommand, WORKER_CREATE_SCHEMA_QUERY,
|
||||
jobId, quote_literal_cstr(ownerName));
|
||||
}
|
||||
return createSchemaCommand->data;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1,2 +1,4 @@
|
|||
#include "udfs/worker_create_schema/9.2-2.sql"
|
||||
|
||||
-- reserve UINT32_MAX (4294967295) for a special node
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq MAXVALUE 4294967294;
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
DROP FUNCTION IF EXISTS pg_catalog.worker_create_schema(jobid bigint);
|
||||
|
||||
CREATE FUNCTION pg_catalog.worker_create_schema(jobid bigint, username text)
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$worker_create_schema$$;
|
||||
COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint, text)
|
||||
IS 'create schema in remote node';
|
||||
|
||||
REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint, text) FROM PUBLIC;
|
|
@ -1,8 +1,10 @@
|
|||
CREATE FUNCTION pg_catalog.worker_create_schema(bigint)
|
||||
DROP FUNCTION IF EXISTS pg_catalog.worker_create_schema(jobid bigint);
|
||||
|
||||
CREATE FUNCTION pg_catalog.worker_create_schema(jobid bigint, username text)
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$worker_create_schema$$;
|
||||
COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint)
|
||||
COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint, text)
|
||||
IS 'create schema in remote node';
|
||||
|
||||
REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint) FROM PUBLIC;
|
||||
REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint, text) FROM PUBLIC;
|
||||
|
|
|
@ -102,7 +102,7 @@ task_tracker_assign_task(PG_FUNCTION_ARGS)
|
|||
if (!schemaExists)
|
||||
{
|
||||
/* lock gets automatically released upon return from this function */
|
||||
CreateJobSchema(jobSchemaName);
|
||||
CreateJobSchema(jobSchemaName, NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -306,9 +306,11 @@ TaskTrackerRunning(void)
|
|||
* this function ensures that our pg_ prefixed schema names can be created.
|
||||
* Further note that the created schema does not become visible to other
|
||||
* processes until the transaction commits.
|
||||
*
|
||||
* If schemaOwner is NULL, then current user is used.
|
||||
*/
|
||||
void
|
||||
CreateJobSchema(StringInfo schemaName)
|
||||
CreateJobSchema(StringInfo schemaName, char *schemaOwner)
|
||||
{
|
||||
const char *queryString = NULL;
|
||||
|
||||
|
@ -324,10 +326,15 @@ CreateJobSchema(StringInfo schemaName)
|
|||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||
|
||||
if (schemaOwner == NULL)
|
||||
{
|
||||
schemaOwner = GetUserNameFromId(savedUserId, false);
|
||||
}
|
||||
|
||||
/* build a CREATE SCHEMA statement */
|
||||
currentUserRole.type = T_RoleSpec;
|
||||
currentUserRole.roletype = ROLESPEC_CSTRING;
|
||||
currentUserRole.rolename = GetUserNameFromId(savedUserId, false);
|
||||
currentUserRole.rolename = schemaOwner;
|
||||
currentUserRole.location = -1;
|
||||
|
||||
CreateSchemaStmt *createSchemaStmt = makeNode(CreateSchemaStmt);
|
||||
|
|
|
@ -68,6 +68,9 @@ Datum
|
|||
worker_create_schema(PG_FUNCTION_ARGS)
|
||||
{
|
||||
uint64 jobId = PG_GETARG_INT64(0);
|
||||
text *ownerText = PG_GETARG_TEXT_P(1);
|
||||
char *ownerString = TextDatumGetCString(ownerText);
|
||||
|
||||
|
||||
StringInfo jobSchemaName = JobSchemaName(jobId);
|
||||
CheckCitusVersion(ERROR);
|
||||
|
@ -75,7 +78,7 @@ worker_create_schema(PG_FUNCTION_ARGS)
|
|||
bool schemaExists = JobSchemaExists(jobSchemaName);
|
||||
if (!schemaExists)
|
||||
{
|
||||
CreateJobSchema(jobSchemaName);
|
||||
CreateJobSchema(jobSchemaName, ownerString);
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
|
|
|
@ -37,7 +37,7 @@
|
|||
#define JOB_CLEANUP_TASK_ID INT_MAX
|
||||
|
||||
/* Adaptive executor repartioning related defines */
|
||||
#define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ");"
|
||||
#define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ", %s);"
|
||||
#define WORKER_REPARTITION_CLEANUP_QUERY "SELECT worker_repartition_cleanup (" \
|
||||
UINT64_FORMAT \
|
||||
");"
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
#include "fmgr.h"
|
||||
|
||||
extern void CreateJobSchema(StringInfo schemaName);
|
||||
extern void CreateJobSchema(StringInfo schemaName, char *schemaOwner);
|
||||
|
||||
/* Function declarations for distributed task management */
|
||||
extern Datum task_tracker_assign_task(PG_FUNCTION_ARGS);
|
||||
|
|
|
@ -155,6 +155,14 @@ SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.
|
|||
0
|
||||
(1 row)
|
||||
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
ERROR: operation is not allowed
|
||||
|
@ -235,6 +243,14 @@ SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.
|
|||
0
|
||||
(1 row)
|
||||
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
ERROR: operation is not allowed
|
||||
|
@ -277,6 +293,10 @@ ERROR: permission denied for table test
|
|||
-- test re-partition query
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
ERROR: permission denied for table test
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
ERROR: permission denied for table test
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
ERROR: operation is not allowed
|
||||
|
|
|
@ -112,6 +112,10 @@ SELECT count(*), min(current_user) FROM test;
|
|||
-- test re-partition query (needs to transmit intermediate results)
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
|
||||
|
@ -144,6 +148,10 @@ SELECT count(*), min(current_user) FROM test;
|
|||
-- test re-partition query (needs to transmit intermediate results)
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
|
||||
|
@ -176,6 +184,10 @@ SELECT count(*), min(current_user) FROM test;
|
|||
-- test re-partition query
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
|
||||
|
|
Loading…
Reference in New Issue