diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index c999ae8da..d7ea50024 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -1110,7 +1110,7 @@ pg_get_indexclusterdef_string(Oid indexRelationId) /* * pg_get_table_grants returns a list of sql statements which recreate the - * permissions for a specific table. + * permissions for a specific table, including attributes privileges. * * This function is modeled after aclexplode(), don't change too heavily. */ @@ -1136,10 +1136,11 @@ pg_get_table_grants(Oid relationId) errmsg("relation with OID %u does not exist", relationId))); } + Form_pg_class classForm = (Form_pg_class) GETSTRUCT(classTuple); + AttrNumber nattrs = classForm->relnatts; Datum aclDatum = SysCacheGetAttr(RELOID, classTuple, Anum_pg_class_relacl, &isNull); - ReleaseSysCache(classTuple); if (!isNull) @@ -1238,6 +1239,110 @@ pg_get_table_grants(Oid relationId) resetStringInfo(&buffer); + /* lookup all attribute level grants */ + for (AttrNumber attNum = 1; attNum <= nattrs; attNum++) + { + HeapTuple attTuple = SearchSysCache2(ATTNUM, ObjectIdGetDatum(relationId), + Int16GetDatum(attNum)); + if (!HeapTupleIsValid(attTuple)) + { + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("attribute with OID %u does not exist", + attNum))); + } + + Form_pg_attribute thisAttribute = (Form_pg_attribute) GETSTRUCT(attTuple); + + /* ignore dropped columns */ + if (thisAttribute->attisdropped) + { + ReleaseSysCache(attTuple); + continue; + } + + Datum aclAttDatum = SysCacheGetAttr(ATTNUM, attTuple, Anum_pg_attribute_attacl, + &isNull); + if (!isNull) + { + /* iterate through the acl datastructure, emit GRANTs */ + Acl *acl = DatumGetAclP(aclAttDatum); + AclItem *aidat = ACL_DAT(acl); + + int offtype = -1; + int i = 0; + while (i < ACL_NUM(acl)) + { + AclItem *aidata = NULL; + AclMode priv_bit = 0; + + offtype++; + + if (offtype == N_ACL_RIGHTS) + { + offtype = 0; + i++; + if (i >= ACL_NUM(acl)) /* done */ + { + break; + } + } + + aidata = &aidat[i]; + priv_bit = 1 << offtype; + + if (ACLITEM_GET_PRIVS(*aidata) & priv_bit) + { + const char *roleName = NULL; + const char *withGrant = ""; + + if (aidata->ai_grantee != 0) + { + + HeapTuple htup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(aidata->ai_grantee)); + if (HeapTupleIsValid(htup)) + { + Form_pg_authid authForm = ((Form_pg_authid) GETSTRUCT(htup)); + + roleName = quote_identifier(NameStr(authForm->rolname)); + + ReleaseSysCache(htup); + } + else + { + elog(ERROR, "cache lookup failed for role %u", aidata->ai_grantee); + } + } + else + { + roleName = "PUBLIC"; + } + + if ((ACLITEM_GET_GOPTIONS(*aidata) & priv_bit) != 0) + { + withGrant = " WITH GRANT OPTION"; + } + + appendStringInfo(&buffer, "GRANT %s(%s) ON %s TO %s%s", + convert_aclright_to_string(priv_bit), + quote_identifier(NameStr(thisAttribute->attname)), + relationName, + roleName, + withGrant); + + defs = lappend(defs, pstrdup(buffer.data)); + + resetStringInfo(&buffer); + } + } + + /* if we have a detoasted copy, free it */ + if ((Pointer) acl != DatumGetPointer(aclAttDatum)) + pfree(acl); + } + ReleaseSysCache(attTuple); + } + relation_close(relation, NoLock); return defs; /* *INDENT-ON* */