Fixes UnmarkDistributedObject propagation

pull/7270/head
gindibay 2023-10-14 16:57:28 +03:00
parent 3a6fdada11
commit 64a8cffb24
13 changed files with 116 additions and 40 deletions

View File

@ -95,6 +95,18 @@ PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, const char *queryS
}
void
UnmarkDistObjectsOnAllNodes(Node *parseTree)
{
List *addresses = GetObjectAddressListFromParseTree(parseTree, false, true);
ObjectAddress *address = NULL;
foreach_ptr(address, addresses)
{
UnmarkObjectDistributed(address);
}
}
/*
* PreprocessAlterDistributedObjectStmt handles any updates to distributed objects by
* creating the fully qualified sql to apply to all workers after checking all
@ -266,11 +278,11 @@ PreprocessDropDistributedObjectStmt(Node *node, const char *queryString,
/*
* remove the entries for the distributed objects on dropping
*/
ObjectAddress *address = NULL;
foreach_ptr(address, distributedObjectAddresses)
{
UnmarkObjectDistributed(address);
}
/* ObjectAddress *address = NULL; */
/* foreach_ptr(address, distributedObjectAddresses) */
/* { */
/* UnmarkObjectDistributedLocally(address); */
/* } */
/*
* temporary swap the lists of objects to delete with the distributed objects and

View File

@ -363,6 +363,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
EnsureCoordinator();
DropdbStmt *stmt = (DropdbStmt *) node;
Oid databaseOid = get_database_oid(stmt->dbname, stmt->missing_ok);
@ -380,8 +381,6 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
return NIL;
}
UnmarkObjectDistributed(&dbAddress);
char *dropDatabaseCommand = DeparseTreeNode(node);
StringInfo internalDropCommand = makeStringInfo();
@ -390,6 +389,8 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
quote_literal_cstr(dropDatabaseCommand));
UnmarkDistObjectsOnAllNodes(node);
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) internalDropCommand->data,
ENABLE_DDL_PROPAGATION);
@ -398,13 +399,30 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
}
List *
CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
static List *
GetAddressFromDbName(const char *dbname, bool missing_ok)
{
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
Oid databaseOid = get_database_oid(stmt->dbname, missing_ok);
Oid databaseOid = get_database_oid(dbname, missing_ok);
ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid);
return list_make1(dbAddress);
}
List *
CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
{
CreatedbStmt *stmt = castNode(CreatedbStmt, node);
return GetAddressFromDbName(stmt->dbname, missing_ok);
}
List *
DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
{
DropdbStmt *stmt = castNode(DropdbStmt, node);
return GetAddressFromDbName(stmt->dbname, missing_ok);
}

View File

@ -484,7 +484,7 @@ static DistributeObjectOps Database_Drop = {
.postprocess = NULL,
.objectType = OBJECT_DATABASE,
.operationType = DIST_OPS_DROP,
.address = NULL,
.address = DropDatabaseStmtObjectAddress,
.markDistributed = false,
};

View File

@ -273,7 +273,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString,
ObjectAddress *address = NULL;
foreach_ptr(address, distributedExtensionAddresses)
{
UnmarkObjectDistributed(address);
UnmarkObjectDistributedLocally(address);
}
/*

View File

@ -1080,7 +1080,7 @@ UnmarkRolesDistributed(List *roles)
}
ObjectAddressSet(roleAddress, AuthIdRelationId, roleOid);
UnmarkObjectDistributed(&roleAddress);
UnmarkObjectDistributedLocally(&roleAddress);
}
}

View File

@ -322,7 +322,7 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString,
ObjectAddress *address = NULL;
foreach_ptr(address, distributedSequenceAddresses)
{
UnmarkObjectDistributed(address);
UnmarkObjectDistributedLocally(address);
}
/*

View File

@ -225,18 +225,18 @@ multi_ProcessUtility(PlannedStmt *pstmt,
* Make sure that on DROP DATABASE we terminate the background daemon
* associated with it.
*/
if (IsA(parsetree, DropdbStmt))
{
const bool missingOK = true;
DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree;
char *dbname = dropDbStatement->dbname;
Oid databaseOid = get_database_oid(dbname, missingOK);
/* if (IsA(parsetree, DropdbStmt)) */
/* { */
/* const bool missingOK = true; */
/* DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree; */
/* char *dbname = dropDbStatement->dbname; */
/* Oid databaseOid = get_database_oid(dbname, missingOK); */
if (OidIsValid(databaseOid))
{
StopMaintenanceDaemon(databaseOid);
}
}
/* if (OidIsValid(databaseOid)) */
/* { */
/* StopMaintenanceDaemon(databaseOid); */
/* } */
/* } */
if (!CitusHasBeenLoaded())
{
@ -741,6 +741,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
}
}
pstmt->utilityStmt = parsetree;
PG_TRY();

View File

@ -31,35 +31,49 @@ optionToStatement(StringInfo buf, DefElem *option, const struct
{
if (strcmp(name, opt_formats[i].name) == 0)
{
switch (opt_formats[i].type) {
case OPTION_FORMAT_STRING: {
switch (opt_formats[i].type)
{
case OPTION_FORMAT_STRING:
{
char *value = defGetString(option);
appendStringInfo(buf, opt_formats[i].format, quote_identifier(value));
break;
}
case OPTION_FORMAT_INTEGER: {
case OPTION_FORMAT_INTEGER:
{
int32 value = defGetInt32(option);
appendStringInfo(buf, opt_formats[i].format, value);
break;
}
case OPTION_FORMAT_BOOLEAN: {
case OPTION_FORMAT_BOOLEAN:
{
bool value = defGetBoolean(option);
appendStringInfo(buf, opt_formats[i].format, value ? "true" : "false");
appendStringInfo(buf, opt_formats[i].format, value ? "true" :
"false");
break;
}
#if PG_VERSION_NUM >= PG_VERSION_15
case OPTION_FORMAT_OBJECT_ID: {
case OPTION_FORMAT_OBJECT_ID:
{
Oid value = defGetObjectId(option);
appendStringInfo(buf, opt_formats[i].format, value);
break;
}
#endif
case OPTION_FORMAT_LITERAL_CSTR: {
case OPTION_FORMAT_LITERAL_CSTR:
{
char *value = defGetString(option);
appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr(value));
appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr(
value));
break;
}
default: {
default:
{
elog(ERROR, "unrecognized option type: %d", opt_formats[i].type);
break;
}

View File

@ -93,7 +93,7 @@ citus_unmark_object_distributed(PG_FUNCTION_ARGS)
errhint("drop the object via a DROP command")));
}
UnmarkObjectDistributed(&address);
UnmarkObjectDistributedLocally(&address);
PG_RETURN_VOID();
}
@ -361,8 +361,11 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
* distributed. This will prevent updates to that object to be propagated to the worker.
*/
void
UnmarkObjectDistributed(const ObjectAddress *address)
UnmarkObjectDistributedLocally(const ObjectAddress *address)
{
elog(WARNING,
"Test UnmarkObjectDistributedLocally %d %d %d", address->classId,
address->objectId, address->objectSubId);
int paramCount = 3;
Oid paramTypes[3] = {
OIDOID,
@ -387,6 +390,26 @@ UnmarkObjectDistributed(const ObjectAddress *address)
}
void
UnmarkObjectDistributed(const ObjectAddress *address)
{
UnmarkObjectDistributedLocally(address);
/* TODO Some post operations are needed in my opinion like MarkObjectDistributed
* Below is the code from MarkObjectDistributed
*
* if (EnableMetadataSync)
* {
* //I need to change below command IMO but I'm not sure
* char *workerPgDistObjectUpdateCommand =
* CreatePgDistObjectEntryCommand(distAddress);
* SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand);
* }
*
*/
}
/*
* IsObjectDistributed returns if the object addressed is already distributed in the
* cluster. This performs a local indexed lookup in pg_dist_object.

View File

@ -49,6 +49,10 @@ BEGIN
-- remove entries from citus.pg_dist_object for all dropped root (objsubid = 0) objects
PERFORM master_unmark_object_distributed(v_obj.classid, v_obj.objid, v_obj.objsubid);
--add a test log
RAISE WARNING 'Dropped object for test: %', v_obj.object_name;
END LOOP;
SELECT COUNT(*) INTO constraint_event_count

View File

@ -131,10 +131,10 @@ WorkerDropDistributedTable(Oid relationId)
{
ObjectAddress ownedSequenceAddress = { 0 };
ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid);
UnmarkObjectDistributed(&ownedSequenceAddress);
UnmarkObjectDistributedLocally(&ownedSequenceAddress);
}
UnmarkObjectDistributed(distributedTableObject);
UnmarkObjectDistributedLocally(distributedTableObject);
/*
* Remove metadata before object's itself to make functions no-op within
@ -243,7 +243,7 @@ worker_drop_shell_table(PG_FUNCTION_ARGS)
{
ObjectAddress ownedSequenceAddress = { 0 };
ObjectAddressSet(ownedSequenceAddress, RelationRelationId, ownedSequenceOid);
UnmarkObjectDistributed(&ownedSequenceAddress);
UnmarkObjectDistributedLocally(&ownedSequenceAddress);
}
/*

View File

@ -193,6 +193,7 @@ extern List * DropTextSearchConfigObjectAddress(Node *node, bool missing_ok, boo
isPostprocess);
extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool
isPostprocess);
extern void UnmarkDistObjectsOnAllNodes(Node *parseTree);
/* index.c */
typedef void (*PGIndexProcessor)(Form_pg_index, List **, int);
@ -233,6 +234,8 @@ extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *que
extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool
isPostprocess);
extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool
isPostprocess);
extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);

View File

@ -27,6 +27,7 @@ extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address);
extern void UnmarkObjectDistributedLocally(const ObjectAddress *address);
extern bool IsTableOwnedByExtension(Oid relationId);
extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target);
extern bool IsAnyObjectAddressOwnedByExtension(const List *targets,