From 64a8cffb2406bad7fb403bc941ffac2713205b7a Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 14 Oct 2023 16:57:28 +0300 Subject: [PATCH] Fixes UnmarkDistributedObject propagation --- src/backend/distributed/commands/common.c | 22 ++++++++++--- src/backend/distributed/commands/database.c | 30 +++++++++++++---- .../commands/distribute_object_ops.c | 2 +- src/backend/distributed/commands/extension.c | 2 +- src/backend/distributed/commands/role.c | 2 +- src/backend/distributed/commands/sequence.c | 2 +- .../distributed/commands/utility_hook.c | 23 ++++++------- .../distributed/deparser/citus_deparseutils.c | 32 +++++++++++++------ src/backend/distributed/metadata/distobject.c | 27 ++++++++++++++-- .../sql/udfs/citus_drop_trigger/12.0-1.sql | 4 +++ .../distributed/worker/worker_drop_protocol.c | 6 ++-- src/include/distributed/commands.h | 3 ++ src/include/distributed/metadata/distobject.h | 1 + 13 files changed, 116 insertions(+), 40 deletions(-) diff --git a/src/backend/distributed/commands/common.c b/src/backend/distributed/commands/common.c index 797981d47..8429c91e0 100644 --- a/src/backend/distributed/commands/common.c +++ b/src/backend/distributed/commands/common.c @@ -95,6 +95,18 @@ PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, const char *queryS } +void +UnmarkDistObjectsOnAllNodes(Node *parseTree) +{ + List *addresses = GetObjectAddressListFromParseTree(parseTree, false, true); + ObjectAddress *address = NULL; + foreach_ptr(address, addresses) + { + UnmarkObjectDistributed(address); + } +} + + /* * PreprocessAlterDistributedObjectStmt handles any updates to distributed objects by * creating the fully qualified sql to apply to all workers after checking all @@ -266,11 +278,11 @@ PreprocessDropDistributedObjectStmt(Node *node, const char *queryString, /* * remove the entries for the distributed objects on dropping */ - ObjectAddress *address = NULL; - foreach_ptr(address, distributedObjectAddresses) - { - UnmarkObjectDistributed(address); - } + /* ObjectAddress *address = NULL; */ + /* foreach_ptr(address, distributedObjectAddresses) */ + /* { */ + /* UnmarkObjectDistributedLocally(address); */ + /* } */ /* * temporary swap the lists of objects to delete with the distributed objects and diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 8aee9213f..eff855eb7 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -363,6 +363,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, EnsureCoordinator(); + DropdbStmt *stmt = (DropdbStmt *) node; Oid databaseOid = get_database_oid(stmt->dbname, stmt->missing_ok); @@ -380,8 +381,6 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, return NIL; } - UnmarkObjectDistributed(&dbAddress); - char *dropDatabaseCommand = DeparseTreeNode(node); StringInfo internalDropCommand = makeStringInfo(); @@ -390,6 +389,8 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, quote_literal_cstr(dropDatabaseCommand)); + UnmarkDistObjectsOnAllNodes(node); + List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) internalDropCommand->data, ENABLE_DDL_PROPAGATION); @@ -398,13 +399,30 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, } -List * -CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) +static List * +GetAddressFromDbName(const char *dbname, bool missing_ok) { - CreatedbStmt *stmt = castNode(CreatedbStmt, node); - Oid databaseOid = get_database_oid(stmt->dbname, missing_ok); + Oid databaseOid = get_database_oid(dbname, missing_ok); ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid); return list_make1(dbAddress); } + + +List * +CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) +{ + CreatedbStmt *stmt = castNode(CreatedbStmt, node); + + return GetAddressFromDbName(stmt->dbname, missing_ok); +} + + +List * +DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) +{ + DropdbStmt *stmt = castNode(DropdbStmt, node); + + return GetAddressFromDbName(stmt->dbname, missing_ok); +} diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 49a96e016..2888bef1d 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -484,7 +484,7 @@ static DistributeObjectOps Database_Drop = { .postprocess = NULL, .objectType = OBJECT_DATABASE, .operationType = DIST_OPS_DROP, - .address = NULL, + .address = DropDatabaseStmtObjectAddress, .markDistributed = false, }; diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index 5bddf1ede..01c30995e 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -273,7 +273,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString, ObjectAddress *address = NULL; foreach_ptr(address, distributedExtensionAddresses) { - UnmarkObjectDistributed(address); + UnmarkObjectDistributedLocally(address); } /* diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 754be1a2b..ca2c1eef1 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -1080,7 +1080,7 @@ UnmarkRolesDistributed(List *roles) } ObjectAddressSet(roleAddress, AuthIdRelationId, roleOid); - UnmarkObjectDistributed(&roleAddress); + UnmarkObjectDistributedLocally(&roleAddress); } } diff --git a/src/backend/distributed/commands/sequence.c b/src/backend/distributed/commands/sequence.c index 9ff586c8c..3a9ab3b44 100644 --- a/src/backend/distributed/commands/sequence.c +++ b/src/backend/distributed/commands/sequence.c @@ -322,7 +322,7 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString, ObjectAddress *address = NULL; foreach_ptr(address, distributedSequenceAddresses) { - UnmarkObjectDistributed(address); + UnmarkObjectDistributedLocally(address); } /* diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index dd729cad0..ad066cfba 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -225,18 +225,18 @@ multi_ProcessUtility(PlannedStmt *pstmt, * Make sure that on DROP DATABASE we terminate the background daemon * associated with it. */ - if (IsA(parsetree, DropdbStmt)) - { - const bool missingOK = true; - DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree; - char *dbname = dropDbStatement->dbname; - Oid databaseOid = get_database_oid(dbname, missingOK); + /* if (IsA(parsetree, DropdbStmt)) */ + /* { */ + /* const bool missingOK = true; */ + /* DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree; */ + /* char *dbname = dropDbStatement->dbname; */ + /* Oid databaseOid = get_database_oid(dbname, missingOK); */ - if (OidIsValid(databaseOid)) - { - StopMaintenanceDaemon(databaseOid); - } - } + /* if (OidIsValid(databaseOid)) */ + /* { */ + /* StopMaintenanceDaemon(databaseOid); */ + /* } */ + /* } */ if (!CitusHasBeenLoaded()) { @@ -741,6 +741,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt, } } + pstmt->utilityStmt = parsetree; PG_TRY(); diff --git a/src/backend/distributed/deparser/citus_deparseutils.c b/src/backend/distributed/deparser/citus_deparseutils.c index f96244a27..79c6a0668 100644 --- a/src/backend/distributed/deparser/citus_deparseutils.c +++ b/src/backend/distributed/deparser/citus_deparseutils.c @@ -31,35 +31,49 @@ optionToStatement(StringInfo buf, DefElem *option, const struct { if (strcmp(name, opt_formats[i].name) == 0) { - switch (opt_formats[i].type) { - case OPTION_FORMAT_STRING: { + switch (opt_formats[i].type) + { + case OPTION_FORMAT_STRING: + { char *value = defGetString(option); appendStringInfo(buf, opt_formats[i].format, quote_identifier(value)); break; } - case OPTION_FORMAT_INTEGER: { + + case OPTION_FORMAT_INTEGER: + { int32 value = defGetInt32(option); appendStringInfo(buf, opt_formats[i].format, value); break; } - case OPTION_FORMAT_BOOLEAN: { + + case OPTION_FORMAT_BOOLEAN: + { bool value = defGetBoolean(option); - appendStringInfo(buf, opt_formats[i].format, value ? "true" : "false"); + appendStringInfo(buf, opt_formats[i].format, value ? "true" : + "false"); break; } + #if PG_VERSION_NUM >= PG_VERSION_15 - case OPTION_FORMAT_OBJECT_ID: { + case OPTION_FORMAT_OBJECT_ID: + { Oid value = defGetObjectId(option); appendStringInfo(buf, opt_formats[i].format, value); break; } + #endif - case OPTION_FORMAT_LITERAL_CSTR: { + case OPTION_FORMAT_LITERAL_CSTR: + { char *value = defGetString(option); - appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr(value)); + appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr( + value)); break; } - default: { + + default: + { elog(ERROR, "unrecognized option type: %d", opt_formats[i].type); break; } diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index c420e6ec3..2c5589dfa 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -93,7 +93,7 @@ citus_unmark_object_distributed(PG_FUNCTION_ARGS) errhint("drop the object via a DROP command"))); } - UnmarkObjectDistributed(&address); + UnmarkObjectDistributedLocally(&address); PG_RETURN_VOID(); } @@ -361,8 +361,11 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, * distributed. This will prevent updates to that object to be propagated to the worker. */ void -UnmarkObjectDistributed(const ObjectAddress *address) +UnmarkObjectDistributedLocally(const ObjectAddress *address) { + elog(WARNING, + "Test UnmarkObjectDistributedLocally %d %d %d", address->classId, + address->objectId, address->objectSubId); int paramCount = 3; Oid paramTypes[3] = { OIDOID, @@ -387,6 +390,26 @@ UnmarkObjectDistributed(const ObjectAddress *address) } +void +UnmarkObjectDistributed(const ObjectAddress *address) +{ + UnmarkObjectDistributedLocally(address); + + /* TODO Some post operations are needed in my opinion like MarkObjectDistributed + * Below is the code from MarkObjectDistributed + * + * if (EnableMetadataSync) + * { + * //I need to change below command IMO but I'm not sure + * char *workerPgDistObjectUpdateCommand = + * CreatePgDistObjectEntryCommand(distAddress); + * SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand); + * } + * + */ +} + + /* * IsObjectDistributed returns if the object addressed is already distributed in the * cluster. This performs a local indexed lookup in pg_dist_object. diff --git a/src/backend/distributed/sql/udfs/citus_drop_trigger/12.0-1.sql b/src/backend/distributed/sql/udfs/citus_drop_trigger/12.0-1.sql index 312099aeb..e707acecf 100644 --- a/src/backend/distributed/sql/udfs/citus_drop_trigger/12.0-1.sql +++ b/src/backend/distributed/sql/udfs/citus_drop_trigger/12.0-1.sql @@ -49,6 +49,10 @@ BEGIN -- remove entries from citus.pg_dist_object for all dropped root (objsubid = 0) objects PERFORM master_unmark_object_distributed(v_obj.classid, v_obj.objid, v_obj.objsubid); + + --add a test log + RAISE WARNING 'Dropped object for test: %', v_obj.object_name; + END LOOP; SELECT COUNT(*) INTO constraint_event_count diff --git a/src/backend/distributed/worker/worker_drop_protocol.c b/src/backend/distributed/worker/worker_drop_protocol.c index 16b7bb66a..8ec9c106a 100644 --- a/src/backend/distributed/worker/worker_drop_protocol.c +++ b/src/backend/distributed/worker/worker_drop_protocol.c @@ -131,10 +131,10 @@ WorkerDropDistributedTable(Oid relationId) { ObjectAddress ownedSequenceAddress = { 0 }; ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid); - UnmarkObjectDistributed(&ownedSequenceAddress); + UnmarkObjectDistributedLocally(&ownedSequenceAddress); } - UnmarkObjectDistributed(distributedTableObject); + UnmarkObjectDistributedLocally(distributedTableObject); /* * Remove metadata before object's itself to make functions no-op within @@ -243,7 +243,7 @@ worker_drop_shell_table(PG_FUNCTION_ARGS) { ObjectAddress ownedSequenceAddress = { 0 }; ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid); - UnmarkObjectDistributed(&ownedSequenceAddress); + UnmarkObjectDistributedLocally(&ownedSequenceAddress); } /* diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index b1f65177e..f763bea49 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -193,6 +193,7 @@ extern List * DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, boo isPostprocess); extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool isPostprocess); +extern void UnmarkDistObjectsOnAllNodes(Node *parseTree); /* index.c */ typedef void (*PGIndexProcessor)(Form_pg_index, List **, int); @@ -233,6 +234,8 @@ extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *que extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess); +extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool + isPostprocess); extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index ba984091c..80d561e15 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -27,6 +27,7 @@ extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); +extern void UnmarkObjectDistributedLocally(const ObjectAddress *address); extern bool IsTableOwnedByExtension(Oid relationId); extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target); extern bool IsAnyObjectAddressOwnedByExtension(const List *targets,