mirror of https://github.com/citusdata/citus.git
Add distributed partitioned table support distributed table creation
With this PR, Citus starts to support all possible ways to create distributed partitioned tables. These are; - Distributing already created partitioning hierarchy - CREATE TABLE ... PARTITION OF a distributed_table - ALTER TABLE distributed_table ATTACH PARTITION non_distributed_table - ALTER TABLE distributed_table ATTACH PARTITION distributed_table We also support DETACHing partitions from partitioned tables and propogating TRUNCATE and DDL commands to distributed partitioned tables. This PR also refactors some parts of distributed table creation logic.pull/1509/head
parent
a650aaa631
commit
fddf9b3fcc
|
@ -38,6 +38,7 @@
|
|||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_logical_planner.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
#include "distributed/pg_dist_colocation.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
|
@ -262,7 +263,6 @@ create_reference_table(PG_FUNCTION_ARGS)
|
|||
errdetail("There are no active worker nodes.")));
|
||||
}
|
||||
|
||||
|
||||
CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE,
|
||||
colocateWithTableName, viaDeprecatedAPI);
|
||||
|
||||
|
@ -277,7 +277,8 @@ create_reference_table(PG_FUNCTION_ARGS)
|
|||
* This functions contains all necessary logic to create distributed tables. It
|
||||
* perform necessary checks to ensure distributing the table is safe. If it is
|
||||
* safe to distribute the table, this function creates distributed table metadata,
|
||||
* creates shards and copies local data to shards.
|
||||
* creates shards and copies local data to shards. This function also handles
|
||||
* partitioned tables by distributing its partitions as well.
|
||||
*
|
||||
* viaDeprecatedAPI boolean flag is not optimal way to implement this function,
|
||||
* but it helps reducing code duplication a lot. We hope to remove that flag one
|
||||
|
@ -355,6 +356,21 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
|||
CreateReferenceTableShard(relationId);
|
||||
}
|
||||
|
||||
/* if this table is partitioned table, distribute its partitions too */
|
||||
if (PartitionedTable(relationId))
|
||||
{
|
||||
List *partitionList = PartitionList(relationId);
|
||||
ListCell *partitionCell = NULL;
|
||||
|
||||
foreach(partitionCell, partitionList)
|
||||
{
|
||||
Oid partitionRelationId = lfirst_oid(partitionCell);
|
||||
CreateDistributedTable(partitionRelationId, distributionColumn,
|
||||
distributionMethod, colocateWithTableName,
|
||||
viaDeprecatedAPI);
|
||||
}
|
||||
}
|
||||
|
||||
/* copy over data for hash distributed and reference tables */
|
||||
if (distributionMethod == DISTRIBUTE_BY_HASH ||
|
||||
distributionMethod == DISTRIBUTE_BY_NONE)
|
||||
|
@ -566,6 +582,7 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
|
|||
Relation relation = NULL;
|
||||
TupleDesc relationDesc = NULL;
|
||||
char *relationName = NULL;
|
||||
Oid parentRelationId = InvalidOid;
|
||||
|
||||
EnsureTableOwner(relationId);
|
||||
EnsureTableNotDistributed(relationId);
|
||||
|
@ -626,6 +643,81 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
|
|||
}
|
||||
}
|
||||
|
||||
if (PartitionTable(relationId))
|
||||
{
|
||||
parentRelationId = PartitionParentOid(relationId);
|
||||
}
|
||||
|
||||
/* partitions cannot be distributed if their parent is not distributed */
|
||||
if (PartitionTable(relationId) && !IsDistributedTable(parentRelationId))
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
char *parentRelationName = get_rel_name(parentRelationId);
|
||||
|
||||
ereport(ERROR, (errmsg("cannot distribute relation \"%s\" which is partition of "
|
||||
"\"%s\"", relationName, parentRelationName),
|
||||
errdetail("Citus does not support distributing partitions "
|
||||
"if their parent is not distributed table."),
|
||||
errhint("Distribute the partitioned table \"%s\" instead.",
|
||||
parentRelationName)));
|
||||
}
|
||||
|
||||
/*
|
||||
* These checks are mostly for partitioned tables not partitions because we prevent
|
||||
* distributing partitions directly in the above check. However, partitions can still
|
||||
* reach this point because, we call CreateDistributedTable for partitions if their
|
||||
* parent table is distributed.
|
||||
*/
|
||||
if (PartitionedTable(relationId))
|
||||
{
|
||||
/* we cannot distribute partitioned tables with master_create_distributed_table */
|
||||
if (viaDeprecatedAPI)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributing partitioned tables in only supported "
|
||||
"with create_distributed_table UDF")));
|
||||
}
|
||||
|
||||
/* distributing partitioned tables in only supported for hash-distribution */
|
||||
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributing partitioned tables in only supported "
|
||||
"for hash-distributed tables")));
|
||||
}
|
||||
|
||||
/* we currently don't support partitioned tables for replication factor > 1 */
|
||||
if (ShardReplicationFactor > 1)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributing partitioned tables with replication "
|
||||
"factor greater than 1 is not supported")));
|
||||
}
|
||||
|
||||
/* we currently don't support MX tables to be distributed partitioned table */
|
||||
if (replicationModel == REPLICATION_MODEL_STREAMING)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributing partitioned tables which uses "
|
||||
"streaming replication is not supported")));
|
||||
}
|
||||
|
||||
/* we don't support distributing tables with multi-level partitioning */
|
||||
if (PartitionTable(relationId))
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
Oid parentRelationId = PartitionParentOid(relationId);
|
||||
char *parentRelationName = get_rel_name(parentRelationId);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributing multi-level partitioned tables "
|
||||
"is not supported"),
|
||||
errdetail("Relation \"%s\" is partitioned table itself and "
|
||||
"it is also partition of relation \"%s\".",
|
||||
relationName, parentRelationName)));
|
||||
}
|
||||
}
|
||||
|
||||
ErrorIfUnsupportedConstraint(relation, distributionMethod, distributionColumn,
|
||||
colocationId);
|
||||
|
||||
|
@ -1015,7 +1107,9 @@ RegularTable(Oid relationId)
|
|||
/*
|
||||
* CopyLocalDataIntoShards copies data from the local table, which is hidden
|
||||
* after converting it to a distributed table, into the shards of the distributed
|
||||
* table.
|
||||
* table. For partitioned tables, this functions returns without copying the data
|
||||
* because we call this function for both partitioned tables and its partitions.
|
||||
* Returning early saves us from copying data to workers twice.
|
||||
*
|
||||
* This function uses CitusCopyDestReceiver to invoke the distributed COPY logic.
|
||||
* We cannot use a regular COPY here since that cannot read from a table. Instead
|
||||
|
@ -1057,6 +1151,17 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
|
|||
/* take an ExclusiveLock to block all operations except SELECT */
|
||||
distributedRelation = heap_open(distributedRelationId, ExclusiveLock);
|
||||
|
||||
/*
|
||||
* Skip copying from partitioned tables, we will copy the data from
|
||||
* partition to partition's shards.
|
||||
*/
|
||||
if (PartitionedTable(distributedRelationId))
|
||||
{
|
||||
heap_close(distributedRelation, NoLock);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* All writes have finished, make sure that we can see them by using the
|
||||
* latest snapshot. We use GetLatestSnapshot instead of
|
||||
|
|
|
@ -42,6 +42,7 @@
|
|||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_planner.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/multi_router_planner.h"
|
||||
|
@ -108,6 +109,8 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
|||
/* Local functions forward declarations for processing distributed table commands */
|
||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
||||
bool *commandMustRunAsOwner);
|
||||
static void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement);
|
||||
static void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement);
|
||||
static List * PlanIndexStmt(IndexStmt *createIndexStatement,
|
||||
const char *createIndexCommand);
|
||||
static List * PlanDropIndexStmt(DropStmt *dropIndexStatement,
|
||||
|
@ -154,8 +157,8 @@ static void ShowNoticeIfNotUsing2PC(void);
|
|||
static List * DDLTaskList(Oid relationId, const char *commandString);
|
||||
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
|
||||
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
|
||||
static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
|
||||
const char *commandString);
|
||||
static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
||||
const char *commandString);
|
||||
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
|
||||
void *arg);
|
||||
static void CheckCopyPermissions(CopyStmt *copyStatement);
|
||||
|
@ -467,6 +470,29 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
params, dest, completionTag);
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* We only process CREATE TABLE ... PARTITION OF commands in the function below
|
||||
* to handle the case when user creates a table as a partition of distributed table.
|
||||
*/
|
||||
if (IsA(parsetree, CreateStmt))
|
||||
{
|
||||
CreateStmt *createStatement = (CreateStmt *) parsetree;
|
||||
|
||||
ProcessCreateTableStmtPartitionOf(createStatement);
|
||||
}
|
||||
|
||||
/*
|
||||
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
|
||||
* and distribute the partition if necessary.
|
||||
*/
|
||||
if (IsA(parsetree, AlterTableStmt))
|
||||
{
|
||||
AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree;
|
||||
|
||||
ProcessAlterTableStmtAttachPartition(alterTableStatement);
|
||||
}
|
||||
|
||||
/* don't run post-process code for local commands */
|
||||
if (ddlJobs != NIL)
|
||||
{
|
||||
|
@ -776,6 +802,135 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ProcessCreateTableStmtPartitionOf takes CreateStmt object as a parameter but
|
||||
* it only processes CREATE TABLE ... PARTITION OF statements and it checks if
|
||||
* user creates the table as a partition of a distributed table. In that case,
|
||||
* it distributes partition as well. Since the table itself is a partition,
|
||||
* CreateDistributedTable will attach it to its parent table automatically after
|
||||
* distributing it.
|
||||
*
|
||||
* This function does nothing if PostgreSQL's version is less then 10 and given
|
||||
* CreateStmt is not a CREATE TABLE ... PARTITION OF command.
|
||||
*/
|
||||
static void
|
||||
ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement)
|
||||
{
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
if (createStatement->inhRelations != NIL && createStatement->partbound != NULL)
|
||||
{
|
||||
RangeVar *parentRelation = linitial(createStatement->inhRelations);
|
||||
bool parentMissingOk = false;
|
||||
Oid parentRelationId = RangeVarGetRelid(parentRelation, NoLock,
|
||||
parentMissingOk);
|
||||
|
||||
/* a partition can only inherit from single parent table */
|
||||
Assert(list_length(createStatement->inhRelations) == 1);
|
||||
|
||||
Assert(parentRelationId != InvalidOid);
|
||||
|
||||
/*
|
||||
* If a partition is being created and if its parent is a distributed
|
||||
* table, we will distribute this table as well.
|
||||
*/
|
||||
if (IsDistributedTable(parentRelationId))
|
||||
{
|
||||
bool missingOk = false;
|
||||
Oid relationId = RangeVarGetRelid(createStatement->relation, NoLock,
|
||||
missingOk);
|
||||
Var *parentDistributionColumn = DistPartitionKey(parentRelationId);
|
||||
char parentDistributionMethod = DISTRIBUTE_BY_HASH;
|
||||
char *parentRelationName = get_rel_name(parentRelationId);
|
||||
bool viaDeprecatedAPI = false;
|
||||
|
||||
CreateDistributedTable(relationId, parentDistributionColumn,
|
||||
parentDistributionMethod, parentRelationName,
|
||||
viaDeprecatedAPI);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ProcessAlterTableStmtAttachPartition takes AlterTableStmt object as parameter
|
||||
* but it only processes into ALTER TABLE ... ATTACH PARTITION commands and
|
||||
* distributes the partition if necessary. There are four cases to consider;
|
||||
*
|
||||
* Parent is not distributed, partition is not distributed: We do not need to
|
||||
* do anything in this case.
|
||||
*
|
||||
* Parent is not distributed, partition is distributed: This can happen if
|
||||
* user first distributes a table and tries to attach it to a non-distributed
|
||||
* table. Non-distributed tables cannot have distributed partitions, thus we
|
||||
* simply error out in this case.
|
||||
*
|
||||
* Parent is distributed, partition is not distributed: We should distribute
|
||||
* the table and attach it to its parent in workers. CreateDistributedTable
|
||||
* perform both of these operations. Thus, we will not propagate ALTER TABLE
|
||||
* ... ATTACH PARTITION command to workers.
|
||||
*
|
||||
* Parent is distributed, partition is distributed: Partition is already
|
||||
* distributed, we only need to attach it to its parent in workers. Attaching
|
||||
* operation will be performed via propagating this ALTER TABLE ... ATTACH
|
||||
* PARTITION command to workers.
|
||||
*
|
||||
* This function does nothing if PostgreSQL's version is less then 10 and given
|
||||
* CreateStmt is not a ALTER TABLE ... ATTACH PARTITION OF command.
|
||||
*/
|
||||
static void
|
||||
ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement)
|
||||
{
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
List *commandList = alterTableStatement->cmds;
|
||||
ListCell *commandCell = NULL;
|
||||
|
||||
foreach(commandCell, commandList)
|
||||
{
|
||||
AlterTableCmd *alterTableCommand = (AlterTableCmd *) lfirst(commandCell);
|
||||
|
||||
if (alterTableCommand->subtype == AT_AttachPartition)
|
||||
{
|
||||
Oid relationId = AlterTableLookupRelation(alterTableStatement, NoLock);
|
||||
PartitionCmd *partitionCommand = (PartitionCmd *) alterTableCommand->def;
|
||||
bool partitionMissingOk = false;
|
||||
Oid partitionRelationId = RangeVarGetRelid(partitionCommand->name, NoLock,
|
||||
partitionMissingOk);
|
||||
|
||||
/*
|
||||
* If user first distributes the table then tries to attach it to non
|
||||
* distributed table, we error out.
|
||||
*/
|
||||
if (!IsDistributedTable(relationId) &&
|
||||
IsDistributedTable(partitionRelationId))
|
||||
{
|
||||
char *parentRelationName = get_rel_name(partitionRelationId);
|
||||
|
||||
ereport(ERROR, (errmsg("non-distributed tables cannot have "
|
||||
"distributed partitions"),
|
||||
errhint("Distribute the partitioned table \"%s\" "
|
||||
"instead", parentRelationName)));
|
||||
}
|
||||
|
||||
/* if parent of this table is distributed, distribute this table too */
|
||||
if (IsDistributedTable(relationId) &&
|
||||
!IsDistributedTable(partitionRelationId))
|
||||
{
|
||||
Var *distributionColumn = DistPartitionKey(relationId);
|
||||
char distributionMethod = DISTRIBUTE_BY_HASH;
|
||||
char *relationName = get_rel_name(relationId);
|
||||
bool viaDeprecatedAPI = false;
|
||||
|
||||
CreateDistributedTable(partitionRelationId, distributionColumn,
|
||||
distributionMethod, relationName,
|
||||
viaDeprecatedAPI);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PlanIndexStmt determines whether a given CREATE INDEX statement involves
|
||||
* a distributed table. If so (and if the statement does not use unsupported
|
||||
|
@ -1041,6 +1196,42 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
|
|||
constraint->skip_validation = true;
|
||||
}
|
||||
}
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
else if (alterTableType == AT_AttachPartition)
|
||||
{
|
||||
PartitionCmd *partitionCommand = (PartitionCmd *) command->def;
|
||||
|
||||
/*
|
||||
* We only support ALTER TABLE ATTACH PARTITION, if it is only subcommand of
|
||||
* ALTER TABLE. It was already checked in ErrorIfUnsupportedAlterTableStmt.
|
||||
*/
|
||||
Assert(list_length(commandList) <= 1);
|
||||
|
||||
rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false);
|
||||
|
||||
/*
|
||||
* Do not generate tasks if relation is distributed and the partition
|
||||
* is not distributed. Because, we'll manually convert the partition into
|
||||
* distributed table and co-locate with its parent.
|
||||
*/
|
||||
if (!IsDistributedTable(rightRelationId))
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
}
|
||||
else if (alterTableType == AT_DetachPartition)
|
||||
{
|
||||
PartitionCmd *partitionCommand = (PartitionCmd *) command->def;
|
||||
|
||||
/*
|
||||
* We only support ALTER TABLE DETACH PARTITION, if it is only subcommand of
|
||||
* ALTER TABLE. It was already checked in ErrorIfUnsupportedAlterTableStmt.
|
||||
*/
|
||||
Assert(list_length(commandList) <= 1);
|
||||
|
||||
rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
@ -1051,8 +1242,8 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
|
|||
if (rightRelationId)
|
||||
{
|
||||
/* if foreign key related, use specialized task list function ... */
|
||||
ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId,
|
||||
alterTableCommand);
|
||||
ddlJob->taskList = InterShardDDLTaskList(leftRelationId, rightRelationId,
|
||||
alterTableCommand);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -1759,6 +1950,54 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
break;
|
||||
}
|
||||
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
case AT_AttachPartition:
|
||||
{
|
||||
Oid relationId = AlterTableLookupRelation(alterTableStatement,
|
||||
NoLock);
|
||||
PartitionCmd *partitionCommand = (PartitionCmd *) command->def;
|
||||
bool missingOK = false;
|
||||
Oid partitionRelationId = RangeVarGetRelid(partitionCommand->name,
|
||||
NoLock, missingOK);
|
||||
|
||||
/* we only allow partitioning commands if they are only subcommand */
|
||||
if (commandList->length > 1)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot execute ATTACH PARTITION command "
|
||||
"with other subcommands"),
|
||||
errhint("You can issue each subcommand "
|
||||
"separately.")));
|
||||
}
|
||||
|
||||
if (IsDistributedTable(partitionRelationId) &&
|
||||
!TablesColocated(relationId, partitionRelationId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("distributed tables cannot have "
|
||||
"non-colocated distributed tables as a "
|
||||
"partition ")));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case AT_DetachPartition:
|
||||
{
|
||||
/* we only allow partitioning commands if they are only subcommand */
|
||||
if (commandList->length > 1)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot execute DETACH PARTITION command "
|
||||
"with other subcommands"),
|
||||
errhint("You can issue each subcommand "
|
||||
"separately.")));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
#endif
|
||||
case AT_SetNotNull:
|
||||
case AT_DropConstraint:
|
||||
case AT_EnableTrigAll:
|
||||
|
@ -1776,9 +2015,10 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("alter table command is currently unsupported"),
|
||||
errdetail("Only ADD|DROP COLUMN, SET|DROP NOT NULL,"
|
||||
" SET|DROP DEFAULT, ADD|DROP CONSTRAINT and "
|
||||
"TYPE subcommands are supported.")));
|
||||
errdetail("Only ADD|DROP COLUMN, SET|DROP NOT NULL, "
|
||||
"SET|DROP DEFAULT, ADD|DROP CONSTRAINT, "
|
||||
"ATTACH|DETACH PARTITION and TYPE subcommands "
|
||||
"are supported.")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2786,16 +3026,17 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
|
|||
|
||||
|
||||
/*
|
||||
* ForeignKeyTaskList builds a list of tasks to execute a foreign key command on a
|
||||
* shards of given list of distributed table.
|
||||
* InterShardDDLTaskList builds a list of tasks to execute a inter shard DDL command on a
|
||||
* shards of given list of distributed table. At the moment this function is used to run
|
||||
* foreign key and partitioning command on worker node.
|
||||
*
|
||||
* leftRelationId is the relation id of actual distributed table which given foreign key
|
||||
* command is applied. rightRelationId is the relation id of distributed table which
|
||||
* foreign key refers to.
|
||||
* leftRelationId is the relation id of actual distributed table which given command is
|
||||
* applied. rightRelationId is the relation id of distributed table which given command
|
||||
* refers to.
|
||||
*/
|
||||
static List *
|
||||
ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
|
||||
const char *commandString)
|
||||
InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
||||
const char *commandString)
|
||||
{
|
||||
List *taskList = NIL;
|
||||
|
||||
|
|
|
@ -194,7 +194,8 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval
|
|||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("expire target is not a regular or foreign table")));
|
||||
errmsg("expire target is not a regular, foreign or partitioned "
|
||||
"table")));
|
||||
}
|
||||
|
||||
connection = GetNodeConnection(connectionFlag, workerNode->workerName,
|
||||
|
|
|
@ -402,7 +402,8 @@ RecreateTableDDLCommandList(Oid relationId)
|
|||
else
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("repair target is not a regular or foreign table")));
|
||||
errmsg("repair target is not a regular, foreign or partitioned "
|
||||
"table")));
|
||||
}
|
||||
|
||||
dropCommandList = list_make1(dropCommand->data);
|
||||
|
|
|
@ -24,6 +24,9 @@
|
|||
#include "commands/tablecmds.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "catalog/namespace.h"
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
#include "catalog/partition.h"
|
||||
#endif
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
|
@ -31,6 +34,7 @@
|
|||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/pg_dist_shard.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
|
@ -368,6 +372,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
|
|||
int placementsCreated = 0;
|
||||
int attemptNumber = 0;
|
||||
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(relationId);
|
||||
char *alterTableAttachPartitionCommand = NULL;
|
||||
bool includeSequenceDefaults = false;
|
||||
List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
|
||||
uint32 connectionFlag = FOR_DDL;
|
||||
|
@ -402,7 +407,8 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
|
|||
}
|
||||
|
||||
WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList,
|
||||
foreignConstraintCommandList, connection);
|
||||
foreignConstraintCommandList, alterTableAttachPartitionCommand,
|
||||
connection);
|
||||
|
||||
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
|
||||
nodeGroupId);
|
||||
|
@ -483,12 +489,20 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
|||
ListCell *connectionCell = NULL;
|
||||
ListCell *shardPlacementCell = NULL;
|
||||
int connectionFlags = FOR_DDL;
|
||||
char *alterTableAttachPartitionCommand = NULL;
|
||||
|
||||
if (useExclusiveConnection)
|
||||
{
|
||||
connectionFlags |= CONNECTION_PER_PLACEMENT;
|
||||
}
|
||||
|
||||
|
||||
if (PartitionTable(distributedRelationId))
|
||||
{
|
||||
alterTableAttachPartitionCommand =
|
||||
GenerateAlterTableAttachPartitionCommand(distributedRelationId);
|
||||
}
|
||||
|
||||
BeginOrContinueCoordinatedTransaction();
|
||||
|
||||
foreach(shardPlacementCell, shardPlacements)
|
||||
|
@ -517,7 +531,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
|||
|
||||
WorkerCreateShard(distributedRelationId, shardIndex, shardId,
|
||||
ddlCommandList, foreignConstraintCommandList,
|
||||
connection);
|
||||
alterTableAttachPartitionCommand, connection);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -540,7 +554,8 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
|||
*/
|
||||
void
|
||||
WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList,
|
||||
List *foreignConstraintCommandList, MultiConnection *connection)
|
||||
List *foreignConstraintCommandList,
|
||||
char *alterTableAttachPartitionCommand, MultiConnection *connection)
|
||||
{
|
||||
Oid schemaId = get_rel_namespace(relationId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
|
@ -618,6 +633,40 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
|
|||
|
||||
ExecuteCriticalRemoteCommand(connection, applyForeignConstraintCommand->data);
|
||||
}
|
||||
|
||||
/*
|
||||
* If the shard is created for a partition, send the command to create the
|
||||
* partitioning hierarcy on the shard.
|
||||
*/
|
||||
if (alterTableAttachPartitionCommand != NULL)
|
||||
{
|
||||
Oid parentRelationId = PartitionParentOid(relationId);
|
||||
uint64 correspondingParentShardId = InvalidOid;
|
||||
StringInfo applyAttachPartitionCommand = makeStringInfo();
|
||||
|
||||
Oid parentSchemaId = InvalidOid;
|
||||
char *parentSchemaName = NULL;
|
||||
char *escapedParentSchemaName = NULL;
|
||||
char *escapedCommand = NULL;
|
||||
|
||||
Assert(PartitionTable(relationId));
|
||||
|
||||
parentSchemaId = get_rel_namespace(parentRelationId);
|
||||
parentSchemaName = get_namespace_name(parentSchemaId);
|
||||
escapedParentSchemaName = quote_literal_cstr(parentSchemaName);
|
||||
escapedCommand = quote_literal_cstr(alterTableAttachPartitionCommand);
|
||||
|
||||
correspondingParentShardId = ColocatedShardIdInRelation(parentRelationId,
|
||||
shardIndex);
|
||||
|
||||
appendStringInfo(applyAttachPartitionCommand,
|
||||
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, correspondingParentShardId,
|
||||
escapedParentSchemaName, shardId, escapedSchemaName,
|
||||
escapedCommand);
|
||||
|
||||
|
||||
ExecuteCriticalRemoteCommand(connection, applyAttachPartitionCommand->data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -451,8 +451,8 @@ RelayEventExtendNamesForInterShardCommands(Node *parseTree, uint64 leftShardId,
|
|||
}
|
||||
}
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
else if (command->subtype == AT_AttachPartition || command->subtype ==
|
||||
AT_DetachPartition)
|
||||
else if (command->subtype == AT_AttachPartition ||
|
||||
command->subtype == AT_DetachPartition)
|
||||
{
|
||||
PartitionCmd *partitionCommand = (PartitionCmd *) command->def;
|
||||
|
||||
|
@ -469,9 +469,8 @@ RelayEventExtendNamesForInterShardCommands(Node *parseTree, uint64 leftShardId,
|
|||
SetSchemaNameIfNotExist(relationSchemaName, rightShardSchemaName);
|
||||
|
||||
/*
|
||||
* We will not append shard id to referencing table name or
|
||||
* constraint name. They will be handled when we drop into
|
||||
* RelayEventExtendNames.
|
||||
* We will not append shard id to left shard name. This will be
|
||||
* handled when we drop into RelayEventExtendNames.
|
||||
*/
|
||||
AppendShardIdToName(referencedTableName, rightShardId);
|
||||
}
|
||||
|
@ -622,6 +621,7 @@ AppendShardIdToName(char **name, uint64 shardId)
|
|||
{
|
||||
snprintf(extendedName, NAMEDATALEN, "%s%s", (*name), shardIdAndSeparator);
|
||||
}
|
||||
|
||||
/*
|
||||
* Otherwise, we need to truncate the name further to accommodate
|
||||
* a sufficient hash value. The resulting name will avoid collision
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "catalog/pg_inherits.h"
|
||||
#include "catalog/pg_inherits_fn.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include <distributed/multi_partitioning_utils.h>
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "utils/builtins.h"
|
||||
|
@ -159,6 +159,26 @@ IsParentTable(Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* Wrapper around get_partition_parent
|
||||
*
|
||||
* Note: Because this function assumes that the relation whose OID is passed
|
||||
* as an argument will have precisely one parent, it should only be called
|
||||
* when it is known that the relation is a partition.
|
||||
*/
|
||||
Oid
|
||||
PartitionParentOid(Oid partitionOid)
|
||||
{
|
||||
Oid partitionParentOid = InvalidOid;
|
||||
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
partitionParentOid = get_partition_parent(partitionOid);
|
||||
#endif
|
||||
|
||||
return partitionParentOid;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Takes a parent relation and returns Oid list of its partitions. The
|
||||
* function errors out if the given relation is not a parent.
|
||||
|
|
|
@ -125,6 +125,7 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
|
|||
extern void CreateReferenceTableShard(Oid distributedTableId);
|
||||
extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId,
|
||||
List *ddlCommandList, List *foreignConstraintCommandList,
|
||||
char *alterTableAttachPartitionCommand,
|
||||
MultiConnection *connection);
|
||||
extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
|
||||
extern void CheckHashPartitionedTable(Oid distributedTableId);
|
||||
|
|
|
@ -15,6 +15,7 @@ extern bool PartitionedTable(Oid relationId);
|
|||
extern bool PartitionTable(Oid relationId);
|
||||
extern bool IsChildTable(Oid relationId);
|
||||
extern bool IsParentTable(Oid relationId);
|
||||
extern Oid PartitionParentOid(Oid partitionOid);
|
||||
extern List * PartitionList(Oid parentRelationId);
|
||||
extern char * GenerateDetachPartitionCommand(Oid partitionTableId);
|
||||
extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId);
|
||||
|
|
|
@ -1393,7 +1393,7 @@ ALTER TABLE reference_table_ddl RENAME TO reference_table_ddl_test;
|
|||
ERROR: renaming distributed tables is currently unsupported
|
||||
ALTER TABLE reference_table_ddl SET WITH OIDS;
|
||||
ERROR: alter table command is currently unsupported
|
||||
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported.
|
||||
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
|
||||
-- now test reference tables against some helper UDFs that Citus provides
|
||||
-- cannot delete / drop shards from a reference table
|
||||
SELECT master_apply_delete_command('DELETE FROM reference_table_ddl');
|
||||
|
|
|
@ -332,7 +332,7 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.lineite
|
|||
ALTER TABLE lineitem_alter ADD COLUMN int_column3 INTEGER,
|
||||
ALTER COLUMN int_column1 SET STATISTICS 10;
|
||||
ERROR: alter table command is currently unsupported
|
||||
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported.
|
||||
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
|
||||
ALTER TABLE lineitem_alter DROP COLUMN int_column1, DROP COLUMN int_column2;
|
||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.lineitem_alter'::regclass;
|
||||
Column | Type | Modifiers
|
||||
|
@ -364,12 +364,12 @@ ERROR: cannot execute ALTER TABLE command involving partition column
|
|||
-- Verify that we error out on unsupported statement types
|
||||
ALTER TABLE lineitem_alter ALTER COLUMN l_orderkey SET STATISTICS 100;
|
||||
ERROR: alter table command is currently unsupported
|
||||
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported.
|
||||
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
|
||||
ALTER TABLE lineitem_alter DROP CONSTRAINT IF EXISTS non_existent_contraint;
|
||||
NOTICE: constraint "non_existent_contraint" of relation "lineitem_alter" does not exist, skipping
|
||||
ALTER TABLE lineitem_alter SET WITHOUT OIDS;
|
||||
ERROR: alter table command is currently unsupported
|
||||
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported.
|
||||
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
|
||||
-- Verify that we error out in case of postgres errors on supported statement
|
||||
-- types
|
||||
ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type;
|
||||
|
|
Loading…
Reference in New Issue