Merge pull request #5926 from citusdata/velioglu/syncMetadataViaObject

Use object address instead of relation id on DDLJob to decide on syncing metadata
velioglu/prop_view_temp^2
Burak Velioglu 2022-05-06 15:43:49 +03:00 committed by GitHub
commit a2158794bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 48 additions and 24 deletions

View File

@ -464,7 +464,8 @@ GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd
{ {
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = CreateIndexStmtGetRelationId(createIndexStatement); ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId,
CreateIndexStmtGetRelationId(createIndexStatement));
ddlJob->startNewTransaction = createIndexStatement->concurrent; ddlJob->startNewTransaction = createIndexStatement->concurrent;
ddlJob->metadataSyncCommand = createIndexCommand; ddlJob->metadataSyncCommand = createIndexCommand;
ddlJob->taskList = CreateIndexTaskList(createIndexStatement); ddlJob->taskList = CreateIndexTaskList(createIndexStatement);
@ -598,7 +599,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
} }
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = IsReindexWithParam_compat(reindexStatement, ddlJob->startNewTransaction = IsReindexWithParam_compat(reindexStatement,
"concurrently"); "concurrently");
ddlJob->metadataSyncCommand = reindexCommand; ddlJob->metadataSyncCommand = reindexCommand;
@ -695,7 +696,8 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
MarkInvalidateForeignKeyGraph(); MarkInvalidateForeignKeyGraph();
} }
ddlJob->targetRelationId = distributedRelationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId,
distributedRelationId);
/* /*
* We do not want DROP INDEX CONCURRENTLY to commit locally before * We do not want DROP INDEX CONCURRENTLY to commit locally before

View File

@ -127,7 +127,7 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
} }
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = tableRelationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, tableRelationId);
ddlJob->metadataSyncCommand = renameCommand; ddlJob->metadataSyncCommand = renameCommand;
ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand); ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand);

View File

@ -92,7 +92,7 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -197,7 +197,7 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -236,7 +236,7 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -274,7 +274,7 @@ PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -376,7 +376,7 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -416,7 +416,7 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);

View File

@ -1102,7 +1102,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
/* fill them here as it is possible to use them in some conditional blocks below */ /* fill them here as it is possible to use them in some conditional blocks below */
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = leftRelationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, leftRelationId);
const char *sqlForTaskList = alterTableCommand; const char *sqlForTaskList = alterTableCommand;
if (deparseAT) if (deparseAT)
@ -1779,7 +1779,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = DeparseTreeNode((Node *) stmt); ddlJob->metadataSyncCommand = DeparseTreeNode((Node *) stmt);
ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand); ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand);
return list_make1(ddlJob); return list_make1(ddlJob);

View File

@ -712,7 +712,7 @@ CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName,
const char *queryString) const char *queryString)
{ {
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = queryString; ddlJob->metadataSyncCommand = queryString;
if (!triggerName) if (!triggerName)

View File

@ -1044,16 +1044,20 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
EnsureCoordinator(); EnsureCoordinator();
Oid targetRelationId = ddlJob->targetRelationId; ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress;
if (OidIsValid(targetRelationId)) if (OidIsValid(targetObjectAddress.classId))
{ {
/* /*
* Only for ddlJobs that are targetting a relation (table) we want to sync * Only for ddlJobs that are targetting an object we want to sync
* its metadata and verify some properties around the table. * its metadata.
*/ */
shouldSyncMetadata = ShouldSyncTableMetadata(targetRelationId); shouldSyncMetadata = ShouldSyncUserCommandForObject(targetObjectAddress);
EnsurePartitionTableNotReplicated(targetRelationId);
if (targetObjectAddress.classId == RelationRelationId)
{
EnsurePartitionTableNotReplicated(targetObjectAddress.objectId);
}
} }
bool localExecutionSupported = true; bool localExecutionSupported = true;
@ -1304,7 +1308,7 @@ CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command)
} }
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = GetTableDDLCommand(command); ddlJob->metadataSyncCommand = GetTableDDLCommand(command);
ddlJob->taskList = taskList; ddlJob->taskList = taskList;
@ -1555,7 +1559,7 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands)
} }
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = InvalidOid; ddlJob->targetObjectAddress = InvalidObjectAddress;
ddlJob->metadataSyncCommand = NULL; ddlJob->metadataSyncCommand = NULL;
ddlJob->taskList = list_make1(task); ddlJob->taskList = list_make1(task);

View File

@ -425,6 +425,22 @@ ClusterHasKnownMetadataWorkers()
} }
/*
* ShouldSyncUserCommandForObject checks if the user command should be synced to the
* worker nodes for the given object.
*/
bool
ShouldSyncUserCommandForObject(ObjectAddress objectAddress)
{
if (objectAddress.classId == RelationRelationId)
{
return ShouldSyncTableMetadata(objectAddress.objectId);
}
return false;
}
/* /*
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be * ShouldSyncTableMetadata checks if the metadata of a distributed table should be
* propagated to metadata workers, i.e. the table is a hash distributed table or * propagated to metadata workers, i.e. the table is a hash distributed table or

View File

@ -1065,7 +1065,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
/* /*
* EnsurePartitionTableNotReplicated errors out if the infput relation is * EnsurePartitionTableNotReplicated errors out if the input relation is
* a partition table and the table has a replication factor greater than * a partition table and the table has a replication factor greater than
* one. * one.
* *

View File

@ -50,13 +50,13 @@ extern bool InDelegatedProcedureCall;
/* /*
* A DDLJob encapsulates the remote tasks and commands needed to process all or * A DDLJob encapsulates the remote tasks and commands needed to process all or
* part of a distributed DDL command. It hold the distributed relation's oid, * part of a distributed DDL command. It hold the target object's address,
* the original DDL command string (for MX DDL propagation), and a task list of * the original DDL command string (for MX DDL propagation), and a task list of
* DDL_TASK-type Tasks to be executed. * DDL_TASK-type Tasks to be executed.
*/ */
typedef struct DDLJob typedef struct DDLJob
{ {
Oid targetRelationId; /* oid of the target distributed relation */ ObjectAddress targetObjectAddress; /* target distributed object address */
/* /*
* Whether to commit and start a new transaction before sending commands * Whether to commit and start a new transaction before sending commands

View File

@ -13,6 +13,7 @@
#define METADATA_SYNC_H #define METADATA_SYNC_H
#include "distributed/commands/utility_hook.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
@ -34,6 +35,7 @@ extern void SyncCitusTableMetadata(Oid relationId);
extern void EnsureSequentialModeMetadataOperations(void); extern void EnsureSequentialModeMetadataOperations(void);
extern bool ClusterHasKnownMetadataWorkers(void); extern bool ClusterHasKnownMetadataWorkers(void);
extern char * LocalGroupIdUpdateCommand(int32 groupId); extern char * LocalGroupIdUpdateCommand(int32 groupId);
extern bool ShouldSyncUserCommandForObject(ObjectAddress objectAddress);
extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadata(Oid relationId);
extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId); extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId);
extern List * NodeMetadataCreateCommands(void); extern List * NodeMetadataCreateCommands(void);