Enable adding partitions after CreateCitusLocalTable

support-partitioning-for-citus-local-tables
Ahmet Gedemenli 2021-08-18 17:41:54 +03:00
parent c71f26524b
commit 10b6c3beb8
4 changed files with 69 additions and 5 deletions

View File

@ -333,6 +333,10 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList);
/*
* If it's a partitioned table, we need to create Citus Local Tables from its
* partitions too.
*/
CreateChildLocalTablesIfRelationIsPartitioned(shellRelationId, shardRelationId);
}
@ -346,7 +350,7 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
* the partition (child) shard tables successfully gets attached to the partitioned
* (parent) shard table, since there is already a parent/child relationship between them.
* However, the partition (child) shell tables are not attached to the partitioned
* (parent) shell tables. So we need to generate and execute commands for DETACH +
* (parent) shell tables. So we need to generate and execute commands for DETACH +
* ATTACH to establish the correct relationship.
*/
static void
@ -381,7 +385,7 @@ CreateChildLocalTablesIfRelationIsPartitioned(Oid shellRelationId, Oid shardRela
* parent shell table. Here we basically DETACH + ATTACH it and make sure
* that the correct parent/child relationship is established between the
* parent/child shell tables.
*/
*/
char *detachPartitionCommand = GenerateDetachPartitionCommand(
partitionRelationId);
ExecuteAndLogUtilityCommandList(list_make2(detachPartitionCommand,

View File

@ -14,6 +14,7 @@
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/index.h"
#include "catalog/partition.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
@ -58,6 +59,9 @@ bool EnableLocalReferenceForeignKeys = true;
static void PostprocessCreateTableStmtForeignKeys(CreateStmt *createStatement);
static void PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement,
const char *queryString);
static void FixParentsForCitusLocalTablePartitions(Oid shellRelationId, Oid
shardRelationId,
char *partitionBoundCString);
static bool AlterTableDefinesFKeyBetweenPostgresAndNonDistTable(
AlterTableStmt *alterTableStatement);
static bool RelationIdListContainsCitusTableType(List *relationIdList,
@ -358,11 +362,29 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
{
if (IsCitusTableType(parentRelationId, CITUS_LOCAL_TABLE))
{
/*
* First get the relation name and schema id for the partition, to obtain the
* shell partition id later, since CreateCitusLocalTable will rename it.
*/
char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
char *partitionBoundCString = PartitionBound(relationId);
/*
* If the parent is a citus local table, we don't need distribution column.
* We can do create a Citus Local Table with current table and early return.
*/
CreateCitusLocalTable(relationId, false);
/*
* We now have it converted to Citus Local Table. But we should also
* make sure that the shell partition is attached to the shell parent,
* and the shard partition is attached to the shard parent.
*/
Oid shellRelationId = get_relname_relid(relationName, relationSchemaId);
Oid shardRelationId = relationId;
FixParentsForCitusLocalTablePartitions(shellRelationId, shardRelationId,
partitionBoundCString);
return;
}
@ -382,7 +404,44 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
/*
* PreprocessAlterTableStmtAttachPartition takes AlterTableStmt object as
* FixParentsForCitusLocalTablePartitions Takes two relation ids, one for the shell
* partition, one for the shard partition. And takes the partition bound, which
* corresponds to FOR VALUES FROM .. TO .. part of the query.
* Generates and executes the necessary ALTER TABLE .. DETACH PARTITION .. and
* ALTER TABLE .. ATTACH PARTITION .. commands to accomplish that.
*/
static void
FixParentsForCitusLocalTablePartitions(Oid shellRelationId, Oid shardRelationId,
char *partitionBoundCString)
{
List *commands = NIL;
if (PartitionTable(shellRelationId))
{
commands = lappend(commands, GenerateDetachPartitionCommand(shellRelationId));
}
if (PartitionTable(shardRelationId))
{
commands = lappend(commands, GenerateDetachPartitionCommand(shardRelationId));
}
Oid shellParentId = get_partition_parent(shardRelationId);
char *shellParentQualifiedName = generate_qualified_relation_name(shellParentId);
char *shellPartitionQualifiedName = generate_qualified_relation_name(shellRelationId);
StringInfo createPartitionCommand = makeStringInfo();
appendStringInfo(createPartitionCommand, "ALTER TABLE %s ATTACH PARTITION %s %s;",
shellParentQualifiedName, shellPartitionQualifiedName,
partitionBoundCString);
commands = lappend(commands, createPartitionCommand->data);
ExecuteAndLogUtilityCommandList(commands);
}
/*
* PostprocessAlterTableStmtAttachPartition takes AlterTableStmt object as
* parameter but it only processes into ALTER TABLE ... ATTACH PARTITION
* commands and distributes the partition if necessary. There are four cases
* to consider;

View File

@ -42,7 +42,7 @@
#include "utils/rel.h"
#include "utils/syscache.h"
static char * PartitionBound(Oid partitionId);
char * PartitionBound(Oid partitionId);
static Relation try_relation_open_nolock(Oid relationId);
static List * CreateFixPartitionConstraintsTaskList(Oid relationId);
static List * WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
@ -768,7 +768,7 @@ GenerateAlterTableAttachPartitionToParentCommand(Oid partitionTableId,
* The function simply reads the pg_class and gets the partition bound.
* Later, converts it to text format and returns.
*/
static char *
char *
PartitionBound(Oid partitionId)
{
bool isnull = false;

View File

@ -27,6 +27,7 @@ extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId);
extern char * GenerateAlterTableAttachPartitionToParentCommand(Oid partitionTableId,
char *
parentTableQualifiedName);
extern char * PartitionBound(Oid partitionId);
extern char * GeneratePartitioningInformation(Oid tableId);
extern void FixPartitionConstraintsOnWorkers(Oid relationId);
extern void FixLocalPartitionConstraints(Oid relationId, int64 shardId);