diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index a635cde62..db83b2665 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -461,6 +461,29 @@ ResolveRelationId(text *relationName) */ List * GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) +{ + List *tableDDLEventList = NIL; + List *tableCreationCommandList = NIL; + List *indexAndConstraintCommandList = NIL; + + tableCreationCommandList = GetTableCreationCommands(relationId, + includeSequenceDefaults); + tableDDLEventList = list_concat(tableDDLEventList, tableCreationCommandList); + + indexAndConstraintCommandList = GetTableIndexAndConstraintCommands(relationId); + tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList); + + return tableDDLEventList; +} + + +/* + * GetTableCreationCommands takes in a relationId, and returns the list of DDL + * commands needed to reconstruct the relation, excluding indexes and + * constraints. + */ +List * +GetTableCreationCommands(Oid relationId, bool includeSequenceDefaults) { List *tableDDLEventList = NIL; char tableType = 0; @@ -471,12 +494,6 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) char *createSchemaCommand = NULL; Oid schemaId = InvalidOid; - Relation pgIndex = NULL; - SysScanDesc scanDescriptor = NULL; - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - HeapTuple heapTuple = NULL; - /* * Set search_path to NIL so that all objects outside of pg_catalog will be * schema-prefixed. pg_catalog will be added automatically when we call @@ -528,6 +545,37 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) tableDDLEventList = lappend(tableDDLEventList, tableColumnOptionsDef); } + /* revert back to original search_path */ + PopOverrideSearchPath(); + + return tableDDLEventList; +} + + +/* + * GetTableIndexAndConstraintCommands returns the list of DDL commands to + * (re)create indexes and constraints for a given table. + */ +List * +GetTableIndexAndConstraintCommands(Oid relationId) +{ + List *indexDDLEventList = NIL; + Relation pgIndex = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + HeapTuple heapTuple = NULL; + + /* + * Set search_path to NIL so that all objects outside of pg_catalog will be + * schema-prefixed. pg_catalog will be added automatically when we call + * PushOverrideSearchPath(), since we set addCatalog to true; + */ + OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext); + overridePath->schemas = NIL; + overridePath->addCatalog = true; + PushOverrideSearchPath(overridePath); + /* open system catalog and scan all indexes that belong to this table */ pgIndex = heap_open(IndexRelationId, AccessShareLock); @@ -580,7 +628,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) } /* append found constraint or index definition to the list */ - tableDDLEventList = lappend(tableDDLEventList, statementDef); + indexDDLEventList = lappend(indexDDLEventList, statementDef); /* if table is clustered on this index, append definition to the list */ if (indexForm->indisclustered) @@ -588,7 +636,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) char *clusteredDef = pg_get_indexclusterdef_string(indexId); Assert(clusteredDef != NULL); - tableDDLEventList = lappend(tableDDLEventList, clusteredDef); + indexDDLEventList = lappend(indexDDLEventList, clusteredDef); } heapTuple = systable_getnext(scanDescriptor); @@ -601,7 +649,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) /* revert back to original search_path */ PopOverrideSearchPath(); - return tableDDLEventList; + return indexDDLEventList; } diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 4b7ac8a22..2471da365 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -46,6 +46,7 @@ static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); static List * RecreateTableDDLCommandList(Oid relationId); +static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_copy_shard_placement); @@ -247,27 +248,20 @@ List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, int32 sourceNodePort) { + int64 shardId = shardInterval->shardId; char *shardName = ConstructQualifiedShardName(shardInterval); - List *ddlCommandList = NIL; - ListCell *ddlCommandCell = NULL; + List *tableRecreationCommandList = NIL; + List *indexCommandList = NIL; List *copyShardToNodeCommandsList = NIL; StringInfo copyShardDataCommand = makeStringInfo(); + Oid relationId = shardInterval->relationId; - ddlCommandList = RecreateTableDDLCommandList(shardInterval->relationId); + tableRecreationCommandList = RecreateTableDDLCommandList(relationId); + tableRecreationCommandList = + WorkerApplyShardDDLCommandList(tableRecreationCommandList, shardId); - foreach(ddlCommandCell, ddlCommandList) - { - char *ddlCommand = lfirst(ddlCommandCell); - char *escapedDdlCommand = quote_literal_cstr(ddlCommand); - - StringInfo applyDdlCommand = makeStringInfo(); - appendStringInfo(applyDdlCommand, - WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, - shardInterval->shardId, escapedDdlCommand); - - copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, - applyDdlCommand->data); - } + copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList, + tableRecreationCommandList); appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, quote_literal_cstr(shardName), /* table to append */ @@ -278,6 +272,12 @@ CopyShardCommandList(ShardInterval *shardInterval, copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, copyShardDataCommand->data); + indexCommandList = GetTableIndexAndConstraintCommands(relationId); + indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId); + + copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList, + indexCommandList); + return copyShardToNodeCommandsList; } @@ -368,8 +368,8 @@ ConstructQualifiedShardName(ShardInterval *shardInterval) /* * RecreateTableDDLCommandList returns a list of DDL statements similar to that - * returned by GetTableDDLEvents except that the list begins with a "DROP TABLE" - * or "DROP FOREIGN TABLE" statement to facilitate total recreation of a placement. + * returned by GetTableCreationCommands except that the list begins with a "DROP TABLE" + * or "DROP FOREIGN TABLE" statement to facilitate idempotent recreation of a placement. */ static List * RecreateTableDDLCommandList(Oid relationId) @@ -405,10 +405,36 @@ RecreateTableDDLCommandList(Oid relationId) } dropCommandList = list_make1(dropCommand->data); - - createCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults); - + createCommandList = GetTableCreationCommands(relationId, includeSequenceDefaults); recreateCommandList = list_concat(dropCommandList, createCommandList); return recreateCommandList; } + + +/* + * WorkerApplyShardDDLCommandList wraps all DDL commands in ddlCommandList + * in a call to worker_apply_shard_ddl_command to apply the DDL command to + * the shard specified by shardId. + */ +static List * +WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId) +{ + List *applyDdlCommandList = NIL; + ListCell *ddlCommandCell = NULL; + + foreach(ddlCommandCell, ddlCommandList) + { + char *ddlCommand = lfirst(ddlCommandCell); + char *escapedDdlCommand = quote_literal_cstr(ddlCommand); + + StringInfo applyDdlCommand = makeStringInfo(); + appendStringInfo(applyDdlCommand, + WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, + shardId, escapedDdlCommand); + + applyDdlCommandList = lappend(applyDdlCommandList, applyDdlCommand->data); + } + + return applyDdlCommandList; +} diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 4e6819580..020b64754 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -101,6 +101,8 @@ extern uint64 GetNextShardId(void); extern uint64 GetNextPlacementId(void); extern Oid ResolveRelationId(text *relationName); extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation); +extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation); +extern List * GetTableIndexAndConstraintCommands(Oid relationId); extern List * GetTableForeignConstraintCommands(Oid relationId); extern char ShardStorageType(Oid relationId); extern void CheckDistributedTable(Oid relationId);