diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index b345210af..feec663a8 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -90,7 +90,14 @@ citus_unmark_object_distributed(PG_FUNCTION_ARGS) errhint("drop the object via a DROP command"))); } - UnmarkObjectDistributed(&address); + if (IsCoordinator()) + { + UnmarkObjectDistributed(&address); + } + else + { + UnmarkObjectDistributedLocally(&address); + } PG_RETURN_VOID(); } @@ -312,6 +319,23 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, */ void UnmarkObjectDistributed(const ObjectAddress *address) +{ + UnmarkObjectDistributedLocally(address); + + if (EnableMetadataSync) + { + char *pgDistObjectDropCommand = DropDistributedObjectCommand(address); + SendCommandToWorkersWithMetadataViaSuperUser(pgDistObjectDropCommand); + } +} + + +/* + * UnmarkObjectDistributedLocally removes the entry from pg_dist_object that marks this + * object as distributed locally. + */ +void +UnmarkObjectDistributedLocally(const ObjectAddress *address) { int paramCount = 3; Oid paramTypes[3] = { diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index dc501923e..b387ac9e5 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -153,6 +153,7 @@ PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation); PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata); +PG_FUNCTION_INFO_V1(citus_internal_drop_object_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata); @@ -963,7 +964,7 @@ citus_internal_add_object_metadata(PG_FUNCTION_ARGS) * argsArray parameters */ ObjectAddress objectAddress = PgGetObjectAddress(textType, nameArray, - argsArray); + argsArray, false); /* First, disable propagation off to not to cause infinite propagation */ bool prevDependencyCreationValue = EnableMetadataSync; @@ -1000,6 +1001,103 @@ citus_internal_add_object_metadata(PG_FUNCTION_ARGS) } +/* + * DropDistributedObjectCommand generates a command that can be executed to + * drop the provided object from pg_dist_object on a worker node. + */ +char * +DropDistributedObjectCommand(const ObjectAddress *address) +{ + StringInfo dropDistributedObjectCommand = makeStringInfo(); + + appendStringInfo(dropDistributedObjectCommand, + "SELECT citus_internal_drop_object_metadata("); + + List *names = NIL; + List *args = NIL; + char *objectType = NULL; + + #if PG_VERSION_NUM >= PG_VERSION_14 + objectType = getObjectTypeDescription(address, false); + getObjectIdentityParts(address, &names, &args, false); + #else + objectType = getObjectTypeDescription(address); + getObjectIdentityParts(address, &names, &args); + #endif + + appendStringInfo(dropDistributedObjectCommand, + "(%s, ARRAY[", + quote_literal_cstr(objectType)); + + char *name = NULL; + bool firstInNameLoop = true; + foreach_ptr(name, names) + { + if (!firstInNameLoop) + { + appendStringInfo(dropDistributedObjectCommand, ", "); + } + + firstInNameLoop = false; + appendStringInfoString(dropDistributedObjectCommand, + quote_literal_cstr(name)); + } + + appendStringInfo(dropDistributedObjectCommand, "]::text[], ARRAY["); + + char *arg; + bool firstInArgLoop = true; + foreach_ptr(arg, args) + { + if (!firstInArgLoop) + { + appendStringInfo(dropDistributedObjectCommand, ", "); + } + firstInArgLoop = false; + appendStringInfoString(dropDistributedObjectCommand, + quote_literal_cstr(arg)); + } + + appendStringInfo(dropDistributedObjectCommand, "]::text[]);"); + + return dropDistributedObjectCommand->data; +} + + +/* + * citus_internal_drop_object_metadata is an internal UDF to + * drop a row from pg_dist_object. + */ +Datum +citus_internal_drop_object_metadata(PG_FUNCTION_ARGS) +{ + char *textType = TextDatumGetCString(PG_GETARG_DATUM(0)); + ArrayType *nameArray = PG_GETARG_ARRAYTYPE_P(1); + ArrayType *argsArray = PG_GETARG_ARRAYTYPE_P(2); + + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed for executing as a separate command */ + EnsureCoordinatorInitiatedOperation(); + } + + /* + * We check the acl/ownership while getting the object address. That + * funtion also checks the sanity of given textType, nameArray and + * argsArray parameters + */ + ObjectAddress objectAddress = PgGetObjectAddress(textType, nameArray, + argsArray, true); + + if (OidIsValid(objectAddress.objectId)) + { + UnmarkObjectDistributedLocally(&objectAddress); + } + + PG_RETURN_VOID(); +} + + /* * EnsureObjectMetadataIsSane checks whether the distribution argument index and * colocation id metadata params for distributed object is sane. You can look diff --git a/src/backend/distributed/metadata/pg_get_object_address_12_13_14.c b/src/backend/distributed/metadata/pg_get_object_address_12_13_14.c index 26248f025..ae7a09f44 100644 --- a/src/backend/distributed/metadata/pg_get_object_address_12_13_14.c +++ b/src/backend/distributed/metadata/pg_get_object_address_12_13_14.c @@ -53,7 +53,7 @@ static List * textarray_to_strvaluelist(ArrayType *arr); * Codes added by Citus are tagged with CITUS CODE BEGIN/END. */ ObjectAddress -PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr) +PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr, bool missingOk) { List *name = NIL; TypeName *typename = NULL; @@ -368,7 +368,12 @@ PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr) } ObjectAddress addr = get_object_address(type, objnode, - &relation, AccessShareLock, false); + &relation, AccessShareLock, missingOk); + + if (!OidIsValid(addr.objectId)) + { + return addr; + } /* CITUS CODE BEGIN */ ErrorIfCurrentUserCanNotDistributeObject(type, &addr, objnode, &relation); diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index f341976c9..28c7dca02 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -10,6 +10,7 @@ #include "udfs/citus_shard_indexes_on_worker/11.0-1.sql" #include "udfs/citus_internal_add_object_metadata/11.0-1.sql" +#include "udfs/citus_internal_drop_object_metadata/11.0-1.sql" #include "udfs/citus_internal_add_colocation_metadata/11.0-1.sql" #include "udfs/citus_internal_delete_colocation_metadata/11.0-1.sql" #include "udfs/citus_run_local_command/11.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 19d5bb22d..2e5b432c3 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -52,6 +52,7 @@ DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer); DROP FUNCTION pg_catalog.citus_check_cluster_node_health (); DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer, boolean); +DROP FUNCTION pg_catalog.citus_internal_drop_object_metadata(text, text[], text[]); DROP FUNCTION pg_catalog.citus_internal_add_colocation_metadata(int, int, int, regtype, oid); DROP FUNCTION pg_catalog.citus_internal_delete_colocation_metadata(int); DROP FUNCTION pg_catalog.citus_run_local_command(text); diff --git a/src/backend/distributed/sql/udfs/citus_internal_drop_object_metadata/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_internal_drop_object_metadata/11.0-1.sql new file mode 100644 index 000000000..28277d7a5 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_drop_object_metadata/11.0-1.sql @@ -0,0 +1,11 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_drop_object_metadata( + typeText text, + objNames text[], + objArgs text[]) + RETURNS void + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION pg_catalog.citus_internal_drop_object_metadata(text,text[],text[]) IS + 'Drops distributed object from pg_dist_object'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_drop_object_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_drop_object_metadata/latest.sql new file mode 100644 index 000000000..28277d7a5 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_drop_object_metadata/latest.sql @@ -0,0 +1,11 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_drop_object_metadata( + typeText text, + objNames text[], + objArgs text[]) + RETURNS void + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION pg_catalog.citus_internal_drop_object_metadata(text,text[],text[]) IS + 'Drops distributed object from pg_dist_object'; diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 0f425583b..fcc9f7efa 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -137,10 +137,10 @@ WorkerDropDistributedTable(Oid relationId) { ObjectAddress ownedSequenceAddress = { 0 }; ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid); - UnmarkObjectDistributed(&ownedSequenceAddress); + UnmarkObjectDistributedLocally(&ownedSequenceAddress); } - UnmarkObjectDistributed(&distributedTableObject); + UnmarkObjectDistributedLocally(&distributedTableObject); if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL)) { @@ -238,7 +238,7 @@ worker_drop_shell_table(PG_FUNCTION_ARGS) { ObjectAddress ownedSequenceAddress = { 0 }; ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid); - UnmarkObjectDistributed(&ownedSequenceAddress); + UnmarkObjectDistributedLocally(&ownedSequenceAddress); } /* diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index 5ea04ec73..7668285a6 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -25,11 +25,12 @@ extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); +extern void UnmarkObjectDistributedLocally(const ObjectAddress *address); extern bool IsTableOwnedByExtension(Oid relationId); extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target, ObjectAddress *extensionAddress); extern ObjectAddress PgGetObjectAddress(char *ttype, ArrayType *namearr, - ArrayType *argsarr); + ArrayType *argsarr, bool missingOk); extern List * GetDistributedObjectAddressList(void); extern RoleSpec * GetRoleSpecObjectForUser(Oid roleOid); extern void UpdateDistributedObjectColocationId(uint32 oldColocationId, uint32 diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e67726bfc..8e707ffec 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -45,6 +45,7 @@ extern char * MarkObjectsDistributedCreateCommand(List *addresses, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations); +extern char * DropDistributedObjectCommand(const ObjectAddress *address) ; extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry); extern char * DistributionDeleteCommand(const char *schemaName, const char *tableName);