Merge pull request #4973 from citusdata/base_for_logical

Get prepared for some improvements for online rebalancer
pull/4974/head
Önder Kalacı 2021-05-11 12:19:36 +02:00 committed by GitHub
commit 398d2472f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 210 additions and 54 deletions

View File

@ -558,8 +558,11 @@ ConvertTable(TableConversionState *con)
includeIndexes = false;
}
bool includeReplicaIdentity = true;
List *postLoadCommands = GetPostLoadTableCreationCommands(con->relationId,
includeIndexes);
includeIndexes,
includeReplicaIdentity);
List *justBeforeDropCommands = NIL;
List *attachPartitionCommands = NIL;

View File

@ -303,7 +303,8 @@ CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement)
* It returns a list that is filled by the pgIndexProcessor.
*/
List *
ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcessor)
ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcessor,
int indexFlags)
{
List *result = NIL;
ScanKeyData scanKey[1];
@ -325,7 +326,7 @@ ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcesso
while (HeapTupleIsValid(heapTuple))
{
Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(heapTuple);
pgIndexProcessor(indexForm, &result);
pgIndexProcessor(indexForm, &result, indexFlags);
heapTuple = systable_getnext(scanDescriptor);
}

View File

@ -74,10 +74,13 @@ int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
int NextShardId = 0;
int NextPlacementId = 0;
static List * GetTableReplicaIdentityCommand(Oid relationId);
static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_pg_index
indexForm,
List **
indexDDLEventList,
int indexFlags);
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
static void GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm,
List **indexDDLEventList);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_get_table_metadata);
@ -549,7 +552,7 @@ GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
tableDDLEventList = list_concat(tableDDLEventList, preLoadCreationCommandList);
List *postLoadCreationCommandList =
GetPostLoadTableCreationCommands(relationId, true);
GetPostLoadTableCreationCommands(relationId, true, true);
tableDDLEventList = list_concat(tableDDLEventList, postLoadCreationCommandList);
@ -562,19 +565,43 @@ GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
* of DDL commands that should be applied after loading the data.
*/
List *
GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes)
GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes,
bool includeReplicaIdentity)
{
List *tableDDLEventList = NIL;
if (includeIndexes)
/*
* Include all the commands (e.g., create index, set index clustered
* and set index statistics) regarding the indexes. Note that
* running all these commands in parallel might fail as the
* latter two depend on the first one. So, the caller should
* execute the commands sequentially.
*/
int indexFlags = INCLUDE_INDEX_ALL_STATEMENTS;
if (includeIndexes && includeReplicaIdentity)
{
List *indexAndConstraintCommandList =
GetTableIndexAndConstraintCommands(relationId);
GetTableIndexAndConstraintCommands(relationId, indexFlags);
tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList);
}
else if (includeIndexes && !includeReplicaIdentity)
{
/*
* Do not include the indexes/constraints that back the
* replica identity, if any.
*/
List *indexAndConstraintCommandList =
GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId,
indexFlags);
tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList);
}
List *replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId);
tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents);
if (includeReplicaIdentity)
{
List *replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId);
tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents);
}
List *triggerCommands = GetExplicitTriggerCommandList(relationId);
tableDDLEventList = list_concat(tableDDLEventList, triggerCommands);
@ -590,7 +617,7 @@ GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes)
* GetTableReplicaIdentityCommand returns the list of DDL commands to
* (re)define the replica identity choice for a given table.
*/
static List *
List *
GetTableReplicaIdentityCommand(Oid relationId)
{
List *replicaIdentityCreateCommandList = NIL;
@ -694,18 +721,82 @@ GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults,
* (re)create indexes and constraints for a given table.
*/
List *
GetTableIndexAndConstraintCommands(Oid relationId)
GetTableIndexAndConstraintCommands(Oid relationId, int indexFlags)
{
return ExecuteFunctionOnEachTableIndex(relationId,
GatherIndexAndConstraintDefinitionList);
GatherIndexAndConstraintDefinitionList,
indexFlags);
}
/*
 * GetTableIndexAndConstraintCommandsExcludingReplicaIdentity returns the list
 * of DDL commands to (re)create the indexes and constraints of the given
 * table, skipping any index or constraint that backs the replica identity.
 *
 * indexFlags is handed through unchanged to the per-index callback, which
 * uses it to decide which kinds of statements are emitted for each index.
 */
List *
GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(Oid relationId, int indexFlags)
{
	return ExecuteFunctionOnEachTableIndex(relationId,
										   GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity,
										   indexFlags);
}
/*
 * GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity is a thin
 * filter in front of GatherIndexAndConstraintDefinitionList(): the index is
 * forwarded to that function unless it is the one returned by
 * GetRelationIdentityOrPK() for the owning table, in which case nothing is
 * appended to indexDDLEventList.
 *
 * The owning relation is opened with AccessShareLock for the lookup and is
 * closed with NoLock, so this function does not itself release that lock.
 */
static void
GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_pg_index indexForm,
															   List **indexDDLEventList,
															   int indexFlags)
{
	Relation ownerRelation = table_open(indexForm->indrelid, AccessShareLock);
	Oid identityIndexId = GetRelationIdentityOrPK(ownerRelation);
	bool backsReplicaIdentity = (identityIndexId == indexForm->indexrelid);

	/* only indexes that do not back the replica identity are gathered */
	if (!backsReplicaIdentity)
	{
		GatherIndexAndConstraintDefinitionList(indexForm, indexDDLEventList,
											   indexFlags);
	}

	table_close(ownerRelation, NoLock);
}
/*
 * GetRelationIdentityOrPK returns the replica identity index of the given
 * relation when RelationGetReplicaIndex() reports a valid one; otherwise it
 * returns whatever RelationGetPrimaryKeyIndex() reports for the relation.
 *
 * Inspired from postgres/src/backend/replication/logical/worker.c
 */
Oid
GetRelationIdentityOrPK(Relation rel)
{
	Oid replicaIndexId = RelationGetReplicaIndex(rel);
	if (OidIsValid(replicaIndexId))
	{
		return replicaIndexId;
	}

	/* no explicit replica identity index, fall back to the primary key */
	return RelationGetPrimaryKeyIndex(rel);
}
/*
* GatherIndexAndConstraintDefinitionList adds the DDL command for the given index.
*/
static void
GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLEventList)
void
GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLEventList,
int indexFlags)
{
Oid indexId = indexForm->indexrelid;
char *statementDef = NULL;
@ -726,11 +817,15 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE
}
/* append found constraint or index definition to the list */
*indexDDLEventList = lappend(*indexDDLEventList, makeTableDDLCommandString(
statementDef));
if (indexFlags & INCLUDE_CREATE_INDEX_STATEMENTS)
{
*indexDDLEventList = lappend(*indexDDLEventList, makeTableDDLCommandString(
statementDef));
}
/* if table is clustered on this index, append definition to the list */
if (indexForm->indisclustered)
if ((indexFlags & INCLUDE_INDEX_CLUSTERED_STATEMENTS) &&
indexForm->indisclustered)
{
char *clusteredDef = pg_get_indexclusterdef_string(indexId);
Assert(clusteredDef != NULL);
@ -740,8 +835,12 @@ GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLE
}
/* we need alter index commands for altered targets on expression indexes */
List *alterIndexStatisticsCommands = GetAlterIndexStatisticsCommands(indexId);
*indexDDLEventList = list_concat(*indexDDLEventList, alterIndexStatisticsCommands);
if (indexFlags & INCLUDE_INDEX_STATISTICS_STATEMENTTS)
{
List *alterIndexStatisticsCommands = GetAlterIndexStatisticsCommands(indexId);
*indexDDLEventList = list_concat(*indexDDLEventList,
alterIndexStatisticsCommands);
}
}

View File

@ -63,7 +63,10 @@ static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName
char shardReplicationMode);
static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort);
int32 targetNodePort, bool useLogicalReplication);
static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort);
static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
const char *sourceNodeName,
int32 sourceNodePort);
@ -359,8 +362,9 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
* CopyColocatedShardPlacement function copies given shard with its co-located
* shards.
*/
bool useLogicalReplication = false;
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
targetNodePort);
targetNodePort, useLogicalReplication);
ShardInterval *colocatedShard = NULL;
foreach_ptr(colocatedShard, colocatedShardList)
@ -741,8 +745,9 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode);
}
bool useLogicalReplication = false;
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
targetNodeName, targetNodePort, useLogicalReplication);
/*
* Finally insert the placements to pg_dist_placement and sync it to the
@ -820,32 +825,51 @@ EnsureTableListSuitableForReplication(List *tableIdList)
/*
* CopyColocatedShardPlacement copies a shard along with its co-located shards
* from a source node to target node. It does not make any checks about state
* of the shards. It is caller's responsibility to make those checks if they are
* necessary.
* CopyShardTables copies a shard along with its co-located shards from a source
* node to target node. It does not make any checks about state of the shards.
* It is caller's responsibility to make those checks if they are necessary.
*/
static void
CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort)
char *targetNodeName, int32 targetNodePort, bool useLogicalReplication)
{
ShardInterval *shardInterval = NULL;
if (list_length(shardIntervalList) < 1)
{
return;
}
if (useLogicalReplication)
{
/* only supported in Citus enterprise */
}
else
{
CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
}
}
/*
* CopyShardTablesViaBlockWrites copies a shard along with its co-located shards
* from a source node to target node via COPY command. While the command is in
* progress, modifications on the source node are blocked.
*/
static void
CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort)
{
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTables",
"CopyShardTablesViaBlockWrites",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
/* iterate through the colocated shards and copy each */
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
bool includeDataCopy = true;
if (PartitionedTable(shardInterval->relationId))
{
/* partitioned tables contain no data */
includeDataCopy = false;
}
bool includeDataCopy = !PartitionedTable(shardInterval->relationId);
List *ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName,
sourceNodePort, includeDataCopy);
@ -853,11 +877,10 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
MemoryContextReset(localContext);
}
MemoryContextReset(localContext);
/*
* Once all shards are created, we can recreate relationships between shards.
*
@ -868,15 +891,14 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
{
List *shardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
char *tableOwner = TableOwner(shardInterval->relationId);
List *commandList = NIL;
CopyShardForeignConstraintCommandListGrouped(shardInterval,
&shardForeignConstraintCommandList,
&referenceTableForeignConstraintList);
List *commandList = list_concat(shardForeignConstraintCommandList,
referenceTableForeignConstraintList);
commandList = list_concat(commandList, shardForeignConstraintCommandList);
commandList = list_concat(commandList, referenceTableForeignConstraintList);
if (PartitionTable(shardInterval->relationId))
{
@ -886,8 +908,10 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
commandList = lappend(commandList, attachPartitionCommand);
}
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
tableOwner, commandList);
MemoryContextReset(localContext);
}
@ -1079,7 +1103,9 @@ CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
copyShardDataCommand->data);
}
List *indexCommandList = GetPostLoadTableCreationCommands(relationId, true);
bool includeReplicaIdentity = true;
List *indexCommandList =
GetPostLoadTableCreationCommands(relationId, true, includeReplicaIdentity);
indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId);
copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,

View File

@ -94,6 +94,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/query_colocation_checker.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/recursive_planning.h"
@ -179,7 +180,8 @@ static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionConte
plannerRestrictionContext,
List *rangeTableList,
int resultRTEIdentity);
static void AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexes);
static void AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexes,
int flags);
static ConversionChoice GetConversionChoice(ConversionCandidates *
conversionCandidates,
PlannerRestrictionContext *
@ -403,7 +405,8 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
FetchEqualityAttrNumsForRTE((Node *) restrictClauseList);
List *uniqueIndexColumnsList = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid,
AppendUniqueIndexColumnsToList);
AppendUniqueIndexColumnsToList,
INCLUDE_INDEX_ALL_STATEMENTS);
IndexColumns *indexColumns = NULL;
foreach_ptr(indexColumns, uniqueIndexColumnsList)
{
@ -442,7 +445,8 @@ FirstIsSuperSetOfSecond(List *firstIntList, List *secondIntList)
* unique index.
*/
static void
AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexGroups)
AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexGroups,
int flags)
{
if (indexForm->indisunique || indexForm->indisprimary)
{

View File

@ -120,7 +120,7 @@ extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand,
ProcessUtilityContext processUtilityContext);
/* index.c */
typedef void (*PGIndexProcessor)(Form_pg_index, List **);
typedef void (*PGIndexProcessor)(Form_pg_index, List **, int);
/* call.c */
@ -277,7 +277,7 @@ extern List * PostprocessIndexStmt(Node *node,
extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement);
extern void MarkIndexValid(IndexStmt *indexStmt);
extern List * ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor
pgIndexProcessor);
pgIndexProcessor, int flags);
/* objectaddress.c - forward declarations */
extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok);

View File

@ -94,6 +94,21 @@ typedef enum TableDDLCommandType
} TableDDLCommandType;
/*
 * IndexDefinitionDeparseFlags helps to control which parts of the
 * index creation commands are deparsed. The values are bit flags and
 * can be OR'ed together.
 */
typedef enum IndexDefinitionDeparseFlags
{
	/* emit the index / constraint definition statement itself */
	INCLUDE_CREATE_INDEX_STATEMENTS = 1 << 0,

	/* emit the clustering statement for indexes the table is clustered on */
	INCLUDE_INDEX_CLUSTERED_STATEMENTS = 1 << 1,

	/* emit the alter index statistics statements */
	INCLUDE_INDEX_STATISTICS_STATEMENTTS = 1 << 2,

	/*
	 * Correctly spelled alias of the flag above. The misspelled name is
	 * kept so that existing users keep compiling.
	 */
	INCLUDE_INDEX_STATISTICS_STATEMENTS = INCLUDE_INDEX_STATISTICS_STATEMENTTS,

	/* all of the above */
	INCLUDE_INDEX_ALL_STATEMENTS = INCLUDE_CREATE_INDEX_STATEMENTS |
								   INCLUDE_INDEX_CLUSTERED_STATEMENTS |
								   INCLUDE_INDEX_STATISTICS_STATEMENTTS
} IndexDefinitionDeparseFlags;
struct TableDDLCommand;
typedef struct TableDDLCommand TableDDLCommand;
typedef char *(*TableDDLFunction)(void *context);
@ -177,12 +192,20 @@ extern uint64 GetNextShardId(void);
extern uint64 GetNextPlacementId(void);
extern Oid ResolveRelationId(text *relationName, bool missingOk);
extern List * GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults);
extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes);
extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes,
bool includeReplicaIdentity);
extern List * GetPreLoadTableCreationCommands(Oid relationId,
bool includeSequenceDefaults,
char *accessMethod);
extern List * GetTableIndexAndConstraintCommands(Oid relationId);
extern List * GetTableIndexAndConstraintCommands(Oid relationId, int indexFlags);
extern List * GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(Oid relationId,
int indexFlags);
extern Oid GetRelationIdentityOrPK(Relation rel);
extern void GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm,
List **indexDDLEventList,
int indexFlags);
extern bool IndexImpliedByAConstraint(Form_pg_index indexForm);
extern List * GetTableReplicaIdentityCommand(Oid relationId);
extern char ShardStorageType(Oid relationId);
extern bool DistributedTableReplicationIsEnabled(void);
extern void CheckDistributedTable(Oid relationId);