mirror of https://github.com/citusdata/citus.git
Merge pull request #5684 from citusdata/velioglu/superuser_connection_for_dep
Use super user connection while propagating dependent objects' pg_dist_object entriespull/5601/head
commit
6376eaf0e0
|
@ -120,7 +120,15 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
|
||||||
*/
|
*/
|
||||||
foreach_ptr(dependency, dependenciesWithCommands)
|
foreach_ptr(dependency, dependenciesWithCommands)
|
||||||
{
|
{
|
||||||
MarkObjectDistributed(dependency);
|
/*
|
||||||
|
* pg_dist_object entries must be propagated with the super user, since
|
||||||
|
* the owner of the target object may not own dependencies but we must
|
||||||
|
* propagate as we send objects itself with the superuser.
|
||||||
|
*
|
||||||
|
* Only dependent object's metadata should be propagated with super user.
|
||||||
|
* Metadata of the table itself must be propagated with the current user.
|
||||||
|
*/
|
||||||
|
MarkObjectDistributedViaSuperUser(dependency);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,8 @@
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
|
|
||||||
|
static void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||||
|
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
||||||
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||||
Datum *paramValues);
|
Datum *paramValues);
|
||||||
|
|
||||||
|
@ -141,14 +143,60 @@ ObjectExists(const ObjectAddress *address)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MarkObjectDistributed marks an object as a distributed object by citus. Marking is done
|
* MarkObjectDistributed marks an object as a distributed object. Marking is done
|
||||||
* by adding appropriate entries to citus.pg_dist_object.
|
* by adding appropriate entries to citus.pg_dist_object and also marking the object
|
||||||
|
* as distributed by opening a connection using current user to all of the workers
|
||||||
|
* with metadata if object propagation is on.
|
||||||
*
|
*
|
||||||
* This also marks the object as distributed on all of the workers with metadata
|
* This function should be used if the user creating the given object. If you want
|
||||||
* if object propagation is on.
|
* to mark dependent objects as distributed check MarkObjectDistributedViaSuperUser.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
MarkObjectDistributed(const ObjectAddress *distAddress)
|
MarkObjectDistributed(const ObjectAddress *distAddress)
|
||||||
|
{
|
||||||
|
MarkObjectDistributedLocally(distAddress);
|
||||||
|
|
||||||
|
if (EnableMetadataSync)
|
||||||
|
{
|
||||||
|
char *workerPgDistObjectUpdateCommand =
|
||||||
|
CreatePgDistObjectEntryCommand(distAddress);
|
||||||
|
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MarkObjectDistributedViaSuperUser marks an object as a distributed object. Marking
|
||||||
|
* is done by adding appropriate entries to citus.pg_dist_object and also marking the
|
||||||
|
* object as distributed by opening a connection using super user to all of the workers
|
||||||
|
* with metadata if object propagation is on.
|
||||||
|
*
|
||||||
|
* This function should be used to mark dependent object as distributed. If you want
|
||||||
|
* to mark the object you are creating please check MarkObjectDistributed.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
|
||||||
|
{
|
||||||
|
MarkObjectDistributedLocally(distAddress);
|
||||||
|
|
||||||
|
if (EnableMetadataSync)
|
||||||
|
{
|
||||||
|
char *workerPgDistObjectUpdateCommand =
|
||||||
|
CreatePgDistObjectEntryCommand(distAddress);
|
||||||
|
SendCommandToWorkersWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MarkObjectDistributedLocally marks an object as a distributed object by citus.
|
||||||
|
* Marking is done by adding appropriate entries to citus.pg_dist_object.
|
||||||
|
*
|
||||||
|
* This function should never be called alone, MarkObjectDistributed() or
|
||||||
|
* MarkObjectDistributedViaSuperUser() should be called.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
MarkObjectDistributedLocally(const ObjectAddress *distAddress)
|
||||||
{
|
{
|
||||||
int paramCount = 3;
|
int paramCount = 3;
|
||||||
Oid paramTypes[3] = {
|
Oid paramTypes[3] = {
|
||||||
|
@ -161,32 +209,38 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
|
||||||
ObjectIdGetDatum(distAddress->objectId),
|
ObjectIdGetDatum(distAddress->objectId),
|
||||||
Int32GetDatum(distAddress->objectSubId)
|
Int32GetDatum(distAddress->objectSubId)
|
||||||
};
|
};
|
||||||
|
|
||||||
char *insertQuery = "INSERT INTO citus.pg_dist_object (classid, objid, objsubid) "
|
char *insertQuery = "INSERT INTO citus.pg_dist_object (classid, objid, objsubid) "
|
||||||
"VALUES ($1, $2, $3) ON CONFLICT DO NOTHING";
|
"VALUES ($1, $2, $3) ON CONFLICT DO NOTHING";
|
||||||
|
|
||||||
int spiStatus = ExecuteCommandAsSuperuser(insertQuery, paramCount, paramTypes,
|
int spiStatus = ExecuteCommandAsSuperuser(insertQuery, paramCount, paramTypes,
|
||||||
paramValues);
|
paramValues);
|
||||||
if (spiStatus < 0)
|
if (spiStatus < 0)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object")));
|
ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object")));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (EnableMetadataSync)
|
|
||||||
{
|
|
||||||
/* create a list by adding the address of value to not to have warning */
|
|
||||||
List *objectAddressList = list_make1((ObjectAddress *) distAddress);
|
|
||||||
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
|
|
||||||
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
|
|
||||||
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
|
|
||||||
|
|
||||||
char *workerPgDistObjectUpdateCommand =
|
/*
|
||||||
MarkObjectsDistributedCreateCommand(objectAddressList,
|
* CreatePgDistObjectEntryCommand creates command to insert pg_dist_object tuple
|
||||||
distArgumetIndexList,
|
* for the given object address.
|
||||||
colocationIdList,
|
*/
|
||||||
forceDelegationList);
|
static char *
|
||||||
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand);
|
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress)
|
||||||
}
|
{
|
||||||
|
/* create a list by adding the address of value to not to have warning */
|
||||||
|
List *objectAddressList =
|
||||||
|
list_make1((ObjectAddress *) objectAddress);
|
||||||
|
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
|
||||||
|
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
|
||||||
|
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
|
||||||
|
|
||||||
|
char *workerPgDistObjectUpdateCommand =
|
||||||
|
MarkObjectsDistributedCreateCommand(objectAddressList,
|
||||||
|
distArgumetIndexList,
|
||||||
|
colocationIdList,
|
||||||
|
forceDelegationList);
|
||||||
|
|
||||||
|
return workerPgDistObjectUpdateCommand;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,6 @@ static void ErrorIfCurrentUserCanNotDistributeObject(ObjectType type,
|
||||||
ObjectAddress *addr,
|
ObjectAddress *addr,
|
||||||
Node *node,
|
Node *node,
|
||||||
Relation *relation);
|
Relation *relation);
|
||||||
static void ErrorIfUserNotAllowedToPropagateExtension(char *extensionName);
|
|
||||||
static List * textarray_to_strvaluelist(ArrayType *arr);
|
static List * textarray_to_strvaluelist(ArrayType *arr);
|
||||||
|
|
||||||
/* It is defined on PG >= 13 versions by default */
|
/* It is defined on PG >= 13 versions by default */
|
||||||
|
@ -398,9 +397,6 @@ ErrorIfCurrentUserCanNotDistributeObject(ObjectType type, ObjectAddress *addr,
|
||||||
Node *node, Relation *relation)
|
Node *node, Relation *relation)
|
||||||
{
|
{
|
||||||
Oid userId = GetUserId();
|
Oid userId = GetUserId();
|
||||||
AclMode aclMaskResult = 0;
|
|
||||||
bool skipAclCheck = false;
|
|
||||||
Oid idToCheck = InvalidOid;
|
|
||||||
|
|
||||||
if (!SupportedDependencyByCitus(addr))
|
if (!SupportedDependencyByCitus(addr))
|
||||||
{
|
{
|
||||||
|
@ -410,27 +406,19 @@ ErrorIfCurrentUserCanNotDistributeObject(ObjectType type, ObjectAddress *addr,
|
||||||
switch (type)
|
switch (type)
|
||||||
{
|
{
|
||||||
case OBJECT_SCHEMA:
|
case OBJECT_SCHEMA:
|
||||||
{
|
case OBJECT_DATABASE:
|
||||||
idToCheck = addr->objectId;
|
|
||||||
aclMaskResult = pg_namespace_aclmask(idToCheck, userId, ACL_USAGE,
|
|
||||||
ACLMASK_ANY);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case OBJECT_FUNCTION:
|
case OBJECT_FUNCTION:
|
||||||
case OBJECT_PROCEDURE:
|
case OBJECT_PROCEDURE:
|
||||||
case OBJECT_AGGREGATE:
|
case OBJECT_AGGREGATE:
|
||||||
|
case OBJECT_TYPE:
|
||||||
|
case OBJECT_FOREIGN_SERVER:
|
||||||
|
case OBJECT_SEQUENCE:
|
||||||
|
case OBJECT_FOREIGN_TABLE:
|
||||||
|
case OBJECT_TABLE:
|
||||||
|
case OBJECT_EXTENSION:
|
||||||
|
case OBJECT_COLLATION:
|
||||||
{
|
{
|
||||||
check_object_ownership(userId, type, *addr, node, *relation);
|
check_object_ownership(userId, type, *addr, node, *relation);
|
||||||
skipAclCheck = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case OBJECT_DATABASE:
|
|
||||||
{
|
|
||||||
idToCheck = addr->objectId;
|
|
||||||
aclMaskResult = pg_database_aclmask(idToCheck, userId, ACL_CONNECT,
|
|
||||||
ACLMASK_ANY);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,54 +431,6 @@ ErrorIfCurrentUserCanNotDistributeObject(ObjectType type, ObjectAddress *addr,
|
||||||
"access privileges on role %d with type %d",
|
"access privileges on role %d with type %d",
|
||||||
addr->objectId, type)));
|
addr->objectId, type)));
|
||||||
}
|
}
|
||||||
skipAclCheck = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case OBJECT_TYPE:
|
|
||||||
{
|
|
||||||
idToCheck = addr->objectId;
|
|
||||||
aclMaskResult = pg_type_aclmask(idToCheck, userId, ACL_USAGE,
|
|
||||||
ACLMASK_ANY);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case OBJECT_FOREIGN_SERVER:
|
|
||||||
{
|
|
||||||
idToCheck = addr->objectId;
|
|
||||||
aclMaskResult = pg_foreign_server_aclmask(idToCheck, userId, ACL_USAGE,
|
|
||||||
ACLMASK_ANY);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case OBJECT_SEQUENCE:
|
|
||||||
{
|
|
||||||
idToCheck = addr->objectId;
|
|
||||||
aclMaskResult = pg_class_aclmask(idToCheck, userId, ACL_USAGE, ACLMASK_ANY);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case OBJECT_FOREIGN_TABLE:
|
|
||||||
case OBJECT_TABLE:
|
|
||||||
{
|
|
||||||
/* table distribution already does the ownership check, so we can stick to that over acl_check */
|
|
||||||
check_object_ownership(userId, type, *addr, node, *relation);
|
|
||||||
skipAclCheck = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case OBJECT_EXTENSION:
|
|
||||||
{
|
|
||||||
Value *valueNode = (Value *) node;
|
|
||||||
char *extensionName = strVal(valueNode);
|
|
||||||
ErrorIfUserNotAllowedToPropagateExtension(extensionName);
|
|
||||||
skipAclCheck = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case OBJECT_COLLATION:
|
|
||||||
{
|
|
||||||
skipAclCheck = true;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -501,119 +441,6 @@ ErrorIfCurrentUserCanNotDistributeObject(ObjectType type, ObjectAddress *addr,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!skipAclCheck && aclMaskResult == ACL_NO_RIGHTS)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("Current user does not have required privileges "
|
|
||||||
"on %d with type id %d to distribute it",
|
|
||||||
idToCheck, type)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ErrorIfUserNotAllowedToPropagateExtension errors out if the current user does
|
|
||||||
* not have required privileges to propagate extension
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ErrorIfUserNotAllowedToPropagateExtension(char *extensionName)
|
|
||||||
{
|
|
||||||
const int nameAttributeIndex = 1;
|
|
||||||
const int superuserAttributeIndex = 4;
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
|
||||||
const int trustedAttributeIndex = 5;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
LOCAL_FCINFO(fcinfo, 0);
|
|
||||||
FmgrInfo flinfo;
|
|
||||||
|
|
||||||
bool goForward = true;
|
|
||||||
bool doCopy = false;
|
|
||||||
|
|
||||||
EState *estate = CreateExecutorState();
|
|
||||||
ReturnSetInfo *extensionsResultSet = makeNode(ReturnSetInfo);
|
|
||||||
extensionsResultSet->econtext = GetPerTupleExprContext(estate);
|
|
||||||
extensionsResultSet->allowedModes = SFRM_Materialize;
|
|
||||||
|
|
||||||
fmgr_info(F_PG_AVAILABLE_EXTENSION_VERSIONS, &flinfo);
|
|
||||||
InitFunctionCallInfoData(*fcinfo, &flinfo, 0, InvalidOid, NULL,
|
|
||||||
(Node *) extensionsResultSet);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* pg_available_extensions_versions returns result set containing all
|
|
||||||
* available extension versions with whether the extension requires
|
|
||||||
* superuser and it is trusted information.
|
|
||||||
*/
|
|
||||||
(*pg_available_extension_versions)(fcinfo);
|
|
||||||
|
|
||||||
TupleTableSlot *tupleTableSlot = MakeSingleTupleTableSlotCompat(
|
|
||||||
extensionsResultSet->setDesc,
|
|
||||||
&TTSOpsMinimalTuple);
|
|
||||||
bool hasTuple = tuplestore_gettupleslot(extensionsResultSet->setResult,
|
|
||||||
goForward,
|
|
||||||
doCopy,
|
|
||||||
tupleTableSlot);
|
|
||||||
while (hasTuple)
|
|
||||||
{
|
|
||||||
bool isNull = false;
|
|
||||||
Datum curExtensionNameDatum = slot_getattr(tupleTableSlot,
|
|
||||||
nameAttributeIndex,
|
|
||||||
&isNull);
|
|
||||||
char *curExtensionName = NameStr(*DatumGetName(curExtensionNameDatum));
|
|
||||||
if (strcmp(extensionName, curExtensionName) == 0)
|
|
||||||
{
|
|
||||||
Datum superuserExpectedDatum = slot_getattr(tupleTableSlot,
|
|
||||||
superuserAttributeIndex,
|
|
||||||
&isNull);
|
|
||||||
bool superuserExpected = DatumGetBool(superuserExpectedDatum);
|
|
||||||
|
|
||||||
#if PG_VERSION_NUM < PG_VERSION_13
|
|
||||||
if (superuserExpected)
|
|
||||||
{
|
|
||||||
EnsureSuperUser();
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
if (superuserExpected)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* After PG 13, if the extension is trusted it can be created
|
|
||||||
* by the user having CREATE privilege on the database even if
|
|
||||||
* the extension requires superuser.
|
|
||||||
*/
|
|
||||||
Datum trustedExtensionDatum = slot_getattr(tupleTableSlot,
|
|
||||||
trustedAttributeIndex,
|
|
||||||
&isNull);
|
|
||||||
bool trustedExtension = DatumGetBool(trustedExtensionDatum);
|
|
||||||
|
|
||||||
if (trustedExtension)
|
|
||||||
{
|
|
||||||
/* Allow if user has CREATE privilege on current database */
|
|
||||||
AclResult aclresult = pg_database_aclcheck(MyDatabaseId,
|
|
||||||
GetUserId(),
|
|
||||||
ACL_CREATE);
|
|
||||||
if (aclresult != ACLCHECK_OK)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("operation is not allowed"),
|
|
||||||
errhint("Must have CREATE privilege "
|
|
||||||
"on database to propagate "
|
|
||||||
"extension %s", curExtensionName)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
EnsureSuperUser();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
ExecClearTuple(tupleTableSlot);
|
|
||||||
hasTuple = tuplestore_gettupleslot(extensionsResultSet->setResult, goForward,
|
|
||||||
doCopy, tupleTableSlot);
|
|
||||||
}
|
|
||||||
|
|
||||||
ExecDropSingleTupleTableSlot(tupleTableSlot);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -112,8 +112,7 @@ SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *node
|
||||||
/*
|
/*
|
||||||
* SendCommandToWorkers sends a command to all workers in
|
* SendCommandToWorkers sends a command to all workers in
|
||||||
* parallel. Commands are committed on the workers when the local
|
* parallel. Commands are committed on the workers when the local
|
||||||
* transaction commits. The connection are made as the extension
|
* transaction commits.
|
||||||
* owner to ensure write access to the Citus metadata tables.
|
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
SendCommandToWorkersWithMetadata(const char *command)
|
SendCommandToWorkersWithMetadata(const char *command)
|
||||||
|
@ -123,6 +122,24 @@ SendCommandToWorkersWithMetadata(const char *command)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SendCommandToWorkersWithMetadataViaSuperUser sends a command to all workers in
|
||||||
|
* parallel by opening a super user connection. Commands are committed on the workers
|
||||||
|
* when the local transaction commits. The connection are made as the extension
|
||||||
|
* owner to ensure write access to the Citus metadata tables.
|
||||||
|
*
|
||||||
|
* Since we prevent to open superuser connections for metadata tables, it is
|
||||||
|
* discourated to use it. Consider using it only for propagating pg_dist_object
|
||||||
|
* tuples for dependent objects.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
SendCommandToWorkersWithMetadataViaSuperUser(const char *command)
|
||||||
|
{
|
||||||
|
SendCommandToMetadataWorkersParams(command, CitusExtensionOwnerName(),
|
||||||
|
0, NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
|
* TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
|
||||||
* TargetWorkerSet.
|
* TargetWorkerSet.
|
||||||
|
|
|
@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
|
||||||
extern bool IsObjectDistributed(const ObjectAddress *address);
|
extern bool IsObjectDistributed(const ObjectAddress *address);
|
||||||
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
||||||
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||||
|
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||||
extern bool IsTableOwnedByExtension(Oid relationId);
|
extern bool IsTableOwnedByExtension(Oid relationId);
|
||||||
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
||||||
|
|
|
@ -49,6 +49,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons
|
||||||
List *
|
List *
|
||||||
commandList);
|
commandList);
|
||||||
extern void SendCommandToWorkersWithMetadata(const char *command);
|
extern void SendCommandToWorkersWithMetadata(const char *command);
|
||||||
|
extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command);
|
||||||
extern void SendBareCommandListToMetadataWorkers(List *commandList);
|
extern void SendBareCommandListToMetadataWorkers(List *commandList);
|
||||||
extern void EnsureNoModificationsHaveBeenDone(void);
|
extern void EnsureNoModificationsHaveBeenDone(void);
|
||||||
extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
|
extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
|
||||||
|
|
|
@ -664,6 +664,22 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
|
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
|
||||||
ERROR: must be owner of function distribution_test_function
|
ERROR: must be owner of function distribution_test_function
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus_internal';
|
||||||
|
\set VERBOSITY terse
|
||||||
|
CREATE TYPE distributed_test_type AS (a int, b int);
|
||||||
|
SET ROLE metadata_sync_helper_role;
|
||||||
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
|
||||||
|
AS (VALUES ('type', ARRAY['distributed_test_type']::text[], ARRAY[]::text[], -1, 0, false))
|
||||||
|
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
|
||||||
|
ERROR: must be owner of type distributed_test_type
|
||||||
|
ROLLBACK;
|
||||||
-- we do not allow wrong partmethod
|
-- we do not allow wrong partmethod
|
||||||
-- so manually insert wrong partmethod for the sake of the test
|
-- so manually insert wrong partmethod for the sake of the test
|
||||||
SET search_path TO metadata_sync_helpers;
|
SET search_path TO metadata_sync_helpers;
|
||||||
|
|
|
@ -79,6 +79,21 @@ SELECT create_distributed_function('test_function(int)');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Create and distribute plpgsql extension's function
|
||||||
|
CREATE OR REPLACE FUNCTION plpgsql_dist_function(text)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE plpgsql AS
|
||||||
|
$$
|
||||||
|
BEGIN
|
||||||
|
RAISE NOTICE '%', $1;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
SELECT create_distributed_function('plpgsql_dist_function(text)');
|
||||||
|
create_distributed_function
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- show that schema, types, function and sequence has marked as distributed
|
-- show that schema, types, function and sequence has marked as distributed
|
||||||
-- on the coordinator node
|
-- on the coordinator node
|
||||||
RESET ROLE;
|
RESET ROLE;
|
||||||
|
@ -124,6 +139,12 @@ SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dis
|
||||||
(function,"{local_schema,test_function}",{integer})
|
(function,"{local_schema,test_function}",{integer})
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.plpgsql_dist_function'::regproc::oid;
|
||||||
|
pg_identify_object_as_address
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(function,"{local_schema,plpgsql_dist_function}",{pg_catalog.text})
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- show those objects marked as distributed on metadata worker node as well
|
-- show those objects marked as distributed on metadata worker node as well
|
||||||
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema'::regnamespace::oid;$$) ORDER BY 1,2;
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema'::regnamespace::oid;$$) ORDER BY 1,2;
|
||||||
nodename | nodeport | success | result
|
nodename | nodeport | success | result
|
||||||
|
@ -174,6 +195,27 @@ SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(clas
|
||||||
localhost | 57638 | t | (function,"{local_schema,test_function}",{integer})
|
localhost | 57638 | t | (function,"{local_schema,test_function}",{integer})
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.plpgsql_dist_function'::regproc::oid;$$) ORDER BY 1,2;
|
||||||
|
nodename | nodeport | success | result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
localhost | 57637 | t | (function,"{local_schema,plpgsql_dist_function}",{pg_catalog.text})
|
||||||
|
localhost | 57638 | t | (function,"{local_schema,plpgsql_dist_function}",{pg_catalog.text})
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Show that extension plpgsql is also marked as distributed as a dependency of plpgsl_dist_function
|
||||||
|
SELECT * FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) as obj_identifier from citus.pg_dist_object) as obj_identifiers where obj_identifier::text like '%{plpgsql}%';
|
||||||
|
obj_identifier
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(extension,{plpgsql},{})
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM run_command_on_workers($$SELECT * FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) as obj_identifier from citus.pg_dist_object) as obj_identifiers where obj_identifier::text like '%{plpgsql}%';$$) ORDER BY 1,2;
|
||||||
|
nodename | nodeport | success | result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
localhost | 57637 | t | (extension,{plpgsql},{})
|
||||||
|
localhost | 57638 | t | (extension,{plpgsql},{})
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
-- show that schema is owned by the superuser
|
-- show that schema is owned by the superuser
|
||||||
SELECT rolname FROM pg_roles JOIN pg_namespace ON(pg_namespace.nspowner = pg_roles.oid) WHERE nspname = 'local_schema';
|
SELECT rolname FROM pg_roles JOIN pg_namespace ON(pg_namespace.nspowner = pg_roles.oid) WHERE nspname = 'local_schema';
|
||||||
rolname
|
rolname
|
||||||
|
@ -372,8 +414,9 @@ SELECT * FROM run_command_on_workers($$ SELECT distribution_argument_index FROM
|
||||||
|
|
||||||
-- Show that dropping schema doesn't affect the worker node
|
-- Show that dropping schema doesn't affect the worker node
|
||||||
DROP SCHEMA local_schema CASCADE;
|
DROP SCHEMA local_schema CASCADE;
|
||||||
NOTICE: drop cascades to 2 other objects
|
NOTICE: drop cascades to 3 other objects
|
||||||
DETAIL: drop cascades to table metadata_dist_test_table
|
DETAIL: drop cascades to function plpgsql_dist_function(text)
|
||||||
|
drop cascades to table metadata_dist_test_table
|
||||||
drop cascades to function metadata_dist_test_proc(integer,integer)
|
drop cascades to function metadata_dist_test_proc(integer,integer)
|
||||||
SELECT * FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) as obj_identifier from citus.pg_dist_object) as obj_identifiers where obj_identifier::text like '%{local_schema}%';
|
SELECT * FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) as obj_identifier from citus.pg_dist_object) as obj_identifiers where obj_identifier::text like '%{local_schema}%';
|
||||||
obj_identifier
|
obj_identifier
|
||||||
|
|
|
@ -425,6 +425,19 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
|
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus_internal';
|
||||||
|
\set VERBOSITY terse
|
||||||
|
|
||||||
|
CREATE TYPE distributed_test_type AS (a int, b int);
|
||||||
|
|
||||||
|
SET ROLE metadata_sync_helper_role;
|
||||||
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
|
||||||
|
AS (VALUES ('type', ARRAY['distributed_test_type']::text[], ARRAY[]::text[], -1, 0, false))
|
||||||
|
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
-- we do not allow wrong partmethod
|
-- we do not allow wrong partmethod
|
||||||
-- so manually insert wrong partmethod for the sake of the test
|
-- so manually insert wrong partmethod for the sake of the test
|
||||||
SET search_path TO metadata_sync_helpers;
|
SET search_path TO metadata_sync_helpers;
|
||||||
|
|
|
@ -55,6 +55,18 @@ SET search_path TO local_schema;
|
||||||
SELECT create_distributed_table('dist_table', 'a');
|
SELECT create_distributed_table('dist_table', 'a');
|
||||||
SELECT create_distributed_function('test_function(int)');
|
SELECT create_distributed_function('test_function(int)');
|
||||||
|
|
||||||
|
-- Create and distribute plpgsql extension's function
|
||||||
|
CREATE OR REPLACE FUNCTION plpgsql_dist_function(text)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE plpgsql AS
|
||||||
|
$$
|
||||||
|
BEGIN
|
||||||
|
RAISE NOTICE '%', $1;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
|
SELECT create_distributed_function('plpgsql_dist_function(text)');
|
||||||
|
|
||||||
-- show that schema, types, function and sequence has marked as distributed
|
-- show that schema, types, function and sequence has marked as distributed
|
||||||
-- on the coordinator node
|
-- on the coordinator node
|
||||||
RESET ROLE;
|
RESET ROLE;
|
||||||
|
@ -65,6 +77,7 @@ SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dis
|
||||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'test_sequence_schema.test_sequence'::regclass::oid;
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'test_sequence_schema.test_sequence'::regclass::oid;
|
||||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.dist_table_e_seq'::regclass::oid;
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.dist_table_e_seq'::regclass::oid;
|
||||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.test_function'::regproc::oid;
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.test_function'::regproc::oid;
|
||||||
|
SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.plpgsql_dist_function'::regproc::oid;
|
||||||
|
|
||||||
-- show those objects marked as distributed on metadata worker node as well
|
-- show those objects marked as distributed on metadata worker node as well
|
||||||
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema'::regnamespace::oid;$$) ORDER BY 1,2;
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema'::regnamespace::oid;$$) ORDER BY 1,2;
|
||||||
|
@ -74,6 +87,11 @@ SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(clas
|
||||||
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'test_sequence_schema.test_sequence'::regclass::oid;$$) ORDER BY 1,2;
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'test_sequence_schema.test_sequence'::regclass::oid;$$) ORDER BY 1,2;
|
||||||
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.dist_table_e_seq'::regclass::oid;$$) ORDER BY 1,2;
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.dist_table_e_seq'::regclass::oid;$$) ORDER BY 1,2;
|
||||||
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.test_function'::regproc::oid;$$) ORDER BY 1,2;
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.test_function'::regproc::oid;$$) ORDER BY 1,2;
|
||||||
|
SELECT * FROM run_command_on_workers($$SELECT pg_identify_object_as_address(classid, objid, objsubid) from citus.pg_dist_object where objid = 'local_schema.plpgsql_dist_function'::regproc::oid;$$) ORDER BY 1,2;
|
||||||
|
|
||||||
|
-- Show that extension plpgsql is also marked as distributed as a dependency of plpgsl_dist_function
|
||||||
|
SELECT * FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) as obj_identifier from citus.pg_dist_object) as obj_identifiers where obj_identifier::text like '%{plpgsql}%';
|
||||||
|
SELECT * FROM run_command_on_workers($$SELECT * FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) as obj_identifier from citus.pg_dist_object) as obj_identifiers where obj_identifier::text like '%{plpgsql}%';$$) ORDER BY 1,2;
|
||||||
|
|
||||||
-- show that schema is owned by the superuser
|
-- show that schema is owned by the superuser
|
||||||
SELECT rolname FROM pg_roles JOIN pg_namespace ON(pg_namespace.nspowner = pg_roles.oid) WHERE nspname = 'local_schema';
|
SELECT rolname FROM pg_roles JOIN pg_namespace ON(pg_namespace.nspowner = pg_roles.oid) WHERE nspname = 'local_schema';
|
||||||
|
|
Loading…
Reference in New Issue