mirror of https://github.com/citusdata/citus.git
Use object address instead of relation id on DDLJob to decide on syncing metadata
parent
63f229928f
commit
06a94d167e
|
@ -464,7 +464,8 @@ GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd
|
|||
{
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
||||
ddlJob->targetRelationId = CreateIndexStmtGetRelationId(createIndexStatement);
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId,
|
||||
CreateIndexStmtGetRelationId(createIndexStatement));
|
||||
ddlJob->startNewTransaction = createIndexStatement->concurrent;
|
||||
ddlJob->metadataSyncCommand = createIndexCommand;
|
||||
ddlJob->taskList = CreateIndexTaskList(createIndexStatement);
|
||||
|
@ -598,7 +599,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
|
|||
}
|
||||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->startNewTransaction = IsReindexWithParam_compat(reindexStatement,
|
||||
"concurrently");
|
||||
ddlJob->metadataSyncCommand = reindexCommand;
|
||||
|
@ -695,7 +696,8 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
|
|||
MarkInvalidateForeignKeyGraph();
|
||||
}
|
||||
|
||||
ddlJob->targetRelationId = distributedRelationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId,
|
||||
distributedRelationId);
|
||||
|
||||
/*
|
||||
* We do not want DROP INDEX CONCURRENTLY to commit locally before
|
||||
|
|
|
@ -127,7 +127,7 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
|
|||
}
|
||||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = tableRelationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, tableRelationId);
|
||||
ddlJob->metadataSyncCommand = renameCommand;
|
||||
ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand);
|
||||
|
||||
|
|
|
@ -92,7 +92,7 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString,
|
|||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->startNewTransaction = false;
|
||||
ddlJob->metadataSyncCommand = ddlCommand;
|
||||
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
|
||||
|
@ -197,7 +197,7 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString,
|
|||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->startNewTransaction = false;
|
||||
ddlJob->metadataSyncCommand = ddlCommand;
|
||||
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
|
||||
|
@ -236,7 +236,7 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString,
|
|||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->startNewTransaction = false;
|
||||
ddlJob->metadataSyncCommand = ddlCommand;
|
||||
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
|
||||
|
@ -274,7 +274,7 @@ PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString,
|
|||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->startNewTransaction = false;
|
||||
ddlJob->metadataSyncCommand = ddlCommand;
|
||||
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
|
||||
|
@ -376,7 +376,7 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString,
|
|||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->startNewTransaction = false;
|
||||
ddlJob->metadataSyncCommand = ddlCommand;
|
||||
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
|
||||
|
@ -416,7 +416,7 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString,
|
|||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->startNewTransaction = false;
|
||||
ddlJob->metadataSyncCommand = ddlCommand;
|
||||
ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
|
||||
|
|
|
@ -1102,7 +1102,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
|||
|
||||
/* fill them here as it is possible to use them in some conditional blocks below */
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = leftRelationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, leftRelationId);
|
||||
|
||||
const char *sqlForTaskList = alterTableCommand;
|
||||
if (deparseAT)
|
||||
|
@ -1779,7 +1779,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
|
|||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
QualifyTreeNode((Node *) stmt);
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->metadataSyncCommand = DeparseTreeNode((Node *) stmt);
|
||||
ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand);
|
||||
return list_make1(ddlJob);
|
||||
|
|
|
@ -712,7 +712,7 @@ CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName,
|
|||
const char *queryString)
|
||||
{
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->metadataSyncCommand = queryString;
|
||||
|
||||
if (!triggerName)
|
||||
|
|
|
@ -1044,16 +1044,20 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
|||
|
||||
EnsureCoordinator();
|
||||
|
||||
Oid targetRelationId = ddlJob->targetRelationId;
|
||||
ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress;
|
||||
|
||||
if (OidIsValid(targetRelationId))
|
||||
if (OidIsValid(targetObjectAddress.classId))
|
||||
{
|
||||
/*
|
||||
* Only for ddlJobs that are targetting a relation (table) we want to sync
|
||||
* its metadata and verify some properties around the table.
|
||||
* Only for ddlJobs that are targetting an object we want to sync
|
||||
* its metadata.
|
||||
*/
|
||||
shouldSyncMetadata = ShouldSyncTableMetadata(targetRelationId);
|
||||
EnsurePartitionTableNotReplicated(targetRelationId);
|
||||
shouldSyncMetadata = ShouldSyncUserCommandForObject(targetObjectAddress);
|
||||
|
||||
if (targetObjectAddress.classId == RelationRelationId)
|
||||
{
|
||||
EnsurePartitionTableNotReplicated(targetObjectAddress.objectId);
|
||||
}
|
||||
}
|
||||
|
||||
bool localExecutionSupported = true;
|
||||
|
@ -1304,7 +1308,7 @@ CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command)
|
|||
}
|
||||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
||||
ddlJob->metadataSyncCommand = GetTableDDLCommand(command);
|
||||
ddlJob->taskList = taskList;
|
||||
|
||||
|
@ -1555,7 +1559,7 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands)
|
|||
}
|
||||
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = InvalidOid;
|
||||
ddlJob->targetObjectAddress = InvalidObjectAddress;
|
||||
ddlJob->metadataSyncCommand = NULL;
|
||||
ddlJob->taskList = list_make1(task);
|
||||
|
||||
|
|
|
@ -425,6 +425,22 @@ ClusterHasKnownMetadataWorkers()
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldSyncUserCommandForObject checks if the user command should be synced to the
|
||||
* worker nodes for the given object.
|
||||
*/
|
||||
bool
|
||||
ShouldSyncUserCommandForObject(ObjectAddress objectAddress)
|
||||
{
|
||||
if (objectAddress.classId == RelationRelationId)
|
||||
{
|
||||
return ShouldSyncTableMetadata(objectAddress.objectId);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be
|
||||
* propagated to metadata workers, i.e. the table is a hash distributed table or
|
||||
|
|
|
@ -1065,7 +1065,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
|||
|
||||
|
||||
/*
|
||||
* EnsurePartitionTableNotReplicated errors out if the infput relation is
|
||||
* EnsurePartitionTableNotReplicated errors out if the input relation is
|
||||
* a partition table and the table has a replication factor greater than
|
||||
* one.
|
||||
*
|
||||
|
|
|
@ -50,13 +50,13 @@ extern bool InDelegatedProcedureCall;
|
|||
|
||||
/*
|
||||
* A DDLJob encapsulates the remote tasks and commands needed to process all or
|
||||
* part of a distributed DDL command. It hold the distributed relation's oid,
|
||||
* part of a distributed DDL command. It hold the target object's address,
|
||||
* the original DDL command string (for MX DDL propagation), and a task list of
|
||||
* DDL_TASK-type Tasks to be executed.
|
||||
*/
|
||||
typedef struct DDLJob
|
||||
{
|
||||
Oid targetRelationId; /* oid of the target distributed relation */
|
||||
ObjectAddress targetObjectAddress; /* target distributed object address */
|
||||
|
||||
/*
|
||||
* Whether to commit and start a new transaction before sending commands
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
#define METADATA_SYNC_H
|
||||
|
||||
|
||||
#include "distributed/commands/utility_hook.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
@ -34,6 +35,7 @@ extern void SyncCitusTableMetadata(Oid relationId);
|
|||
extern void EnsureSequentialModeMetadataOperations(void);
|
||||
extern bool ClusterHasKnownMetadataWorkers(void);
|
||||
extern char * LocalGroupIdUpdateCommand(int32 groupId);
|
||||
extern bool ShouldSyncUserCommandForObject(ObjectAddress objectAddress);
|
||||
extern bool ShouldSyncTableMetadata(Oid relationId);
|
||||
extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId);
|
||||
extern List * NodeMetadataCreateCommands(void);
|
||||
|
|
Loading…
Reference in New Issue