Add handling for grant/revoke all tables in schema

pull/2195/head
Murat Tuncer 2018-05-30 11:06:03 +03:00
parent b90c54abc0
commit ba50e3f33e
3 changed files with 99 additions and 14 deletions

View File

@ -171,6 +171,7 @@ static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid ol
static void CheckCopyPermissions(CopyStmt *copyStatement);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static void PostProcessUtility(Node *parsetree);
static List * CollectGrantTableIdList(GrantStmt *grantStmt);
static void ProcessDropTableStmt(DropStmt *dropTableStatement);
@ -3688,7 +3689,8 @@ PlanGrantStmt(GrantStmt *grantStmt)
StringInfoData targetString;
StringInfoData ddlString;
ListCell *granteeCell = NULL;
ListCell *objectCell = NULL;
List *tableIdList = NIL;
ListCell *tableListCell = NULL;
bool isFirst = true;
List *ddlJobs = NIL;
@ -3701,8 +3703,15 @@ PlanGrantStmt(GrantStmt *grantStmt)
* So far only table level grants are supported. Most other types of
* grants aren't interesting anyway.
*/
if (grantStmt->targtype != ACL_TARGET_OBJECT ||
grantStmt->objtype != RELATION_OBJECT_TYPE)
if (grantStmt->objtype != RELATION_OBJECT_TYPE)
{
return NIL;
}
tableIdList = CollectGrantTableIdList(grantStmt);
/* nothing to do if there is no distributed table in the grant list */
if (tableIdList == NIL)
{
return NIL;
}
@ -3727,7 +3736,13 @@ PlanGrantStmt(GrantStmt *grantStmt)
}
isFirst = false;
Assert(priv->cols == NIL);
if (priv->cols != NIL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("grant/revoke on column list is currently "
"unsupported")));
}
Assert(priv->priv_name != NULL);
appendStringInfo(&privsString, "%s", priv->priv_name);
@ -3770,20 +3785,16 @@ PlanGrantStmt(GrantStmt *grantStmt)
* only to distributed relations.
*/
isFirst = true;
foreach(objectCell, grantStmt->objects)
foreach(tableListCell, tableIdList)
{
RangeVar *relvar = (RangeVar *) lfirst(objectCell);
Oid relOid = RangeVarGetRelid(relvar, NoLock, false);
Oid relationId = lfirst_oid(tableListCell);
const char *grantOption = "";
DDLJob *ddlJob = NULL;
if (!IsDistributedTable(relOid))
{
continue;
}
Assert(IsDistributedTable(relationId));
resetStringInfo(&targetString);
appendStringInfo(&targetString, "%s", generate_relation_name(relOid, NIL));
appendStringInfo(&targetString, "%s", generate_relation_name(relationId, NIL));
if (grantStmt->is_grant)
{
@ -3809,10 +3820,10 @@ PlanGrantStmt(GrantStmt *grantStmt)
}
ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relOid;
ddlJob->targetRelationId = relationId;
ddlJob->concurrentIndexCmd = false;
ddlJob->commandString = pstrdup(ddlString.data);
ddlJob->taskList = DDLTaskList(relOid, ddlString.data);
ddlJob->taskList = DDLTaskList(relationId, ddlString.data);
ddlJobs = lappend(ddlJobs, ddlJob);
@ -3823,6 +3834,78 @@ PlanGrantStmt(GrantStmt *grantStmt)
}
/*
* CollectGrantTableIdList determines and returns a list of distributed table
* Oids from grant statement.
* Grant statement may appear in two forms
* 1 - grant on table:
* each distributed table oid in grant object list is added to returned list.
* 2 - grant all tables in schema:
* Collect namespace oid list from grant statement
* Add each distributed table oid in the target namespace list to the returned list.
*/
static List *
CollectGrantTableIdList(GrantStmt *grantStmt)
{
List *grantTableList = NIL;
bool grantOnTableCommand = false;
bool grantAllTablesOnSchemaCommand = false;
grantOnTableCommand = (grantStmt->targtype == ACL_TARGET_OBJECT &&
grantStmt->objtype == RELATION_OBJECT_TYPE);
grantAllTablesOnSchemaCommand = (grantStmt->targtype == ACL_TARGET_ALL_IN_SCHEMA &&
grantStmt->objtype == RELATION_OBJECT_TYPE);
/* we are only interested in table level grants */
if (!grantOnTableCommand && !grantAllTablesOnSchemaCommand)
{
return NIL;
}
if (grantAllTablesOnSchemaCommand)
{
List *distTableOidList = DistTableOidList();
ListCell *distributedTableOidCell = NULL;
List *namespaceOidList = NIL;
ListCell *objectCell = NULL;
foreach(objectCell, grantStmt->objects)
{
char *nspname = strVal(lfirst(objectCell));
bool missing_ok = false;
Oid namespaceOid = get_namespace_oid(nspname, missing_ok);
Assert(namespaceOid != InvalidOid);
namespaceOidList = list_append_unique_oid(namespaceOidList, namespaceOid);
}
foreach(distributedTableOidCell, distTableOidList)
{
Oid relationId = lfirst_oid(distributedTableOidCell);
Oid namespaceOid = get_rel_namespace(relationId);
if (list_member_oid(namespaceOidList, namespaceOid))
{
grantTableList = lappend_oid(grantTableList, relationId);
}
}
}
else
{
ListCell *objectCell = NULL;
foreach(objectCell, grantStmt->objects)
{
RangeVar *relvar = (RangeVar *) lfirst(objectCell);
Oid relationId = RangeVarGetRelid(relvar, NoLock, false);
if (IsDistributedTable(relationId))
{
grantTableList = lappend_oid(grantTableList, relationId);
}
}
}
return grantTableList;
}
/*
* ProcessDropTableStmt processes DROP TABLE commands for partitioned tables.
* If we are trying to DROP partitioned tables, we first need to go to MX nodes

View File

@ -2264,6 +2264,7 @@ SELECT master_create_worker_shards('failure_test', 2);
(1 row)
SET citus.enable_ddl_propagation TO off;
CREATE USER router_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.

View File

@ -1058,6 +1058,7 @@ CREATE TABLE failure_test (a int, b int);
SELECT master_create_distributed_table('failure_test', 'a', 'hash');
SELECT master_create_worker_shards('failure_test', 2);
SET citus.enable_ddl_propagation TO off;
CREATE USER router_user;
GRANT INSERT ON ALL TABLES IN SCHEMA public TO router_user;
\c - - - :worker_1_port