mirror of https://github.com/citusdata/citus.git
wip
parent
b072b9235e
commit
a02668a38e
|
@ -122,7 +122,8 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
|
|||
*/
|
||||
foreach_ptr(dependency, dependenciesWithCommands)
|
||||
{
|
||||
MarkObjectDistributed(dependency);
|
||||
/* TODO: add comment why we do this, and why we should NOT user supreuser in any other case */
|
||||
MarkObjectDistributedViaSuperUser(dependency);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,8 @@
|
|||
#include "utils/rel.h"
|
||||
|
||||
|
||||
static void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
|
||||
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
|
||||
Datum *paramValues);
|
||||
|
||||
|
@ -149,6 +151,44 @@ ObjectExists(const ObjectAddress *address)
|
|||
*/
|
||||
void
|
||||
MarkObjectDistributed(const ObjectAddress *distAddress)
|
||||
{
|
||||
MarkObjectDistributedLocally(distAddress);
|
||||
|
||||
if (EnableDependencyCreation)
|
||||
{
|
||||
char *workerPgDistObjectUpdateCommand =
|
||||
CreatePgDistObjectEntryCommand(distAddress);
|
||||
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TODO:add comment
|
||||
*/
|
||||
void
|
||||
MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
|
||||
{
|
||||
MarkObjectDistributedLocally(distAddress);
|
||||
|
||||
if (EnableDependencyCreation)
|
||||
{
|
||||
char *workerPgDistObjectUpdateCommand =
|
||||
CreatePgDistObjectEntryCommand(distAddress);
|
||||
SendCommandToWorkersWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkObjectDistributedLocally marks an object as a distributed object by citus.
|
||||
* Marking is done by adding appropriate entries to citus.pg_dist_object.
|
||||
*
|
||||
* This function should never be called alone, MarkObjectDistributed() or
|
||||
* MarkObjectDistributedViaSuperUser() should be called.
|
||||
*/
|
||||
static void
|
||||
MarkObjectDistributedLocally(const ObjectAddress *distAddress)
|
||||
{
|
||||
int paramCount = 3;
|
||||
Oid paramTypes[3] = {
|
||||
|
@ -171,22 +211,29 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
|
|||
{
|
||||
ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object")));
|
||||
}
|
||||
}
|
||||
|
||||
if (EnableDependencyCreation)
|
||||
{
|
||||
/* create a list by adding the address of value to not to have warning */
|
||||
List *objectAddressList = list_make1((ObjectAddress *) distAddress);
|
||||
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
|
||||
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
|
||||
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
|
||||
|
||||
char *workerPgDistObjectUpdateCommand =
|
||||
MarkObjectsDistributedCreateCommand(objectAddressList,
|
||||
distArgumetIndexList,
|
||||
colocationIdList,
|
||||
forceDelegationList);
|
||||
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand);
|
||||
}
|
||||
/*
|
||||
* tODO: add comment
|
||||
*/
|
||||
static char *
|
||||
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress)
|
||||
{
|
||||
/* create a list by adding the address of value to not to have warning */
|
||||
List *objectAddressList =
|
||||
list_make1((ObjectAddress *) objectAddress);
|
||||
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
|
||||
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
|
||||
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
|
||||
|
||||
char *workerPgDistObjectUpdateCommand =
|
||||
MarkObjectsDistributedCreateCommand(objectAddressList,
|
||||
distArgumetIndexList,
|
||||
colocationIdList,
|
||||
forceDelegationList);
|
||||
|
||||
return workerPgDistObjectUpdateCommand;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -123,6 +123,18 @@ SendCommandToWorkersWithMetadata(const char *command)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* TODO: add comment
|
||||
* Discourged to use.
|
||||
*/
|
||||
void
|
||||
SendCommandToWorkersWithMetadataViaSuperUser(const char *command)
|
||||
{
|
||||
SendCommandToMetadataWorkersParams(command, CitusExtensionOwnerName(),
|
||||
0, NULL, NULL);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
|
||||
* TargetWorkerSet.
|
||||
|
|
|
@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
|
|||
extern bool IsObjectDistributed(const ObjectAddress *address);
|
||||
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
||||
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||
extern bool IsTableOwnedByExtension(Oid relationId);
|
||||
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
||||
|
|
|
@ -49,6 +49,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons
|
|||
List *
|
||||
commandList);
|
||||
extern void SendCommandToWorkersWithMetadata(const char *command);
|
||||
extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command);
|
||||
extern void SendBareCommandListToMetadataWorkers(List *commandList);
|
||||
extern void EnsureNoModificationsHaveBeenDone(void);
|
||||
extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
|
||||
|
|
Loading…
Reference in New Issue