Try UnmarkObjectDistributedLocally

velioglu/unmark_objects
Burak Velioglu 2022-03-15 01:28:44 +03:00
parent 36b33e2491
commit 70137a3768
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
10 changed files with 161 additions and 8 deletions

View File

@ -90,7 +90,14 @@ citus_unmark_object_distributed(PG_FUNCTION_ARGS)
errhint("drop the object via a DROP command"))); errhint("drop the object via a DROP command")));
} }
UnmarkObjectDistributed(&address); if (IsCoordinator())
{
UnmarkObjectDistributed(&address);
}
else
{
UnmarkObjectDistributedLocally(&address);
}
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -312,6 +319,23 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
*/ */
void void
UnmarkObjectDistributed(const ObjectAddress *address) UnmarkObjectDistributed(const ObjectAddress *address)
{
UnmarkObjectDistributedLocally(address);
if (EnableMetadataSync)
{
char *pgDistObjectDropCommand = DropDistributedObjectCommand(address);
SendCommandToWorkersWithMetadataViaSuperUser(pgDistObjectDropCommand);
}
}
/*
* UnmarkObjectDistributedLocally removes the entry from pg_dist_object that marks this
* object as distributed locally.
*/
void
UnmarkObjectDistributedLocally(const ObjectAddress *address)
{ {
int paramCount = 3; int paramCount = 3;
Oid paramTypes[3] = { Oid paramTypes[3] = {

View File

@ -153,6 +153,7 @@ PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation); PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
PG_FUNCTION_INFO_V1(citus_internal_drop_object_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
@ -963,7 +964,7 @@ citus_internal_add_object_metadata(PG_FUNCTION_ARGS)
* argsArray parameters * argsArray parameters
*/ */
ObjectAddress objectAddress = PgGetObjectAddress(textType, nameArray, ObjectAddress objectAddress = PgGetObjectAddress(textType, nameArray,
argsArray); argsArray, false);
/* First, disable propagation off to not to cause infinite propagation */ /* First, disable propagation off to not to cause infinite propagation */
bool prevDependencyCreationValue = EnableMetadataSync; bool prevDependencyCreationValue = EnableMetadataSync;
@ -1000,6 +1001,103 @@ citus_internal_add_object_metadata(PG_FUNCTION_ARGS)
} }
/*
* DropDistributedObjectCommand generates a command that can be executed to
* drop the provided object from pg_dist_object on a worker node.
*/
char *
DropDistributedObjectCommand(const ObjectAddress *address)
{
StringInfo dropDistributedObjectCommand = makeStringInfo();
appendStringInfo(dropDistributedObjectCommand,
"SELECT citus_internal_drop_object_metadata(");
List *names = NIL;
List *args = NIL;
char *objectType = NULL;
#if PG_VERSION_NUM >= PG_VERSION_14
objectType = getObjectTypeDescription(address, false);
getObjectIdentityParts(address, &names, &args, false);
#else
objectType = getObjectTypeDescription(address);
getObjectIdentityParts(address, &names, &args);
#endif
appendStringInfo(dropDistributedObjectCommand,
"(%s, ARRAY[",
quote_literal_cstr(objectType));
char *name = NULL;
bool firstInNameLoop = true;
foreach_ptr(name, names)
{
if (!firstInNameLoop)
{
appendStringInfo(dropDistributedObjectCommand, ", ");
}
firstInNameLoop = false;
appendStringInfoString(dropDistributedObjectCommand,
quote_literal_cstr(name));
}
appendStringInfo(dropDistributedObjectCommand, "]::text[], ARRAY[");
char *arg;
bool firstInArgLoop = true;
foreach_ptr(arg, args)
{
if (!firstInArgLoop)
{
appendStringInfo(dropDistributedObjectCommand, ", ");
}
firstInArgLoop = false;
appendStringInfoString(dropDistributedObjectCommand,
quote_literal_cstr(arg));
}
appendStringInfo(dropDistributedObjectCommand, "]::text[]);");
return dropDistributedObjectCommand->data;
}
/*
* citus_internal_drop_object_metadata is an internal UDF to
* drop a row from pg_dist_object.
*/
Datum
citus_internal_drop_object_metadata(PG_FUNCTION_ARGS)
{
char *textType = TextDatumGetCString(PG_GETARG_DATUM(0));
ArrayType *nameArray = PG_GETARG_ARRAYTYPE_P(1);
ArrayType *argsArray = PG_GETARG_ARRAYTYPE_P(2);
if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation();
}
/*
* We check the acl/ownership while getting the object address. That
* funtion also checks the sanity of given textType, nameArray and
* argsArray parameters
*/
ObjectAddress objectAddress = PgGetObjectAddress(textType, nameArray,
argsArray, true);
if (OidIsValid(objectAddress.objectId))
{
UnmarkObjectDistributedLocally(&objectAddress);
}
PG_RETURN_VOID();
}
/* /*
* EnsureObjectMetadataIsSane checks whether the distribution argument index and * EnsureObjectMetadataIsSane checks whether the distribution argument index and
* colocation id metadata params for distributed object is sane. You can look * colocation id metadata params for distributed object is sane. You can look

View File

@ -53,7 +53,7 @@ static List * textarray_to_strvaluelist(ArrayType *arr);
* Codes added by Citus are tagged with CITUS CODE BEGIN/END. * Codes added by Citus are tagged with CITUS CODE BEGIN/END.
*/ */
ObjectAddress ObjectAddress
PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr) PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr, bool missingOk)
{ {
List *name = NIL; List *name = NIL;
TypeName *typename = NULL; TypeName *typename = NULL;
@ -368,7 +368,12 @@ PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr)
} }
ObjectAddress addr = get_object_address(type, objnode, ObjectAddress addr = get_object_address(type, objnode,
&relation, AccessShareLock, false); &relation, AccessShareLock, missingOk);
if (!OidIsValid(addr.objectId))
{
return addr;
}
/* CITUS CODE BEGIN */ /* CITUS CODE BEGIN */
ErrorIfCurrentUserCanNotDistributeObject(type, &addr, objnode, &relation); ErrorIfCurrentUserCanNotDistributeObject(type, &addr, objnode, &relation);

View File

@ -10,6 +10,7 @@
#include "udfs/citus_shard_indexes_on_worker/11.0-1.sql" #include "udfs/citus_shard_indexes_on_worker/11.0-1.sql"
#include "udfs/citus_internal_add_object_metadata/11.0-1.sql" #include "udfs/citus_internal_add_object_metadata/11.0-1.sql"
#include "udfs/citus_internal_drop_object_metadata/11.0-1.sql"
#include "udfs/citus_internal_add_colocation_metadata/11.0-1.sql" #include "udfs/citus_internal_add_colocation_metadata/11.0-1.sql"
#include "udfs/citus_internal_delete_colocation_metadata/11.0-1.sql" #include "udfs/citus_internal_delete_colocation_metadata/11.0-1.sql"
#include "udfs/citus_run_local_command/11.0-1.sql" #include "udfs/citus_run_local_command/11.0-1.sql"

View File

@ -52,6 +52,7 @@ DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer);
DROP FUNCTION pg_catalog.citus_check_cluster_node_health (); DROP FUNCTION pg_catalog.citus_check_cluster_node_health ();
DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer, boolean); DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer, boolean);
DROP FUNCTION pg_catalog.citus_internal_drop_object_metadata(text, text[], text[]);
DROP FUNCTION pg_catalog.citus_internal_add_colocation_metadata(int, int, int, regtype, oid); DROP FUNCTION pg_catalog.citus_internal_add_colocation_metadata(int, int, int, regtype, oid);
DROP FUNCTION pg_catalog.citus_internal_delete_colocation_metadata(int); DROP FUNCTION pg_catalog.citus_internal_delete_colocation_metadata(int);
DROP FUNCTION pg_catalog.citus_run_local_command(text); DROP FUNCTION pg_catalog.citus_run_local_command(text);

View File

@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_drop_object_metadata(
typeText text,
objNames text[],
objArgs text[])
RETURNS void
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_drop_object_metadata(text,text[],text[]) IS
'Drops distributed object from pg_dist_object';

View File

@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_drop_object_metadata(
typeText text,
objNames text[],
objArgs text[])
RETURNS void
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_drop_object_metadata(text,text[],text[]) IS
'Drops distributed object from pg_dist_object';

View File

@ -137,10 +137,10 @@ WorkerDropDistributedTable(Oid relationId)
{ {
ObjectAddress ownedSequenceAddress = { 0 }; ObjectAddress ownedSequenceAddress = { 0 };
ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid); ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid);
UnmarkObjectDistributed(&ownedSequenceAddress); UnmarkObjectDistributedLocally(&ownedSequenceAddress);
} }
UnmarkObjectDistributed(&distributedTableObject); UnmarkObjectDistributedLocally(&distributedTableObject);
if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL)) if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
{ {
@ -238,7 +238,7 @@ worker_drop_shell_table(PG_FUNCTION_ARGS)
{ {
ObjectAddress ownedSequenceAddress = { 0 }; ObjectAddress ownedSequenceAddress = { 0 };
ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid); ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid);
UnmarkObjectDistributed(&ownedSequenceAddress); UnmarkObjectDistributedLocally(&ownedSequenceAddress);
} }
/* /*

View File

@ -25,11 +25,12 @@ extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address); extern void UnmarkObjectDistributed(const ObjectAddress *address);
extern void UnmarkObjectDistributedLocally(const ObjectAddress *address);
extern bool IsTableOwnedByExtension(Oid relationId); extern bool IsTableOwnedByExtension(Oid relationId);
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target, extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
ObjectAddress *extensionAddress); ObjectAddress *extensionAddress);
extern ObjectAddress PgGetObjectAddress(char *ttype, ArrayType *namearr, extern ObjectAddress PgGetObjectAddress(char *ttype, ArrayType *namearr,
ArrayType *argsarr); ArrayType *argsarr, bool missingOk);
extern List * GetDistributedObjectAddressList(void); extern List * GetDistributedObjectAddressList(void);
extern RoleSpec * GetRoleSpecObjectForUser(Oid roleOid); extern RoleSpec * GetRoleSpecObjectForUser(Oid roleOid);
extern void UpdateDistributedObjectColocationId(uint32 oldColocationId, uint32 extern void UpdateDistributedObjectColocationId(uint32 oldColocationId, uint32

View File

@ -45,6 +45,7 @@ extern char * MarkObjectsDistributedCreateCommand(List *addresses,
List *distributionArgumentIndexes, List *distributionArgumentIndexes,
List *colocationIds, List *colocationIds,
List *forceDelegations); List *forceDelegations);
extern char * DropDistributedObjectCommand(const ObjectAddress *address) ;
extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry); extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry);
extern char * DistributionDeleteCommand(const char *schemaName, extern char * DistributionDeleteCommand(const char *schemaName,
const char *tableName); const char *tableName);