mirror of https://github.com/citusdata/citus.git
Create indexes after worker_append_table_to_shard during shard repair
parent
24ee65a054
commit
0bcc227a62
|
@ -461,6 +461,29 @@ ResolveRelationId(text *relationName)
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
||||||
|
{
|
||||||
|
List *tableDDLEventList = NIL;
|
||||||
|
List *tableCreationCommandList = NIL;
|
||||||
|
List *indexAndConstraintCommandList = NIL;
|
||||||
|
|
||||||
|
tableCreationCommandList = GetTableCreationCommands(relationId,
|
||||||
|
includeSequenceDefaults);
|
||||||
|
tableDDLEventList = list_concat(tableDDLEventList, tableCreationCommandList);
|
||||||
|
|
||||||
|
indexAndConstraintCommandList = GetTableIndexAndConstraintCommands(relationId);
|
||||||
|
tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList);
|
||||||
|
|
||||||
|
return tableDDLEventList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetTableCreationCommands takes in a relationId, and returns the list of DDL
|
||||||
|
* commands needed to reconstruct the relation, excluding indexes and
|
||||||
|
* constraints.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
GetTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
|
||||||
{
|
{
|
||||||
List *tableDDLEventList = NIL;
|
List *tableDDLEventList = NIL;
|
||||||
char tableType = 0;
|
char tableType = 0;
|
||||||
|
@ -471,12 +494,6 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
||||||
char *createSchemaCommand = NULL;
|
char *createSchemaCommand = NULL;
|
||||||
Oid schemaId = InvalidOid;
|
Oid schemaId = InvalidOid;
|
||||||
|
|
||||||
Relation pgIndex = NULL;
|
|
||||||
SysScanDesc scanDescriptor = NULL;
|
|
||||||
ScanKeyData scanKey[1];
|
|
||||||
int scanKeyCount = 1;
|
|
||||||
HeapTuple heapTuple = NULL;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Set search_path to NIL so that all objects outside of pg_catalog will be
|
* Set search_path to NIL so that all objects outside of pg_catalog will be
|
||||||
* schema-prefixed. pg_catalog will be added automatically when we call
|
* schema-prefixed. pg_catalog will be added automatically when we call
|
||||||
|
@ -528,6 +545,37 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
||||||
tableDDLEventList = lappend(tableDDLEventList, tableColumnOptionsDef);
|
tableDDLEventList = lappend(tableDDLEventList, tableColumnOptionsDef);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* revert back to original search_path */
|
||||||
|
PopOverrideSearchPath();
|
||||||
|
|
||||||
|
return tableDDLEventList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetTableIndexAndConstraintCommands returns the list of DDL commands to
|
||||||
|
* (re)create indexes and constraints for a given table.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
GetTableIndexAndConstraintCommands(Oid relationId)
|
||||||
|
{
|
||||||
|
List *indexDDLEventList = NIL;
|
||||||
|
Relation pgIndex = NULL;
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
int scanKeyCount = 1;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set search_path to NIL so that all objects outside of pg_catalog will be
|
||||||
|
* schema-prefixed. pg_catalog will be added automatically when we call
|
||||||
|
* PushOverrideSearchPath(), since we set addCatalog to true;
|
||||||
|
*/
|
||||||
|
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
|
||||||
|
overridePath->schemas = NIL;
|
||||||
|
overridePath->addCatalog = true;
|
||||||
|
PushOverrideSearchPath(overridePath);
|
||||||
|
|
||||||
/* open system catalog and scan all indexes that belong to this table */
|
/* open system catalog and scan all indexes that belong to this table */
|
||||||
pgIndex = heap_open(IndexRelationId, AccessShareLock);
|
pgIndex = heap_open(IndexRelationId, AccessShareLock);
|
||||||
|
|
||||||
|
@ -580,7 +628,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* append found constraint or index definition to the list */
|
/* append found constraint or index definition to the list */
|
||||||
tableDDLEventList = lappend(tableDDLEventList, statementDef);
|
indexDDLEventList = lappend(indexDDLEventList, statementDef);
|
||||||
|
|
||||||
/* if table is clustered on this index, append definition to the list */
|
/* if table is clustered on this index, append definition to the list */
|
||||||
if (indexForm->indisclustered)
|
if (indexForm->indisclustered)
|
||||||
|
@ -588,7 +636,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
||||||
char *clusteredDef = pg_get_indexclusterdef_string(indexId);
|
char *clusteredDef = pg_get_indexclusterdef_string(indexId);
|
||||||
Assert(clusteredDef != NULL);
|
Assert(clusteredDef != NULL);
|
||||||
|
|
||||||
tableDDLEventList = lappend(tableDDLEventList, clusteredDef);
|
indexDDLEventList = lappend(indexDDLEventList, clusteredDef);
|
||||||
}
|
}
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
@ -601,7 +649,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
||||||
/* revert back to original search_path */
|
/* revert back to original search_path */
|
||||||
PopOverrideSearchPath();
|
PopOverrideSearchPath();
|
||||||
|
|
||||||
return tableDDLEventList;
|
return indexDDLEventList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,7 @@ static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName,
|
||||||
int32 sourceNodePort, char *targetNodeName,
|
int32 sourceNodePort, char *targetNodeName,
|
||||||
int32 targetNodePort);
|
int32 targetNodePort);
|
||||||
static List * RecreateTableDDLCommandList(Oid relationId);
|
static List * RecreateTableDDLCommandList(Oid relationId);
|
||||||
|
static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId);
|
||||||
|
|
||||||
/* declarations for dynamic loading */
|
/* declarations for dynamic loading */
|
||||||
PG_FUNCTION_INFO_V1(master_copy_shard_placement);
|
PG_FUNCTION_INFO_V1(master_copy_shard_placement);
|
||||||
|
@ -247,27 +248,20 @@ List *
|
||||||
CopyShardCommandList(ShardInterval *shardInterval,
|
CopyShardCommandList(ShardInterval *shardInterval,
|
||||||
char *sourceNodeName, int32 sourceNodePort)
|
char *sourceNodeName, int32 sourceNodePort)
|
||||||
{
|
{
|
||||||
|
int64 shardId = shardInterval->shardId;
|
||||||
char *shardName = ConstructQualifiedShardName(shardInterval);
|
char *shardName = ConstructQualifiedShardName(shardInterval);
|
||||||
List *ddlCommandList = NIL;
|
List *tableRecreationCommandList = NIL;
|
||||||
ListCell *ddlCommandCell = NULL;
|
List *indexCommandList = NIL;
|
||||||
List *copyShardToNodeCommandsList = NIL;
|
List *copyShardToNodeCommandsList = NIL;
|
||||||
StringInfo copyShardDataCommand = makeStringInfo();
|
StringInfo copyShardDataCommand = makeStringInfo();
|
||||||
|
Oid relationId = shardInterval->relationId;
|
||||||
|
|
||||||
ddlCommandList = RecreateTableDDLCommandList(shardInterval->relationId);
|
tableRecreationCommandList = RecreateTableDDLCommandList(relationId);
|
||||||
|
tableRecreationCommandList =
|
||||||
|
WorkerApplyShardDDLCommandList(tableRecreationCommandList, shardId);
|
||||||
|
|
||||||
foreach(ddlCommandCell, ddlCommandList)
|
copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
|
||||||
{
|
tableRecreationCommandList);
|
||||||
char *ddlCommand = lfirst(ddlCommandCell);
|
|
||||||
char *escapedDdlCommand = quote_literal_cstr(ddlCommand);
|
|
||||||
|
|
||||||
StringInfo applyDdlCommand = makeStringInfo();
|
|
||||||
appendStringInfo(applyDdlCommand,
|
|
||||||
WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA,
|
|
||||||
shardInterval->shardId, escapedDdlCommand);
|
|
||||||
|
|
||||||
copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList,
|
|
||||||
applyDdlCommand->data);
|
|
||||||
}
|
|
||||||
|
|
||||||
appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
|
appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
|
||||||
quote_literal_cstr(shardName), /* table to append */
|
quote_literal_cstr(shardName), /* table to append */
|
||||||
|
@ -278,6 +272,12 @@ CopyShardCommandList(ShardInterval *shardInterval,
|
||||||
copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList,
|
copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList,
|
||||||
copyShardDataCommand->data);
|
copyShardDataCommand->data);
|
||||||
|
|
||||||
|
indexCommandList = GetTableIndexAndConstraintCommands(relationId);
|
||||||
|
indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId);
|
||||||
|
|
||||||
|
copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
|
||||||
|
indexCommandList);
|
||||||
|
|
||||||
return copyShardToNodeCommandsList;
|
return copyShardToNodeCommandsList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -368,8 +368,8 @@ ConstructQualifiedShardName(ShardInterval *shardInterval)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RecreateTableDDLCommandList returns a list of DDL statements similar to that
|
* RecreateTableDDLCommandList returns a list of DDL statements similar to that
|
||||||
* returned by GetTableDDLEvents except that the list begins with a "DROP TABLE"
|
* returned by GetTableCreationCommands except that the list begins with a "DROP TABLE"
|
||||||
* or "DROP FOREIGN TABLE" statement to facilitate total recreation of a placement.
|
* or "DROP FOREIGN TABLE" statement to facilitate idempotent recreation of a placement.
|
||||||
*/
|
*/
|
||||||
static List *
|
static List *
|
||||||
RecreateTableDDLCommandList(Oid relationId)
|
RecreateTableDDLCommandList(Oid relationId)
|
||||||
|
@ -405,10 +405,36 @@ RecreateTableDDLCommandList(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
dropCommandList = list_make1(dropCommand->data);
|
dropCommandList = list_make1(dropCommand->data);
|
||||||
|
createCommandList = GetTableCreationCommands(relationId, includeSequenceDefaults);
|
||||||
createCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
|
|
||||||
|
|
||||||
recreateCommandList = list_concat(dropCommandList, createCommandList);
|
recreateCommandList = list_concat(dropCommandList, createCommandList);
|
||||||
|
|
||||||
return recreateCommandList;
|
return recreateCommandList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WorkerApplyShardDDLCommandList wraps all DDL commands in ddlCommandList
|
||||||
|
* in a call to worker_apply_shard_ddl_command to apply the DDL command to
|
||||||
|
* the shard specified by shardId.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId)
|
||||||
|
{
|
||||||
|
List *applyDdlCommandList = NIL;
|
||||||
|
ListCell *ddlCommandCell = NULL;
|
||||||
|
|
||||||
|
foreach(ddlCommandCell, ddlCommandList)
|
||||||
|
{
|
||||||
|
char *ddlCommand = lfirst(ddlCommandCell);
|
||||||
|
char *escapedDdlCommand = quote_literal_cstr(ddlCommand);
|
||||||
|
|
||||||
|
StringInfo applyDdlCommand = makeStringInfo();
|
||||||
|
appendStringInfo(applyDdlCommand,
|
||||||
|
WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA,
|
||||||
|
shardId, escapedDdlCommand);
|
||||||
|
|
||||||
|
applyDdlCommandList = lappend(applyDdlCommandList, applyDdlCommand->data);
|
||||||
|
}
|
||||||
|
|
||||||
|
return applyDdlCommandList;
|
||||||
|
}
|
||||||
|
|
|
@ -101,6 +101,8 @@ extern uint64 GetNextShardId(void);
|
||||||
extern uint64 GetNextPlacementId(void);
|
extern uint64 GetNextPlacementId(void);
|
||||||
extern Oid ResolveRelationId(text *relationName);
|
extern Oid ResolveRelationId(text *relationName);
|
||||||
extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation);
|
extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation);
|
||||||
|
extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation);
|
||||||
|
extern List * GetTableIndexAndConstraintCommands(Oid relationId);
|
||||||
extern List * GetTableForeignConstraintCommands(Oid relationId);
|
extern List * GetTableForeignConstraintCommands(Oid relationId);
|
||||||
extern char ShardStorageType(Oid relationId);
|
extern char ShardStorageType(Oid relationId);
|
||||||
extern void CheckDistributedTable(Oid relationId);
|
extern void CheckDistributedTable(Oid relationId);
|
||||||
|
|
Loading…
Reference in New Issue