From a02668a38eff8ec14cbfbbbf1f2e15e395de1366 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 3 Feb 2022 15:22:10 +0100 Subject: [PATCH] wip --- .../distributed/commands/dependencies.c | 3 +- src/backend/distributed/metadata/distobject.c | 75 +++++++++++++++---- .../transaction/worker_transaction.c | 12 +++ src/include/distributed/metadata/distobject.h | 1 + src/include/distributed/worker_transaction.h | 1 + 5 files changed, 77 insertions(+), 15 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 9d9839b5a..f5d8cd108 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -122,7 +122,8 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) */ foreach_ptr(dependency, dependenciesWithCommands) { - MarkObjectDistributed(dependency); + /* TODO: add comment why we do this, and why we should NOT user supreuser in any other case */ + MarkObjectDistributedViaSuperUser(dependency); } } diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index fc5d029fb..0edddaba2 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -46,6 +46,8 @@ #include "utils/rel.h" +static void MarkObjectDistributedLocally(const ObjectAddress *distAddress); +static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, Datum *paramValues); @@ -149,6 +151,44 @@ ObjectExists(const ObjectAddress *address) */ void MarkObjectDistributed(const ObjectAddress *distAddress) +{ + MarkObjectDistributedLocally(distAddress); + + if (EnableDependencyCreation) + { + char *workerPgDistObjectUpdateCommand = + CreatePgDistObjectEntryCommand(distAddress); + SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand); + } +} + + +/* + * TODO:add comment + */ +void +MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress) +{ + MarkObjectDistributedLocally(distAddress); + + if (EnableDependencyCreation) + { + char *workerPgDistObjectUpdateCommand = + CreatePgDistObjectEntryCommand(distAddress); + SendCommandToWorkersWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand); + } +} + + +/* + * MarkObjectDistributedLocally marks an object as a distributed object by citus. + * Marking is done by adding appropriate entries to citus.pg_dist_object. + * + * This function should never be called alone, MarkObjectDistributed() or + * MarkObjectDistributedViaSuperUser() should be called. + */ +static void +MarkObjectDistributedLocally(const ObjectAddress *distAddress) { int paramCount = 3; Oid paramTypes[3] = { @@ -171,22 +211,29 @@ MarkObjectDistributed(const ObjectAddress *distAddress) { ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object"))); } +} - if (EnableDependencyCreation) - { - /* create a list by adding the address of value to not to have warning */ - List *objectAddressList = list_make1((ObjectAddress *) distAddress); - List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX); - List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID); - List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN); - char *workerPgDistObjectUpdateCommand = - MarkObjectsDistributedCreateCommand(objectAddressList, - distArgumetIndexList, - colocationIdList, - forceDelegationList); - SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand); - } +/* + * tODO: add comment + */ +static char * +CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress) +{ + /* create a list by adding the address of value to not to have warning */ + List *objectAddressList = + list_make1((ObjectAddress *) objectAddress); + List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX); + List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID); + List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN); + + char *workerPgDistObjectUpdateCommand = + MarkObjectsDistributedCreateCommand(objectAddressList, + distArgumetIndexList, + colocationIdList, + forceDelegationList); + + return workerPgDistObjectUpdateCommand; } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 61baff4fe..baa648be3 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -123,6 +123,18 @@ SendCommandToWorkersWithMetadata(const char *command) } +/* + * TODO: add comment + * Discourged to use. + */ +void +SendCommandToWorkersWithMetadataViaSuperUser(const char *command) +{ + SendCommandToMetadataWorkersParams(command, CitusExtensionOwnerName(), + 0, NULL, NULL); +} + + /* * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the * TargetWorkerSet. diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index 659e8ab7f..472cd83e2 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -23,6 +23,7 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress); extern bool IsObjectDistributed(const ObjectAddress *address); extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); +extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); extern bool IsTableOwnedByExtension(Oid relationId); extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target, diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 63d419c66..c3748ee5b 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -49,6 +49,7 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons List * commandList); extern void SendCommandToWorkersWithMetadata(const char *command); +extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,