From a231ff29b092d549d935ead6f80e78b2bffd200c Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 10 May 2021 19:50:00 +0200 Subject: [PATCH] Get prepared for some improvements for online rebalancer To see all the changes, see https://github.com/citusdata/citus-enterprise/pull/586/files --- .../distributed/commands/alter_table.c | 5 +- src/backend/distributed/commands/index.c | 5 +- .../distributed/operations/node_protocol.c | 137 +++++++++++++++--- .../distributed/operations/repair_shards.c | 76 ++++++---- .../planner/local_distributed_join_planner.c | 10 +- src/include/distributed/commands.h | 4 +- .../distributed/coordinator_protocol.h | 27 +++- 7 files changed, 210 insertions(+), 54 deletions(-) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index ddc62021a..84e47eac7 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -558,8 +558,11 @@ ConvertTable(TableConversionState *con) includeIndexes = false; } + + bool includeReplicaIdentity = true; List *postLoadCommands = GetPostLoadTableCreationCommands(con->relationId, - includeIndexes); + includeIndexes, + includeReplicaIdentity); List *justBeforeDropCommands = NIL; List *attachPartitionCommands = NIL; diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 1f66e512b..c4cbc453d 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -303,7 +303,8 @@ CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement) * It returns a list that is filled by the pgIndexProcessor. */ List * -ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcessor) +ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcessor, + int indexFlags) { List *result = NIL; ScanKeyData scanKey[1]; @@ -325,7 +326,7 @@ ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcesso while (HeapTupleIsValid(heapTuple)) { Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(heapTuple); - pgIndexProcessor(indexForm, &result); + pgIndexProcessor(indexForm, &result, indexFlags); heapTuple = systable_getnext(scanDescriptor); } diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index f4def2511..fc03a09d1 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -74,10 +74,13 @@ int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; int NextShardId = 0; int NextPlacementId = 0; -static List * GetTableReplicaIdentityCommand(Oid relationId); +static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_pg_index + indexForm, + List ** + indexDDLEventList, + int indexFlags); static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); -static void GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, - List **indexDDLEventList); + /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_get_table_metadata); @@ -549,7 +552,7 @@ GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults) tableDDLEventList = list_concat(tableDDLEventList, preLoadCreationCommandList); List *postLoadCreationCommandList = - GetPostLoadTableCreationCommands(relationId, true); + GetPostLoadTableCreationCommands(relationId, true, true); tableDDLEventList = list_concat(tableDDLEventList, postLoadCreationCommandList); @@ -562,19 +565,43 @@ GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults) * of DDL commands that should be applied after loading the data. */ List * -GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes) +GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes, + bool includeReplicaIdentity) { List *tableDDLEventList = NIL; - if (includeIndexes) + /* + * Include all the commands (e.g., create index, set index clustered + * and set index statistics) regarding the indexes. Note that + * running all these commands in parallel might fail as the + * latter two depends on the first one. So, the caller should + * execute the commands sequentially. + */ + int indexFlags = INCLUDE_INDEX_ALL_STATEMENTS; + + if (includeIndexes && includeReplicaIdentity) { List *indexAndConstraintCommandList = - GetTableIndexAndConstraintCommands(relationId); + GetTableIndexAndConstraintCommands(relationId, indexFlags); + tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList); + } + else if (includeIndexes && !includeReplicaIdentity) + { + /* + * Do not include the indexes/constraints that backs + * replica identity, if any. + */ + List *indexAndConstraintCommandList = + GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId, + indexFlags); tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList); } - List *replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId); - tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents); + if (includeReplicaIdentity) + { + List *replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId); + tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents); + } List *triggerCommands = GetExplicitTriggerCommandList(relationId); tableDDLEventList = list_concat(tableDDLEventList, triggerCommands); @@ -590,7 +617,7 @@ GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes) * GetTableReplicaIdentityCommand returns the list of DDL commands to * (re)define the replica identity choice for a given table. */ -static List * +List * GetTableReplicaIdentityCommand(Oid relationId) { List *replicaIdentityCreateCommandList = NIL; @@ -694,18 +721,82 @@ GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults, * (re)create indexes and constraints for a given table. */ List * -GetTableIndexAndConstraintCommands(Oid relationId) +GetTableIndexAndConstraintCommands(Oid relationId, int indexFlags) { return ExecuteFunctionOnEachTableIndex(relationId, - GatherIndexAndConstraintDefinitionList); + GatherIndexAndConstraintDefinitionList, + indexFlags); +} + + +/* + * GetTableIndexAndConstraintCommands returns the list of DDL commands to + * (re)create indexes and constraints for a given table. + */ +List * +GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(Oid relationId, int indexFlags) +{ + return ExecuteFunctionOnEachTableIndex(relationId, + GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity, + indexFlags); +} + + +/* + * GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity is a wrapper around + * GatherIndexAndConstraintDefinitionList(), which only excludes the indexes or + * constraints that back the replica identity. + */ +static void +GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_pg_index indexForm, + List **indexDDLEventList, + int indexFlags) +{ + Oid relationId = indexForm->indrelid; + Relation relation = table_open(relationId, AccessShareLock); + + Oid replicaIdentityIndex = GetRelationIdentityOrPK(relation); + + if (replicaIdentityIndex == indexForm->indexrelid) + { + /* this index is backing the replica identity, so skip */ + table_close(relation, NoLock); + return; + } + + GatherIndexAndConstraintDefinitionList(indexForm, indexDDLEventList, indexFlags); + + table_close(relation, NoLock); +} + + +/* + * Get replica identity index or if it is not defined a primary key. + * + * If neither is defined, returns InvalidOid. + * + * Inspired from postgres/src/backend/replication/logical/worker.c + */ +Oid +GetRelationIdentityOrPK(Relation rel) +{ + Oid idxoid = RelationGetReplicaIndex(rel); + + if (!OidIsValid(idxoid)) + { + idxoid = RelationGetPrimaryKeyIndex(rel); + } + + return idxoid; } /* * GatherIndexAndConstraintDefinitionList adds the DDL command for the given index. */ -static void -GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLEventList) +void +GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLEventList, + int indexFlags) { Oid indexId = indexForm->indexrelid; char *statementDef = NULL; @@ -726,11 +817,15 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE } /* append found constraint or index definition to the list */ - *indexDDLEventList = lappend(*indexDDLEventList, makeTableDDLCommandString( - statementDef)); + if (indexFlags & INCLUDE_CREATE_INDEX_STATEMENTS) + { + *indexDDLEventList = lappend(*indexDDLEventList, makeTableDDLCommandString( + statementDef)); + } /* if table is clustered on this index, append definition to the list */ - if (indexForm->indisclustered) + if ((indexFlags & INCLUDE_INDEX_CLUSTERED_STATEMENTS) && + indexForm->indisclustered) { char *clusteredDef = pg_get_indexclusterdef_string(indexId); Assert(clusteredDef != NULL); @@ -740,8 +835,12 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE } /* we need alter index commands for altered targets on expression indexes */ - List *alterIndexStatisticsCommands = GetAlterIndexStatisticsCommands(indexId); - *indexDDLEventList = list_concat(*indexDDLEventList, alterIndexStatisticsCommands); + if (indexFlags & INCLUDE_INDEX_STATISTICS_STATEMENTTS) + { + List *alterIndexStatisticsCommands = GetAlterIndexStatisticsCommands(indexId); + *indexDDLEventList = list_concat(*indexDDLEventList, + alterIndexStatisticsCommands); + } } diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index b5b30606e..86f1490d4 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -63,7 +63,10 @@ static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName char shardReplicationMode); static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort); + int32 targetNodePort, bool useLogicalReplication); +static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, + int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort); static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval, const char *sourceNodeName, int32 sourceNodePort); @@ -359,8 +362,9 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) * CopyColocatedShardPlacement function copies given shard with its co-located * shards. */ + bool useLogicalReplication = false; CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort); + targetNodePort, useLogicalReplication); ShardInterval *colocatedShard = NULL; foreach_ptr(colocatedShard, colocatedShardList) @@ -741,8 +745,9 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode); } + bool useLogicalReplication = false; CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + targetNodeName, targetNodePort, useLogicalReplication); /* * Finally insert the placements to pg_dist_placement and sync it to the @@ -820,32 +825,51 @@ EnsureTableListSuitableForReplication(List *tableIdList) /* - * CopyColocatedShardPlacement copies a shard along with its co-located shards - * from a source node to target node. It does not make any checks about state - * of the shards. It is caller's responsibility to make those checks if they are - * necessary. + * CopyShardTables copies a shard along with its co-located shards from a source + * node to target node. It does not make any checks about state of the shards. + * It is caller's responsibility to make those checks if they are necessary. */ static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort) + char *targetNodeName, int32 targetNodePort, bool useLogicalReplication) { - ShardInterval *shardInterval = NULL; + if (list_length(shardIntervalList) < 1) + { + return; + } + if (useLogicalReplication) + { + /* only supported in Citus enterprise */ + } + else + { + CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); + } +} + + +/* + * CopyShardTablesViaBlockWrites copies a shard along with its co-located shards + * from a source node to target node via COPY command. While the command is in + * progress, the modifications on the source node is blocked. + */ +static void +CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort) +{ MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, - "CopyShardTables", + "CopyShardTablesViaBlockWrites", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); /* iterate through the colocated shards and copy each */ + ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, shardIntervalList) { - bool includeDataCopy = true; - - if (PartitionedTable(shardInterval->relationId)) - { - /* partitioned tables contain no data */ - includeDataCopy = false; - } + bool includeDataCopy = !PartitionedTable(shardInterval->relationId); List *ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort, includeDataCopy); @@ -853,11 +877,10 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); + + MemoryContextReset(localContext); } - MemoryContextReset(localContext); - - /* * Once all shards are created, we can recreate relationships between shards. * @@ -868,15 +891,14 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP { List *shardForeignConstraintCommandList = NIL; List *referenceTableForeignConstraintList = NIL; - - char *tableOwner = TableOwner(shardInterval->relationId); + List *commandList = NIL; CopyShardForeignConstraintCommandListGrouped(shardInterval, &shardForeignConstraintCommandList, &referenceTableForeignConstraintList); - List *commandList = list_concat(shardForeignConstraintCommandList, - referenceTableForeignConstraintList); + commandList = list_concat(commandList, shardForeignConstraintCommandList); + commandList = list_concat(commandList, referenceTableForeignConstraintList); if (PartitionTable(shardInterval->relationId)) { @@ -886,8 +908,10 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP commandList = lappend(commandList, attachPartitionCommand); } + char *tableOwner = TableOwner(shardInterval->relationId); SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, commandList); + MemoryContextReset(localContext); } @@ -1079,7 +1103,9 @@ CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName, copyShardDataCommand->data); } - List *indexCommandList = GetPostLoadTableCreationCommands(relationId, true); + bool includeReplicaIdentity = true; + List *indexCommandList = + GetPostLoadTableCreationCommands(relationId, true, includeReplicaIdentity); indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId); copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList, diff --git a/src/backend/distributed/planner/local_distributed_join_planner.c b/src/backend/distributed/planner/local_distributed_join_planner.c index bb7c1d4de..9f93b447c 100644 --- a/src/backend/distributed/planner/local_distributed_join_planner.c +++ b/src/backend/distributed/planner/local_distributed_join_planner.c @@ -94,6 +94,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_router_planner.h" +#include "distributed/coordinator_protocol.h" #include "distributed/query_colocation_checker.h" #include "distributed/query_pushdown_planning.h" #include "distributed/recursive_planning.h" @@ -179,7 +180,8 @@ static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionConte plannerRestrictionContext, List *rangeTableList, int resultRTEIdentity); -static void AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexes); +static void AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexes, + int flags); static ConversionChoice GetConversionChoice(ConversionCandidates * conversionCandidates, PlannerRestrictionContext * @@ -403,7 +405,8 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry, FetchEqualityAttrNumsForRTE((Node *) restrictClauseList); List *uniqueIndexColumnsList = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid, - AppendUniqueIndexColumnsToList); + AppendUniqueIndexColumnsToList, + INCLUDE_INDEX_ALL_STATEMENTS); IndexColumns *indexColumns = NULL; foreach_ptr(indexColumns, uniqueIndexColumnsList) { @@ -442,7 +445,8 @@ FirstIsSuperSetOfSecond(List *firstIntList, List *secondIntList) * unique index. */ static void -AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexGroups) +AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexGroups, + int flags) { if (indexForm->indisunique || indexForm->indisprimary) { diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 4810e5915..8c1282129 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -120,7 +120,7 @@ extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand, ProcessUtilityContext processUtilityContext); /* index.c */ -typedef void (*PGIndexProcessor)(Form_pg_index, List **); +typedef void (*PGIndexProcessor)(Form_pg_index, List **, int); /* call.c */ @@ -277,7 +277,7 @@ extern List * PostprocessIndexStmt(Node *node, extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement); extern void MarkIndexValid(IndexStmt *indexStmt); extern List * ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor - pgIndexProcessor); + pgIndexProcessor, int flags); /* objectaddress.c - forward declarations */ extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok); diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 4c30b2b04..779c05adb 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -94,6 +94,21 @@ typedef enum TableDDLCommandType } TableDDLCommandType; +/* + * IndexDefinitionDeparseFlags helps to control which parts of the + * index creation commands are deparsed. + */ +typedef enum IndexDefinitionDeparseFlags +{ + INCLUDE_CREATE_INDEX_STATEMENTS = 1 << 0, + INCLUDE_INDEX_CLUSTERED_STATEMENTS = 1 << 1, + INCLUDE_INDEX_STATISTICS_STATEMENTTS = 1 << 2, + INCLUDE_INDEX_ALL_STATEMENTS = INCLUDE_CREATE_INDEX_STATEMENTS | + INCLUDE_INDEX_CLUSTERED_STATEMENTS | + INCLUDE_INDEX_STATISTICS_STATEMENTTS +} IndexDefinitionDeparseFlags; + + struct TableDDLCommand; typedef struct TableDDLCommand TableDDLCommand; typedef char *(*TableDDLFunction)(void *context); @@ -177,12 +192,20 @@ extern uint64 GetNextShardId(void); extern uint64 GetNextPlacementId(void); extern Oid ResolveRelationId(text *relationName, bool missingOk); extern List * GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults); -extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes); +extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes, + bool includeReplicaIdentity); extern List * GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults, char *accessMethod); -extern List * GetTableIndexAndConstraintCommands(Oid relationId); +extern List * GetTableIndexAndConstraintCommands(Oid relationId, int indexFlags); +extern List * GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(Oid relationId, + int indexFlags); +extern Oid GetRelationIdentityOrPK(Relation rel); +extern void GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, + List **indexDDLEventList, + int indexFlags); extern bool IndexImpliedByAConstraint(Form_pg_index indexForm); +extern List * GetTableReplicaIdentityCommand(Oid relationId); extern char ShardStorageType(Oid relationId); extern bool DistributedTableReplicationIsEnabled(void); extern void CheckDistributedTable(Oid relationId);