Merge pull request #1509 from citusdata/create_distributed_partitions

Add support for distributed partitioned tables
pull/1542/head
Burak Yücesoy 2017-08-09 14:27:11 +03:00 committed by GitHub
commit 01d8926228
20 changed files with 3099 additions and 72 deletions

View File

@ -38,6 +38,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_utility.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
@ -262,7 +263,6 @@ create_reference_table(PG_FUNCTION_ARGS)
errdetail("There are no active worker nodes.")));
}
CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE,
colocateWithTableName, viaDeprecatedAPI);
@ -277,7 +277,8 @@ create_reference_table(PG_FUNCTION_ARGS)
* This functions contains all necessary logic to create distributed tables. It
* perform necessary checks to ensure distributing the table is safe. If it is
* safe to distribute the table, this function creates distributed table metadata,
* creates shards and copies local data to shards.
* creates shards and copies local data to shards. This function also handles
* partitioned tables by distributing its partitions as well.
*
* viaDeprecatedAPI boolean flag is not optimal way to implement this function,
* but it helps reducing code duplication a lot. We hope to remove that flag one
@ -355,6 +356,21 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
CreateReferenceTableShard(relationId);
}
/* if this table is partitioned table, distribute its partitions too */
if (PartitionedTable(relationId))
{
List *partitionList = PartitionList(relationId);
ListCell *partitionCell = NULL;
foreach(partitionCell, partitionList)
{
Oid partitionRelationId = lfirst_oid(partitionCell);
CreateDistributedTable(partitionRelationId, distributionColumn,
distributionMethod, colocateWithTableName,
viaDeprecatedAPI);
}
}
/* copy over data for hash distributed and reference tables */
if (distributionMethod == DISTRIBUTE_BY_HASH ||
distributionMethod == DISTRIBUTE_BY_NONE)
@ -566,6 +582,7 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
Relation relation = NULL;
TupleDesc relationDesc = NULL;
char *relationName = NULL;
Oid parentRelationId = InvalidOid;
EnsureTableOwner(relationId);
EnsureTableNotDistributed(relationId);
@ -626,6 +643,81 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
}
}
if (PartitionTable(relationId))
{
parentRelationId = PartitionParentOid(relationId);
}
/* partitions cannot be distributed if their parent is not distributed */
if (PartitionTable(relationId) && !IsDistributedTable(parentRelationId))
{
char *relationName = get_rel_name(relationId);
char *parentRelationName = get_rel_name(parentRelationId);
ereport(ERROR, (errmsg("cannot distribute relation \"%s\" which is partition of "
"\"%s\"", relationName, parentRelationName),
errdetail("Citus does not support distributing partitions "
"if their parent is not distributed table."),
errhint("Distribute the partitioned table \"%s\" instead.",
parentRelationName)));
}
/*
* These checks are mostly for partitioned tables not partitions because we prevent
* distributing partitions directly in the above check. However, partitions can still
* reach this point because, we call CreateDistributedTable for partitions if their
* parent table is distributed.
*/
if (PartitionedTable(relationId))
{
/* we cannot distribute partitioned tables with master_create_distributed_table */
if (viaDeprecatedAPI)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "
"with create_distributed_table UDF")));
}
/* distributing partitioned tables in only supported for hash-distribution */
if (distributionMethod != DISTRIBUTE_BY_HASH)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "
"for hash-distributed tables")));
}
/* we currently don't support partitioned tables for replication factor > 1 */
if (ShardReplicationFactor > 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables with replication "
"factor greater than 1 is not supported")));
}
/* we currently don't support MX tables to be distributed partitioned table */
if (replicationModel == REPLICATION_MODEL_STREAMING)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables which uses "
"streaming replication is not supported")));
}
/* we don't support distributing tables with multi-level partitioning */
if (PartitionTable(relationId))
{
char *relationName = get_rel_name(relationId);
Oid parentRelationId = PartitionParentOid(relationId);
char *parentRelationName = get_rel_name(parentRelationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing multi-level partitioned tables "
"is not supported"),
errdetail("Relation \"%s\" is partitioned table itself and "
"it is also partition of relation \"%s\".",
relationName, parentRelationName)));
}
}
ErrorIfUnsupportedConstraint(relation, distributionMethod, distributionColumn,
colocationId);
@ -1015,7 +1107,9 @@ RegularTable(Oid relationId)
/*
* CopyLocalDataIntoShards copies data from the local table, which is hidden
* after converting it to a distributed table, into the shards of the distributed
* table.
* table. For partitioned tables, this functions returns without copying the data
* because we call this function for both partitioned tables and its partitions.
* Returning early saves us from copying data to workers twice.
*
* This function uses CitusCopyDestReceiver to invoke the distributed COPY logic.
* We cannot use a regular COPY here since that cannot read from a table. Instead
@ -1057,6 +1151,17 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
/* take an ExclusiveLock to block all operations except SELECT */
distributedRelation = heap_open(distributedRelationId, ExclusiveLock);
/*
* Skip copying from partitioned tables, we will copy the data from
* partition to partition's shards.
*/
if (PartitionedTable(distributedRelationId))
{
heap_close(distributedRelation, NoLock);
return;
}
/*
* All writes have finished, make sure that we can see them by using the
* latest snapshot. We use GetLatestSnapshot instead of

View File

@ -62,6 +62,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/placement_connection.h"
@ -74,6 +75,7 @@
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/memutils.h"
@ -288,6 +290,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
DestReceiver *dest = NULL;
Relation distributedRelation = NULL;
Relation copiedDistributedRelation = NULL;
Form_pg_class copiedDistributedRelationTuple = NULL;
TupleDesc tupleDescriptor = NULL;
uint32 columnCount = 0;
Datum *columnValues = NULL;
@ -350,17 +354,52 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
dest = (DestReceiver *) copyDest;
dest->rStartup(dest, 0, tupleDescriptor);
/*
* BeginCopyFrom opens all partitions of given partitioned table with relation_open
* and it expects its caller to close those relations. We do not have direct access
* to opened relations, thus we are changing relkind of partitioned tables so that
* Postgres will treat those tables as regular relations and will not open its
* partitions.
*
* We will make this change on copied version of distributed relation to not change
* anything in relcache.
*/
if (PartitionedTable(tableId))
{
copiedDistributedRelation = (Relation) palloc0(sizeof(RelationData));
copiedDistributedRelationTuple = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
/*
* There is no need to deep copy everything. We will just deep copy of the fields
* we will change.
*/
memcpy(copiedDistributedRelation, distributedRelation, sizeof(RelationData));
memcpy(copiedDistributedRelationTuple, distributedRelation->rd_rel,
CLASS_TUPLE_SIZE);
copiedDistributedRelationTuple->relkind = RELKIND_RELATION;
copiedDistributedRelation->rd_rel = copiedDistributedRelationTuple;
}
else
{
/*
* If we are not dealing with partitioned table, copiedDistributedRelation is same
* as distributedRelation.
*/
copiedDistributedRelation = distributedRelation;
}
/* initialize copy state to read from COPY data source */
#if (PG_VERSION_NUM >= 100000)
copyState = BeginCopyFrom(NULL,
distributedRelation,
copiedDistributedRelation,
copyStatement->filename,
copyStatement->is_program,
NULL,
copyStatement->attlist,
copyStatement->options);
#else
copyState = BeginCopyFrom(distributedRelation,
copyState = BeginCopyFrom(copiedDistributedRelation,
copyStatement->filename,
copyStatement->is_program,
copyStatement->attlist,

View File

@ -42,6 +42,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
@ -108,6 +109,8 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement);
/* Local functions forward declarations for processing distributed table commands */
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
bool *commandMustRunAsOwner);
static void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement);
static void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement);
static List * PlanIndexStmt(IndexStmt *createIndexStatement,
const char *createIndexCommand);
static List * PlanDropIndexStmt(DropStmt *dropIndexStatement,
@ -154,8 +157,8 @@ static void ShowNoticeIfNotUsing2PC(void);
static List * DDLTaskList(Oid relationId, const char *commandString);
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString);
static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString);
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
void *arg);
static void CheckCopyPermissions(CopyStmt *copyStatement);
@ -467,6 +470,29 @@ multi_ProcessUtility(PlannedStmt *pstmt,
params, dest, completionTag);
#endif
/*
* We only process CREATE TABLE ... PARTITION OF commands in the function below
* to handle the case when user creates a table as a partition of distributed table.
*/
if (IsA(parsetree, CreateStmt))
{
CreateStmt *createStatement = (CreateStmt *) parsetree;
ProcessCreateTableStmtPartitionOf(createStatement);
}
/*
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
* and distribute the partition if necessary.
*/
if (IsA(parsetree, AlterTableStmt))
{
AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree;
ProcessAlterTableStmtAttachPartition(alterTableStatement);
}
/* don't run post-process code for local commands */
if (ddlJobs != NIL)
{
@ -776,6 +802,135 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
}
/*
* ProcessCreateTableStmtPartitionOf takes CreateStmt object as a parameter but
* it only processes CREATE TABLE ... PARTITION OF statements and it checks if
* user creates the table as a partition of a distributed table. In that case,
* it distributes partition as well. Since the table itself is a partition,
* CreateDistributedTable will attach it to its parent table automatically after
* distributing it.
*
* This function does nothing if PostgreSQL's version is less then 10 and given
* CreateStmt is not a CREATE TABLE ... PARTITION OF command.
*/
static void
ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement)
{
#if (PG_VERSION_NUM >= 100000)
if (createStatement->inhRelations != NIL && createStatement->partbound != NULL)
{
RangeVar *parentRelation = linitial(createStatement->inhRelations);
bool parentMissingOk = false;
Oid parentRelationId = RangeVarGetRelid(parentRelation, NoLock,
parentMissingOk);
/* a partition can only inherit from single parent table */
Assert(list_length(createStatement->inhRelations) == 1);
Assert(parentRelationId != InvalidOid);
/*
* If a partition is being created and if its parent is a distributed
* table, we will distribute this table as well.
*/
if (IsDistributedTable(parentRelationId))
{
bool missingOk = false;
Oid relationId = RangeVarGetRelid(createStatement->relation, NoLock,
missingOk);
Var *parentDistributionColumn = DistPartitionKey(parentRelationId);
char parentDistributionMethod = DISTRIBUTE_BY_HASH;
char *parentRelationName = get_rel_name(parentRelationId);
bool viaDeprecatedAPI = false;
CreateDistributedTable(relationId, parentDistributionColumn,
parentDistributionMethod, parentRelationName,
viaDeprecatedAPI);
}
}
#endif
}
/*
* ProcessAlterTableStmtAttachPartition takes AlterTableStmt object as parameter
* but it only processes into ALTER TABLE ... ATTACH PARTITION commands and
* distributes the partition if necessary. There are four cases to consider;
*
* Parent is not distributed, partition is not distributed: We do not need to
* do anything in this case.
*
* Parent is not distributed, partition is distributed: This can happen if
* user first distributes a table and tries to attach it to a non-distributed
* table. Non-distributed tables cannot have distributed partitions, thus we
* simply error out in this case.
*
* Parent is distributed, partition is not distributed: We should distribute
* the table and attach it to its parent in workers. CreateDistributedTable
* perform both of these operations. Thus, we will not propagate ALTER TABLE
* ... ATTACH PARTITION command to workers.
*
* Parent is distributed, partition is distributed: Partition is already
* distributed, we only need to attach it to its parent in workers. Attaching
* operation will be performed via propagating this ALTER TABLE ... ATTACH
* PARTITION command to workers.
*
* This function does nothing if PostgreSQL's version is less then 10 and given
* CreateStmt is not a ALTER TABLE ... ATTACH PARTITION OF command.
*/
static void
ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement)
{
#if (PG_VERSION_NUM >= 100000)
List *commandList = alterTableStatement->cmds;
ListCell *commandCell = NULL;
foreach(commandCell, commandList)
{
AlterTableCmd *alterTableCommand = (AlterTableCmd *) lfirst(commandCell);
if (alterTableCommand->subtype == AT_AttachPartition)
{
Oid relationId = AlterTableLookupRelation(alterTableStatement, NoLock);
PartitionCmd *partitionCommand = (PartitionCmd *) alterTableCommand->def;
bool partitionMissingOk = false;
Oid partitionRelationId = RangeVarGetRelid(partitionCommand->name, NoLock,
partitionMissingOk);
/*
* If user first distributes the table then tries to attach it to non
* distributed table, we error out.
*/
if (!IsDistributedTable(relationId) &&
IsDistributedTable(partitionRelationId))
{
char *parentRelationName = get_rel_name(partitionRelationId);
ereport(ERROR, (errmsg("non-distributed tables cannot have "
"distributed partitions"),
errhint("Distribute the partitioned table \"%s\" "
"instead", parentRelationName)));
}
/* if parent of this table is distributed, distribute this table too */
if (IsDistributedTable(relationId) &&
!IsDistributedTable(partitionRelationId))
{
Var *distributionColumn = DistPartitionKey(relationId);
char distributionMethod = DISTRIBUTE_BY_HASH;
char *relationName = get_rel_name(relationId);
bool viaDeprecatedAPI = false;
CreateDistributedTable(partitionRelationId, distributionColumn,
distributionMethod, relationName,
viaDeprecatedAPI);
}
}
}
#endif
}
/*
* PlanIndexStmt determines whether a given CREATE INDEX statement involves
* a distributed table. If so (and if the statement does not use unsupported
@ -1041,6 +1196,42 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
constraint->skip_validation = true;
}
}
#if (PG_VERSION_NUM >= 100000)
else if (alterTableType == AT_AttachPartition)
{
PartitionCmd *partitionCommand = (PartitionCmd *) command->def;
/*
* We only support ALTER TABLE ATTACH PARTITION, if it is only subcommand of
* ALTER TABLE. It was already checked in ErrorIfUnsupportedAlterTableStmt.
*/
Assert(list_length(commandList) <= 1);
rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false);
/*
* Do not generate tasks if relation is distributed and the partition
* is not distributed. Because, we'll manually convert the partition into
* distributed table and co-locate with its parent.
*/
if (!IsDistributedTable(rightRelationId))
{
return NIL;
}
}
else if (alterTableType == AT_DetachPartition)
{
PartitionCmd *partitionCommand = (PartitionCmd *) command->def;
/*
* We only support ALTER TABLE DETACH PARTITION, if it is only subcommand of
* ALTER TABLE. It was already checked in ErrorIfUnsupportedAlterTableStmt.
*/
Assert(list_length(commandList) <= 1);
rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false);
}
#endif
}
ddlJob = palloc0(sizeof(DDLJob));
@ -1051,8 +1242,8 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
if (rightRelationId)
{
/* if foreign key related, use specialized task list function ... */
ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId,
alterTableCommand);
ddlJob->taskList = InterShardDDLTaskList(leftRelationId, rightRelationId,
alterTableCommand);
}
else
{
@ -1759,6 +1950,54 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
break;
}
#if (PG_VERSION_NUM >= 100000)
case AT_AttachPartition:
{
Oid relationId = AlterTableLookupRelation(alterTableStatement,
NoLock);
PartitionCmd *partitionCommand = (PartitionCmd *) command->def;
bool missingOK = false;
Oid partitionRelationId = RangeVarGetRelid(partitionCommand->name,
NoLock, missingOK);
/* we only allow partitioning commands if they are only subcommand */
if (commandList->length > 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot execute ATTACH PARTITION command "
"with other subcommands"),
errhint("You can issue each subcommand "
"separately.")));
}
if (IsDistributedTable(partitionRelationId) &&
!TablesColocated(relationId, partitionRelationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributed tables cannot have "
"non-colocated distributed tables as a "
"partition ")));
}
break;
}
case AT_DetachPartition:
{
/* we only allow partitioning commands if they are only subcommand */
if (commandList->length > 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot execute DETACH PARTITION command "
"with other subcommands"),
errhint("You can issue each subcommand "
"separately.")));
}
break;
}
#endif
case AT_SetNotNull:
case AT_DropConstraint:
case AT_EnableTrigAll:
@ -1776,9 +2015,10 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("alter table command is currently unsupported"),
errdetail("Only ADD|DROP COLUMN, SET|DROP NOT NULL,"
" SET|DROP DEFAULT, ADD|DROP CONSTRAINT and "
"TYPE subcommands are supported.")));
errdetail("Only ADD|DROP COLUMN, SET|DROP NOT NULL, "
"SET|DROP DEFAULT, ADD|DROP CONSTRAINT, "
"ATTACH|DETACH PARTITION and TYPE subcommands "
"are supported.")));
}
}
}
@ -2786,16 +3026,17 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
/*
* ForeignKeyTaskList builds a list of tasks to execute a foreign key command on a
* shards of given list of distributed table.
* InterShardDDLTaskList builds a list of tasks to execute a inter shard DDL command on a
* shards of given list of distributed table. At the moment this function is used to run
* foreign key and partitioning command on worker node.
*
* leftRelationId is the relation id of actual distributed table which given foreign key
* command is applied. rightRelationId is the relation id of distributed table which
* foreign key refers to.
* leftRelationId is the relation id of actual distributed table which given command is
* applied. rightRelationId is the relation id of distributed table which given command
* refers to.
*/
static List *
ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString)
InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString)
{
List *taskList = NIL;

View File

@ -194,7 +194,8 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval
else
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("expire target is not a regular or foreign table")));
errmsg("expire target is not a regular, foreign or partitioned "
"table")));
}
connection = GetNodeConnection(connectionFlag, workerNode->workerName,

View File

@ -402,7 +402,8 @@ RecreateTableDDLCommandList(Oid relationId)
else
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("repair target is not a regular or foreign table")));
errmsg("repair target is not a regular, foreign or partitioned "
"table")));
}
dropCommandList = list_make1(dropCommand->data);

View File

@ -24,6 +24,10 @@
#include "commands/tablecmds.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#if (PG_VERSION_NUM >= 100000)
#include "catalog/partition.h"
#endif
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/multi_client_executor.h"
@ -31,6 +35,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"
@ -340,11 +345,7 @@ CheckDistributedTable(Oid relationId)
char *relationName = get_rel_name(relationId);
/* check that the relationId belongs to a table */
char tableType = get_rel_relkind(relationId);
if (!(tableType == RELKIND_RELATION || tableType == RELKIND_FOREIGN_TABLE))
{
ereport(ERROR, (errmsg("relation \"%s\" is not a table", relationName)));
}
EnsureRelationKindSupported(relationId);
if (!IsDistributedTable(relationId))
{
@ -368,6 +369,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
int placementsCreated = 0;
int attemptNumber = 0;
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(relationId);
char *alterTableAttachPartitionCommand = NULL;
bool includeSequenceDefaults = false;
List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
uint32 connectionFlag = FOR_DDL;
@ -402,7 +404,8 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
}
WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList,
foreignConstraintCommandList, connection);
foreignConstraintCommandList, alterTableAttachPartitionCommand,
connection);
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
nodeGroupId);
@ -483,12 +486,20 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
ListCell *connectionCell = NULL;
ListCell *shardPlacementCell = NULL;
int connectionFlags = FOR_DDL;
char *alterTableAttachPartitionCommand = NULL;
if (useExclusiveConnection)
{
connectionFlags |= CONNECTION_PER_PLACEMENT;
}
if (PartitionTable(distributedRelationId))
{
alterTableAttachPartitionCommand =
GenerateAlterTableAttachPartitionCommand(distributedRelationId);
}
BeginOrContinueCoordinatedTransaction();
foreach(shardPlacementCell, shardPlacements)
@ -517,7 +528,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
WorkerCreateShard(distributedRelationId, shardIndex, shardId,
ddlCommandList, foreignConstraintCommandList,
connection);
alterTableAttachPartitionCommand, connection);
}
/*
@ -540,7 +551,8 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
*/
void
WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList,
List *foreignConstraintCommandList, MultiConnection *connection)
List *foreignConstraintCommandList,
char *alterTableAttachPartitionCommand, MultiConnection *connection)
{
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
@ -618,6 +630,40 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
ExecuteCriticalRemoteCommand(connection, applyForeignConstraintCommand->data);
}
/*
* If the shard is created for a partition, send the command to create the
* partitioning hierarcy on the shard.
*/
if (alterTableAttachPartitionCommand != NULL)
{
Oid parentRelationId = PartitionParentOid(relationId);
uint64 correspondingParentShardId = InvalidOid;
StringInfo applyAttachPartitionCommand = makeStringInfo();
Oid parentSchemaId = InvalidOid;
char *parentSchemaName = NULL;
char *escapedParentSchemaName = NULL;
char *escapedCommand = NULL;
Assert(PartitionTable(relationId));
parentSchemaId = get_rel_namespace(parentRelationId);
parentSchemaName = get_namespace_name(parentSchemaId);
escapedParentSchemaName = quote_literal_cstr(parentSchemaName);
escapedCommand = quote_literal_cstr(alterTableAttachPartitionCommand);
correspondingParentShardId = ColocatedShardIdInRelation(parentRelationId,
shardIndex);
appendStringInfo(applyAttachPartitionCommand,
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, correspondingParentShardId,
escapedParentSchemaName, shardId, escapedSchemaName,
escapedCommand);
ExecuteCriticalRemoteCommand(connection, applyAttachPartitionCommand->data);
}
}

View File

@ -22,6 +22,7 @@
#include "distributed/multi_planner.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_master_planner.h"
#include "distributed/multi_router_planner.h"
@ -31,6 +32,7 @@
#include "parser/parsetree.h"
#include "optimizer/pathnode.h"
#include "optimizer/planner.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@ -68,7 +70,8 @@ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *origin
Query *query, ParamListInfo boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static void AssignRTEIdentities(Query *queryTree);
static void AdjustParseTree(Query *parse, bool assignRTEIdentities,
bool setPartitionedTablesInherited);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
static PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan);
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan,
@ -91,6 +94,8 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
bool needsDistributedPlanning = NeedsDistributedPlanning(parse);
Query *originalQuery = NULL;
PlannerRestrictionContext *plannerRestrictionContext = NULL;
bool assignRTEIdentities = false;
bool setPartitionedTablesInherited = false;
/*
* standard_planner scribbles on it's input, but for deparsing we need the
@ -99,8 +104,10 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
if (needsDistributedPlanning)
{
originalQuery = copyObject(parse);
assignRTEIdentities = true;
setPartitionedTablesInherited = false;
AssignRTEIdentities(parse);
AdjustParseTree(parse, assignRTEIdentities, setPartitionedTablesInherited);
}
/* create a restriction context and put it at the end if context list */
@ -128,6 +135,14 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
}
PG_END_TRY();
if (needsDistributedPlanning)
{
assignRTEIdentities = false;
setPartitionedTablesInherited = true;
AdjustParseTree(parse, assignRTEIdentities, setPartitionedTablesInherited);
}
/* remove the context from the context list */
PopPlannerRestrictionContext();
@ -152,19 +167,18 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
/*
* AssignRTEIdentities assigns unique identities to the
* RTE_RELATIONs in the given query.
* AdjustParseTree function modifies query tree by adding RTE identities to the
* RTE_RELATIONs and changing inh flag and relkind of partitioned tables. We
* perform these operations to ensure PostgreSQL's standard planner behaves as
* we need.
*
* To be able to track individual RTEs through postgres' query
* planning, we need to be able to figure out whether an RTE is
* actually a copy of another, rather than a different one. We
* simply number the RTEs starting from 1.
*
* Note that we're only interested in RTE_RELATIONs and thus assigning
* identifiers to those RTEs only.
* Please note that, we want to avoid modifying query tree as much as possible
* because if PostgreSQL changes the way it uses modified fields, that may break
* our logic.
*/
static void
AssignRTEIdentities(Query *queryTree)
AdjustParseTree(Query *queryTree, bool assignRTEIdentities,
bool setPartitionedTablesInherited)
{
List *rangeTableList = NIL;
ListCell *rangeTableCell = NULL;
@ -177,12 +191,42 @@ AssignRTEIdentities(Query *queryTree)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
if (rangeTableEntry->rtekind != RTE_RELATION)
/*
* To be able to track individual RTEs through PostgreSQL's query
* planning, we need to be able to figure out whether an RTE is
* actually a copy of another, rather than a different one. We
* simply number the RTEs starting from 1.
*
* Note that we're only interested in RTE_RELATIONs and thus assigning
* identifiers to those RTEs only.
*/
if (assignRTEIdentities && rangeTableEntry->rtekind == RTE_RELATION)
{
continue;
AssignRTEIdentity(rangeTableEntry, rteIdentifier++);
}
AssignRTEIdentity(rangeTableEntry, rteIdentifier++);
/*
* We want Postgres to behave partitioned tables as regular relations
* (i.e. we do not want to expand them to their partitions). To do this
* we set each distributed partitioned table's inh flag to appropriate
* value before and after dropping to the standart_planner.
*/
if (IsDistributedTable(rangeTableEntry->relid) &&
PartitionedTable(rangeTableEntry->relid))
{
rangeTableEntry->inh = setPartitionedTablesInherited;
#if (PG_VERSION_NUM >= 100000)
if (setPartitionedTablesInherited)
{
rangeTableEntry->relkind = RELKIND_PARTITIONED_TABLE;
}
else
{
rangeTableEntry->relkind = RELKIND_RELATION;
}
#endif
}
}
}

View File

@ -451,8 +451,8 @@ RelayEventExtendNamesForInterShardCommands(Node *parseTree, uint64 leftShardId,
}
}
#if (PG_VERSION_NUM >= 100000)
else if (command->subtype == AT_AttachPartition || command->subtype ==
AT_DetachPartition)
else if (command->subtype == AT_AttachPartition ||
command->subtype == AT_DetachPartition)
{
PartitionCmd *partitionCommand = (PartitionCmd *) command->def;
@ -469,9 +469,8 @@ RelayEventExtendNamesForInterShardCommands(Node *parseTree, uint64 leftShardId,
SetSchemaNameIfNotExist(relationSchemaName, rightShardSchemaName);
/*
* We will not append shard id to referencing table name or
* constraint name. They will be handled when we drop into
* RelayEventExtendNames.
* We will not append shard id to left shard name. This will be
* handled when we drop into RelayEventExtendNames.
*/
AppendShardIdToName(referencedTableName, rightShardId);
}
@ -622,6 +621,7 @@ AppendShardIdToName(char **name, uint64 shardId)
{
snprintf(extendedName, NAMEDATALEN, "%s%s", (*name), shardIdAndSeparator);
}
/*
* Otherwise, we need to truncate the name further to accommodate
* a sufficient hash value. The resulting name will avoid collision

View File

@ -307,12 +307,7 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
initStringInfo(&buffer);
relationKind = relation->rd_rel->relkind;
#if (PG_VERSION_NUM >= 100000)
if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE)
#else
if (relationKind == RELKIND_RELATION)
#endif
if (RegularTable(tableRelationId))
{
appendStringInfoString(&buffer, "CREATE ");
@ -461,6 +456,7 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
* If the relation is a foreign table, append the server name and options to
* the create table statement.
*/
relationKind = relation->rd_rel->relkind;
if (relationKind == RELKIND_FOREIGN_TABLE)
{
ForeignTable *foreignTable = GetForeignTable(tableRelationId);

View File

@ -17,7 +17,7 @@
#include "catalog/pg_inherits.h"
#include "catalog/pg_inherits_fn.h"
#include "distributed/citus_ruleutils.h"
#include <distributed/multi_partitioning_utils.h>
#include "distributed/multi_partitioning_utils.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "utils/builtins.h"
@ -159,6 +159,26 @@ IsParentTable(Oid relationId)
}
/*
* Wrapper around get_partition_parent
*
* Note: Because this function assumes that the relation whose OID is passed
* as an argument will have precisely one parent, it should only be called
* when it is known that the relation is a partition.
*/
Oid
PartitionParentOid(Oid partitionOid)
{
Oid partitionParentOid = InvalidOid;
#if (PG_VERSION_NUM >= 100000)
partitionParentOid = get_partition_parent(partitionOid);
#endif
return partitionParentOid;
}
/*
* Takes a parent relation and returns Oid list of its partitions. The
* function errors out if the given relation is not a parent.

View File

@ -690,7 +690,7 @@ LocalTableSize(Oid relationId)
Datum relationIdDatum = ObjectIdGetDatum(relationId);
relationType = get_rel_relkind(relationId);
if (relationType == RELKIND_RELATION)
if (RegularTable(relationId))
{
Datum tableSizeDatum = DirectFunctionCall1(pg_table_size, relationIdDatum);

View File

@ -62,12 +62,7 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
/* first check the relation type */
distributedRelation = relation_open(relationId, AccessShareLock);
relationKind = distributedRelation->rd_rel->relkind;
if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE)
{
char *relationName = generate_relation_name(relationId, NIL);
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("%s is not a regular or foreign table", relationName)));
}
EnsureRelationKindSupported(relationId);
/* close the relation since we do not need anymore */
relation_close(distributedRelation, AccessShareLock);

View File

@ -125,6 +125,7 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
extern void CreateReferenceTableShard(Oid distributedTableId);
extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList, List *foreignConstraintCommandList,
char *alterTableAttachPartitionCommand,
MultiConnection *connection);
extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
extern void CheckHashPartitionedTable(Oid distributedTableId);

View File

@ -15,6 +15,7 @@ extern bool PartitionedTable(Oid relationId);
extern bool PartitionTable(Oid relationId);
extern bool IsChildTable(Oid relationId);
extern bool IsParentTable(Oid relationId);
extern Oid PartitionParentOid(Oid partitionOid);
extern List * PartitionList(Oid parentRelationId);
extern char * GenerateDetachPartitionCommand(Oid partitionTableId);
extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId);

View File

@ -0,0 +1,937 @@
--
-- Distributed Partitioned Table Tests
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
--
-- Distributed Partitioned Table Creation Tests
--
-- 1-) Distributing partitioned table
-- create partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
-- distribute partitioned table
SELECT create_distributed_table('partitioning_test', 'id');
NOTICE: Copying data from local table...
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
-- see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
id | time
----+------------
1 | 06-06-2009
2 | 07-07-2010
3 | 09-09-2009
4 | 03-03-2010
(4 rows)
-- see partitioned table and its partitions are distributed
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
ORDER BY 1;
logicalrelid
------------------------
partitioning_test
partitioning_test_2009
partitioning_test_2010
(3 rows)
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | count
------------------------+-------
partitioning_test | 4
partitioning_test_2009 | 4
partitioning_test_2010 | 4
(3 rows)
-- 2-) Creating partition of a distributed table
CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01');
-- new partition is automatically distributed as well
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
ORDER BY 1;
logicalrelid
------------------------
partitioning_test
partitioning_test_2011
(2 rows)
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | count
------------------------+-------
partitioning_test | 4
partitioning_test_2011 | 4
(2 rows)
-- 3-) Attaching non distributed table to a distributed table
CREATE TABLE partitioning_test_2012(id int, time date);
-- load some data
INSERT INTO partitioning_test_2012 VALUES (5, '2012-06-06');
INSERT INTO partitioning_test_2012 VALUES (6, '2012-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01');
NOTICE: Copying data from local table...
-- attached partition is distributed as well
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
ORDER BY 1;
logicalrelid
------------------------
partitioning_test
partitioning_test_2012
(2 rows)
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | count
------------------------+-------
partitioning_test | 4
partitioning_test_2012 | 4
(2 rows)
-- see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
id | time
----+------------
1 | 06-06-2009
2 | 07-07-2010
3 | 09-09-2009
4 | 03-03-2010
5 | 06-06-2012
6 | 07-07-2012
(6 rows)
-- 4-) Attaching distributed table to distributed table
CREATE TABLE partitioning_test_2013(id int, time date);
SELECT create_distributed_table('partitioning_test_2013', 'id');
create_distributed_table
--------------------------
(1 row)
-- load some data
INSERT INTO partitioning_test_2013 VALUES (7, '2013-06-06');
INSERT INTO partitioning_test_2013 VALUES (8, '2013-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2013 FOR VALUES FROM ('2013-01-01') TO ('2014-01-01');
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
-- see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
id | time
----+------------
1 | 06-06-2009
2 | 07-07-2010
3 | 09-09-2009
4 | 03-03-2010
5 | 06-06-2012
6 | 07-07-2012
7 | 06-06-2013
8 | 07-07-2013
(8 rows)
-- 5-) Failure cases while creating distributed partitioned tables
-- cannot distribute a partition if its parent is not distributed
CREATE TABLE partitioning_test_failure(id int, time date) PARTITION BY RANGE (time);
CREATE TABLE partitioning_test_failure_2009 PARTITION OF partitioning_test_failure FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
SELECT create_distributed_table('partitioning_test_failure_2009', 'id');
ERROR: cannot distribute relation "partitioning_test_failure_2009" which is partition of "partitioning_test_failure"
DETAIL: Citus does not support distributing partitions if their parent is not distributed table.
HINT: Distribute the partitioned table "partitioning_test_failure" instead.
-- only hash distributed tables can have partitions
SELECT create_distributed_table('partitioning_test_failure', 'id', 'append');
ERROR: distributing partitioned tables in only supported for hash-distributed tables
SELECT create_distributed_table('partitioning_test_failure', 'id', 'range');
ERROR: distributing partitioned tables in only supported for hash-distributed tables
SELECT create_reference_table('partitioning_test_failure');
ERROR: distributing partitioned tables in only supported for hash-distributed tables
-- replication factor > 1 is not allowed in distributed partitioned tables
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('partitioning_test_failure', 'id');
ERROR: distributing partitioned tables with replication factor greater than 1 is not supported
SET citus.shard_replication_factor TO 1;
-- non-distributed tables cannot have distributed partitions;
DROP TABLE partitioning_test_failure_2009;
CREATE TABLE partitioning_test_failure_2009(id int, time date);
SELECT create_distributed_table('partitioning_test_failure_2009', 'id');
create_distributed_table
--------------------------
(1 row)
ALTER TABLE partitioning_test_failure ATTACH PARTITION partitioning_test_failure_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
ERROR: non-distributed tables cannot have distributed partitions
HINT: Distribute the partitioned table "partitioning_test_failure_2009" instead
-- multi-level partitioning is not allowed
DROP TABLE partitioning_test_failure_2009;
CREATE TABLE partitioning_test_failure_2009 PARTITION OF partitioning_test_failure FOR VALUES FROM ('2009-01-01') TO ('2010-01-01') PARTITION BY RANGE (time);
SELECT create_distributed_table('partitioning_test_failure', 'id');
ERROR: distributing multi-level partitioned tables is not supported
DETAIL: Relation "partitioning_test_failure_2009" is partitioned table itself and it is also partition of relation "partitioning_test_failure".
-- multi-level partitioning is not allowed in different order
DROP TABLE partitioning_test_failure_2009;
SELECT create_distributed_table('partitioning_test_failure', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE partitioning_test_failure_2009 PARTITION OF partitioning_test_failure FOR VALUES FROM ('2009-01-01') TO ('2010-01-01') PARTITION BY RANGE (time);
ERROR: distributing multi-level partitioned tables is not supported
DETAIL: Relation "partitioning_test_failure_2009" is partitioned table itself and it is also partition of relation "partitioning_test_failure".
--
-- DMLs in distributed partitioned tables
--
-- test COPY
-- COPY data to partitioned table
COPY partitioning_test FROM STDIN WITH CSV;
-- COPY data to partition directly
COPY partitioning_test_2009 FROM STDIN WITH CSV;
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id >= 9 ORDER BY 1;
id | time
----+------------
9 | 01-01-2009
10 | 01-01-2010
11 | 01-01-2011
12 | 01-01-2012
13 | 01-02-2009
14 | 01-03-2009
(6 rows)
-- test INSERT
-- INSERT INTO the partitioned table
INSERT INTO partitioning_test VALUES(15, '2009-02-01');
INSERT INTO partitioning_test VALUES(16, '2010-02-01');
INSERT INTO partitioning_test VALUES(17, '2011-02-01');
INSERT INTO partitioning_test VALUES(18, '2012-02-01');
-- INSERT INTO the partitions directly table
INSERT INTO partitioning_test VALUES(19, '2009-02-02');
INSERT INTO partitioning_test VALUES(20, '2010-02-02');
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id >= 15 ORDER BY 1;
id | time
----+------------
15 | 02-01-2009
16 | 02-01-2010
17 | 02-01-2011
18 | 02-01-2012
19 | 02-02-2009
20 | 02-02-2010
(6 rows)
-- test INSERT/SELECT
-- INSERT/SELECT from partition to partitioned table
INSERT INTO partitioning_test SELECT * FROM partitioning_test_2011;
-- INSERT/SELECT from partitioned table to partition
INSERT INTO partitioning_test_2012 SELECT * FROM partitioning_test WHERE time >= '2012-01-01' AND time < '2013-01-01';
-- see the data is loaded to shards (rows in the given range should be duplicated)
SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2013-01-01' ORDER BY 1;
id | time
----+------------
5 | 06-06-2012
5 | 06-06-2012
6 | 07-07-2012
6 | 07-07-2012
11 | 01-01-2011
11 | 01-01-2011
12 | 01-01-2012
12 | 01-01-2012
17 | 02-01-2011
17 | 02-01-2011
18 | 02-01-2012
18 | 02-01-2012
(12 rows)
-- test UPDATE
-- UPDATE partitioned table
UPDATE partitioning_test SET time = '2013-07-07' WHERE id = 7;
-- UPDATE partition directly
UPDATE partitioning_test_2013 SET time = '2013-08-08' WHERE id = 8;
-- see the data is updated
SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER BY 1;
id | time
----+------------
7 | 07-07-2013
8 | 08-08-2013
(2 rows)
-- UPDATE that tries to move a row to a non-existing partition (this should fail)
UPDATE partitioning_test SET time = '2020-07-07' WHERE id = 7;
ERROR: new row for relation "partitioning_test_2013_1660021" violates partition constraint
DETAIL: Failing row contains (7, 2020-07-07).
CONTEXT: while executing command on localhost:57638
-- UPDATE with subqueries on partitioned table
UPDATE
partitioning_test
SET
time = time + INTERVAL '1 day'
WHERE
id IN (SELECT id FROM partitioning_test WHERE id = 1);
-- UPDATE with subqueries on partition
UPDATE
partitioning_test_2009
SET
time = time + INTERVAL '1 month'
WHERE
id IN (SELECT id FROM partitioning_test WHERE id = 2);
-- see the data is updated
SELECT * FROM partitioning_test WHERE id = 1 OR id = 2 ORDER BY 1;
id | time
----+------------
1 | 06-07-2009
2 | 07-07-2010
(2 rows)
-- test DELETE
-- DELETE from partitioned table
DELETE FROM partitioning_test WHERE id = 9;
-- DELETE from partition directly
DELETE FROM partitioning_test_2010 WHERE id = 10;
-- see the data is deleted
SELECT * FROM partitioning_test WHERE id = 9 OR id = 10 ORDER BY 1;
id | time
----+------
(0 rows)
-- test master_modify_multiple_shards
-- master_modify_multiple_shards on partitioned table
SELECT master_modify_multiple_shards('UPDATE partitioning_test SET time = time + INTERVAL ''1 day''');
master_modify_multiple_shards
-------------------------------
24
(1 row)
-- see rows are UPDATED
SELECT * FROM partitioning_test ORDER BY 1;
id | time
----+------------
1 | 06-08-2009
2 | 07-08-2010
3 | 09-10-2009
4 | 03-04-2010
5 | 06-07-2012
5 | 06-07-2012
6 | 07-08-2012
6 | 07-08-2012
7 | 07-08-2013
8 | 08-09-2013
11 | 01-02-2011
11 | 01-02-2011
12 | 01-02-2012
12 | 01-02-2012
13 | 01-03-2009
14 | 01-04-2009
15 | 02-02-2009
16 | 02-02-2010
17 | 02-02-2011
17 | 02-02-2011
18 | 02-02-2012
18 | 02-02-2012
19 | 02-03-2009
20 | 02-03-2010
(24 rows)
-- master_modify_multiple_shards on partition directly
SELECT master_modify_multiple_shards('UPDATE partitioning_test_2009 SET time = time + INTERVAL ''1 day''');
master_modify_multiple_shards
-------------------------------
6
(1 row)
-- see rows are UPDATED
SELECT * FROM partitioning_test_2009 ORDER BY 1;
id | time
----+------------
1 | 06-09-2009
3 | 09-11-2009
13 | 01-04-2009
14 | 01-05-2009
15 | 02-03-2009
19 | 02-04-2009
(6 rows)
-- test master_modify_multiple_shards which fails in workers (updated value is outside of partition bounds)
SELECT master_modify_multiple_shards('UPDATE partitioning_test_2009 SET time = time + INTERVAL ''6 month''');
ERROR: new row for relation "partitioning_test_2009_1660005" violates partition constraint
DETAIL: Failing row contains (3, 2010-03-11).
CONTEXT: while executing command on localhost:57638
--
-- DDL in distributed partitioned tables
--
-- test CREATE INDEX
-- CREATE INDEX on partitioned table - this will error out
CREATE INDEX partitioning_index ON partitioning_test(id);
ERROR: cannot create index on partitioned table "partitioning_test"
-- CREATE INDEX on partition
CREATE INDEX partitioning_2009_index ON partitioning_test_2009(id);
-- CREATE INDEX CONCURRENTLY on partition
CREATE INDEX CONCURRENTLY partitioned_2010_index ON partitioning_test_2010(id);
-- see index is created
SELECT tablename, indexname FROM pg_indexes WHERE tablename LIKE 'partitioning_test%' ORDER BY indexname;
tablename | indexname
------------------------+-------------------------
partitioning_test_2010 | partitioned_2010_index
partitioning_test_2009 | partitioning_2009_index
(2 rows)
-- test add COLUMN
-- add COLUMN to partitioned table
ALTER TABLE partitioning_test ADD new_column int;
-- add COLUMN to partition - this will error out
ALTER TABLE partitioning_test_2010 ADD new_column_2 int;
ERROR: cannot add column to a partition
-- see additional column is created
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test'::regclass ORDER BY 1;
name | type
------------+---------
id | integer
new_column | integer
time | date
(3 rows)
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test_2010'::regclass ORDER BY 1;
name | type
------------+---------
id | integer
new_column | integer
time | date
(3 rows)
-- test add PRIMARY KEY
-- add PRIMARY KEY to partitioned table - this will error out
ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_primary PRIMARY KEY (id);
ERROR: primary key constraints are not supported on partitioned tables
LINE 1: ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_pr...
^
-- ADD PRIMARY KEY to partition
ALTER TABLE partitioning_test_2009 ADD CONSTRAINT partitioning_2009_primary PRIMARY KEY (id);
-- see PRIMARY KEY is created
SELECT
table_name,
constraint_name,
constraint_type
FROM
information_schema.table_constraints
WHERE
table_name = 'partitioning_test_2009' AND
constraint_name = 'partitioning_2009_primary';
table_name | constraint_name | constraint_type
------------------------+---------------------------+-----------------
partitioning_test_2009 | partitioning_2009_primary | PRIMARY KEY
(1 row)
-- test ADD FOREIGN CONSTRAINT
-- add FOREIGN CONSTRAINT to partitioned table -- this will error out
ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_foreign FOREIGN KEY (id) REFERENCES partitioning_test_2009 (id);
ERROR: foreign key constraints are not supported on partitioned tables
LINE 1: ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_fo...
^
-- add FOREIGN CONSTRAINT to partition
INSERT INTO partitioning_test_2009 VALUES (5, '2009-06-06');
INSERT INTO partitioning_test_2009 VALUES (6, '2009-07-07');
INSERT INTO partitioning_test_2009 VALUES(12, '2009-02-01');
INSERT INTO partitioning_test_2009 VALUES(18, '2009-02-01');
ALTER TABLE partitioning_test_2012 ADD CONSTRAINT partitioning_2012_foreign FOREIGN KEY (id) REFERENCES partitioning_test_2009 (id) ON DELETE CASCADE;
-- see FOREIGN KEY is created
SELECT "Constraint" FROM table_fkeys WHERE relid = 'partitioning_test_2012'::regclass ORDER BY 1;
Constraint
---------------------------
partitioning_2012_foreign
(1 row)
-- test ON DELETE CASCADE works
DELETE FROM partitioning_test_2009 WHERE id = 5;
-- see that element is deleted from both partitions
SELECT * FROM partitioning_test_2009 WHERE id = 5 ORDER BY 1;
id | time | new_column
----+------+------------
(0 rows)
SELECT * FROM partitioning_test_2012 WHERE id = 5 ORDER BY 1;
id | time | new_column
----+------+------------
(0 rows)
-- test DETACH partition
ALTER TABLE partitioning_test DETACH PARTITION partitioning_test_2009;
-- see DETACHed partitions content is not accessible from partitioning_test;
SELECT * FROM partitioning_test WHERE time >= '2009-01-01' AND time < '2010-01-01' ORDER BY 1;
id | time | new_column
----+------+------------
(0 rows)
--
-- Transaction tests
--
-- DDL in transaction
BEGIN;
ALTER TABLE partitioning_test ADD newer_column int;
-- see additional column is created
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test'::regclass ORDER BY 1;
name | type
--------------+---------
id | integer
new_column | integer
newer_column | integer
time | date
(4 rows)
ROLLBACK;
-- see rollback is successful
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test'::regclass ORDER BY 1;
name | type
------------+---------
id | integer
new_column | integer
time | date
(3 rows)
-- COPY in transaction
BEGIN;
COPY partitioning_test FROM STDIN WITH CSV;
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id = 22 ORDER BY 1;
id | time | new_column
----+------------+------------
22 | 01-01-2010 | 22
(1 row)
SELECT * FROM partitioning_test WHERE id = 23 ORDER BY 1;
id | time | new_column
----+------------+------------
23 | 01-01-2011 | 23
(1 row)
SELECT * FROM partitioning_test WHERE id = 24 ORDER BY 1;
id | time | new_column
----+------------+------------
24 | 01-01-2013 | 24
(1 row)
ROLLBACK;
-- see rollback is successful
SELECT * FROM partitioning_test WHERE id >= 22 ORDER BY 1;
id | time | new_column
----+------+------------
(0 rows)
-- DML in transaction
BEGIN;
-- INSERT in transaction
INSERT INTO partitioning_test VALUES(25, '2010-02-02');
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id = 25 ORDER BY 1;
id | time | new_column
----+------------+------------
25 | 02-02-2010 |
(1 row)
-- INSERT/SELECT in transaction
INSERT INTO partitioning_test SELECT * FROM partitioning_test WHERE id = 25;
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id = 25 ORDER BY 1;
id | time | new_column
----+------------+------------
25 | 02-02-2010 |
25 | 02-02-2010 |
(2 rows)
-- UPDATE in transaction
UPDATE partitioning_test SET time = '2010-10-10' WHERE id = 25;
-- see the data is updated
SELECT * FROM partitioning_test WHERE id = 25 ORDER BY 1;
id | time | new_column
----+------------+------------
25 | 10-10-2010 |
25 | 10-10-2010 |
(2 rows)
-- perform operations on partition and partioned tables together
INSERT INTO partitioning_test VALUES(26, '2010-02-02', 26);
INSERT INTO partitioning_test_2010 VALUES(26, '2010-02-02', 26);
COPY partitioning_test FROM STDIN WITH CSV;
COPY partitioning_test_2010 FROM STDIN WITH CSV;
-- see the data is loaded to shards (we should see 4 rows with same content)
SELECT * FROM partitioning_test WHERE id = 26 ORDER BY 1;
id | time | new_column
----+------------+------------
26 | 02-02-2010 | 26
26 | 02-02-2010 | 26
26 | 02-02-2010 | 26
26 | 02-02-2010 | 26
(4 rows)
ROLLBACK;
-- see rollback is successful
SELECT * FROM partitioning_test WHERE id = 26 ORDER BY 1;
id | time | new_column
----+------+------------
(0 rows)
-- DETACH and DROP in a transaction
BEGIN;
ALTER TABLE partitioning_test DETACH PARTITION partitioning_test_2011;
DROP TABLE partitioning_test_2011;
COMMIT;
-- see DROPed partitions content is not accessible
SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2012-01-01' ORDER BY 1;
id | time | new_column
----+------+------------
(0 rows)
--
-- Misc tests
--
-- test TRUNCATE
-- test TRUNCATE partition
TRUNCATE partitioning_test_2012;
-- see partition is TRUNCATEd
SELECT * FROM partitioning_test_2012 ORDER BY 1;
id | time | new_column
----+------+------------
(0 rows)
-- test TRUNCATE partitioned table
TRUNCATE partitioning_test;
-- see partitioned table is TRUNCATEd
SELECT * FROM partitioning_test ORDER BY 1;
id | time | new_column
----+------+------------
(0 rows)
-- test DROP
-- test DROP partition
INSERT INTO partitioning_test_2010 VALUES(27, '2010-02-01');
DROP TABLE partitioning_test_2010;
-- see DROPped partitions content is not accessible from partitioning_test;
SELECT * FROM partitioning_test WHERE time >= '2010-01-01' AND time < '2011-01-01' ORDER BY 1;
id | time | new_column
----+------+------------
(0 rows)
-- test DROP partitioned table
DROP TABLE partitioning_test;
-- dropping the parent should CASCADE to the children as well
SELECT table_name FROM information_schema.tables WHERE table_name LIKE 'partitioning_test%' ORDER BY 1;
table_name
---------------------------
partitioning_test_2009
partitioning_test_failure
(2 rows)
-- test distributing partitioned table colocated with non-partitioned table
CREATE TABLE partitioned_users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint) PARTITION BY RANGE (time);
CREATE TABLE partitioned_events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint) PARTITION BY RANGE (time);
SELECT create_distributed_table('partitioned_users_table', 'user_id', colocate_with => 'users_table');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('partitioned_events_table', 'user_id', colocate_with => 'events_table');
create_distributed_table
--------------------------
(1 row)
-- INSERT/SELECT from regular table to partitioned table
CREATE TABLE partitioned_users_table_2009 PARTITION OF partitioned_users_table FOR VALUES FROM ('2014-01-01') TO ('2015-01-01');
CREATE TABLE partitioned_events_table_2009 PARTITION OF partitioned_events_table FOR VALUES FROM ('2014-01-01') TO ('2015-01-01');
INSERT INTO partitioned_events_table SELECT * FROM events_table;
INSERT INTO partitioned_users_table_2009 SELECT * FROM users_table;
--
-- Complex JOINs, subqueries, UNIONs etc...
--
-- subquery with UNIONs on partitioned table
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
(SELECT *, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
(SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM(
(SELECT
"events"."user_id", "events"."time", 0 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15))
UNION
(SELECT
"events"."user_id", "events"."time", 1 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (15, 16, 17, 18, 19) )
UNION
(SELECT
"events"."user_id", "events"."time", 2 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) )
UNION
(SELECT
"events"."user_id", "events"."time", 3 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13))) t1
GROUP BY "t1"."user_id") AS t) "q"
) AS final_query
GROUP BY types
ORDER BY types;
types | sumofeventtype
-------+----------------
0 | 55
1 | 38
2 | 70
3 | 58
(4 rows)
-- UNION and JOIN on both partitioned and regular tables
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
(SELECT
*, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
(SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT
*
FROM
(SELECT
"events"."time", 0 AS event, "events"."user_id"
FROM
partitioned_events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION
(SELECT *
FROM
(
SELECT * FROM
(
SELECT
max("events"."time"),
0 AS event,
"events"."user_id"
FROM
events_table as "events", users_table as "users"
WHERE
events.user_id = users.user_id AND
event_type IN (10, 11, 12, 13, 14, 15)
GROUP BY "events"."user_id"
) as events_subquery_5
) events_subquery_2)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 2 AS event, "events"."user_id"
FROM
partitioned_events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 3 AS event, "events"."user_id"
FROM
events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)
) t1
GROUP BY "t1"."user_id") AS t) "q"
INNER JOIN
(SELECT
"users"."user_id"
FROM
partitioned_users_table as "users"
WHERE
value_1 > 50 and value_1 < 70) AS t
ON (t.user_id = q.user_id)) as final_query
GROUP BY
types
ORDER BY
types;
types | sumofeventtype
-------+----------------
0 | 115
2 | 160
3 | 158
(3 rows)
-- test LIST partitioning
CREATE TABLE list_partitioned_events_table (user_id int, time date, event_type int, value_2 int, value_3 float, value_4 bigint) PARTITION BY LIST (time);
CREATE TABLE list_partitioned_events_table_2014_01_01_05 PARTITION OF list_partitioned_events_table FOR VALUES IN ('2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05');
CREATE TABLE list_partitioned_events_table_2014_01_06_10 PARTITION OF list_partitioned_events_table FOR VALUES IN ('2014-01-06', '2014-01-07', '2014-01-08', '2014-01-09', '2014-01-10');
CREATE TABLE list_partitioned_events_table_2014_01_11_15 PARTITION OF list_partitioned_events_table FOR VALUES IN ('2014-01-11', '2014-01-12', '2014-01-13', '2014-01-14', '2014-01-15');
-- test distributing partitioned table colocated with another partitioned table
SELECT create_distributed_table('list_partitioned_events_table', 'user_id', colocate_with => 'partitioned_events_table');
create_distributed_table
--------------------------
(1 row)
-- INSERT/SELECT from partitioned table to partitioned table
INSERT INTO
list_partitioned_events_table
SELECT
user_id,
date_trunc('day', time) as time,
event_type,
value_2,
value_3,
value_4
FROM
events_table
WHERE
time >= '2014-01-01' AND
time <= '2014-01-15';
-- LEFT JOINs used with INNER JOINs on range partitioned table, list partitioned table and non-partitioned table
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"list_partitioned_events_table"."time", "list_partitioned_events_table"."user_id" as event_user_id
FROM
list_partitioned_events_table as "list_partitioned_events_table"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
partitioned_users_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
LEFT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
partitioned_users_table as "users") "left_group_by_1"
ON ("left_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
GROUP BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
cnt | generated_group_field
-----+-----------------------
68 | 551
68 | 569
68 | 645
68 | 713
68 | 734
34 | 3
34 | 5
34 | 15
34 | 32
34 | 68
(10 rows)
--
-- Additional partitioning features
--
-- test multi column partitioning
CREATE TABLE multi_column_partitioning(c1 int, c2 int) PARTITION BY RANGE (c1, c2);
CREATE TABLE multi_column_partitioning_0_0_10_0 PARTITION OF multi_column_partitioning FOR VALUES FROM (0, 0) TO (10, 0);
SELECT create_distributed_table('multi_column_partitioning', 'c1');
create_distributed_table
--------------------------
(1 row)
-- test INSERT to multi-column partitioned table
INSERT INTO multi_column_partitioning VALUES(1, 1);
INSERT INTO multi_column_partitioning_0_0_10_0 VALUES(5, -5);
-- test INSERT to multi-column partitioned table where no suitable partition exists
INSERT INTO multi_column_partitioning VALUES(10, 1);
ERROR: no partition of relation "multi_column_partitioning_1660068" found for row
DETAIL: Partition key of the failing row contains (c1, c2) = (10, 1).
CONTEXT: while executing command on localhost:57637
-- test with MINVALUE/MAXVALUE
CREATE TABLE multi_column_partitioning_10_max_20_min PARTITION OF multi_column_partitioning FOR VALUES FROM (10, MAXVALUE) TO (20, MINVALUE);
-- test INSERT to partition with MINVALUE/MAXVALUE bounds
INSERT INTO multi_column_partitioning VALUES(11, -11);
INSERT INTO multi_column_partitioning_10_max_20_min VALUES(19, -19);
-- test INSERT to multi-column partitioned table where no suitable partition exists
INSERT INTO multi_column_partitioning VALUES(20, -20);
ERROR: no partition of relation "multi_column_partitioning_1660068" found for row
DETAIL: Partition key of the failing row contains (c1, c2) = (20, -20).
CONTEXT: while executing command on localhost:57637
-- see data is loaded to multi-column partitioned table
SELECT * FROM multi_column_partitioning;
c1 | c2
----+-----
1 | 1
5 | -5
19 | -19
11 | -11
(4 rows)
DROP TABLE IF EXISTS partitioning_test_2012, partitioning_test_2013, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning;
NOTICE: table "partitioning_test_2012" does not exist, skipping
NOTICE: table "partitioning_test_2013" does not exist, skipping

View File

@ -0,0 +1,934 @@
--
-- Distributed Partitioned Table Tests
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
--
-- Distributed Partitioned Table Creation Tests
--
-- 1-) Distributing partitioned table
-- create partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ...
^
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin...
^
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin...
^
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES (1, '2009-06-06');
^
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES (2, '2010-07-07');
^
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
^
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
ERROR: relation "partitioning_test_2010" does not exist
LINE 1: INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
^
-- distribute partitioned table
SELECT create_distributed_table('partitioning_test', 'id');
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test', 'id');
^
-- see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
^
-- see partitioned table and its partitions are distributed
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
^
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
GROUP BY
logicalrelid
ORDER BY
1,2;
ERROR: relation "partitioning_test" does not exist
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
^
-- 2-) Creating partition of a distributed table
CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2011 PARTITION OF partitionin...
^
-- new partition is automatically distributed as well
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
^
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
GROUP BY
logicalrelid
ORDER BY
1,2;
ERROR: relation "partitioning_test" does not exist
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
^
-- 3-) Attaching non distributed table to a distributed table
CREATE TABLE partitioning_test_2012(id int, time date);
-- load some data
INSERT INTO partitioning_test_2012 VALUES (5, '2012-06-06');
INSERT INTO partitioning_test_2012 VALUES (6, '2012-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01');
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_...
^
-- attached partition is distributed as well
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
^
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
GROUP BY
logicalrelid
ORDER BY
1,2;
ERROR: relation "partitioning_test" does not exist
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
^
-- see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
^
-- 4-) Attaching distributed table to distributed table
CREATE TABLE partitioning_test_2013(id int, time date);
SELECT create_distributed_table('partitioning_test_2013', 'id');
create_distributed_table
--------------------------
(1 row)
-- load some data
INSERT INTO partitioning_test_2013 VALUES (7, '2013-06-06');
INSERT INTO partitioning_test_2013 VALUES (8, '2013-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2013 FOR VALUES FROM ('2013-01-01') TO ('2014-01-01');
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_...
^
-- see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
^
-- 5-) Failure cases while creating distributed partitioned tables
-- cannot distribute a partition if its parent is not distributed
CREATE TABLE partitioning_test_failure(id int, time date) PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: ...ABLE partitioning_test_failure(id int, time date) PARTITION ...
^
CREATE TABLE partitioning_test_failure_2009 PARTITION OF partitioning_test_failure FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_failure_2009 PARTITION OF par...
^
SELECT create_distributed_table('partitioning_test_failure_2009', 'id');
ERROR: relation "partitioning_test_failure_2009" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test_failure_2...
^
-- only hash distributed tables can have partitions
SELECT create_distributed_table('partitioning_test_failure', 'id', 'append');
ERROR: relation "partitioning_test_failure" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test_failure',...
^
SELECT create_distributed_table('partitioning_test_failure', 'id', 'range');
ERROR: relation "partitioning_test_failure" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test_failure',...
^
SELECT create_reference_table('partitioning_test_failure');
ERROR: relation "partitioning_test_failure" does not exist
LINE 1: SELECT create_reference_table('partitioning_test_failure');
^
-- replication factor > 1 is not allowed in distributed partitioned tables
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('partitioning_test_failure', 'id');
ERROR: relation "partitioning_test_failure" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test_failure',...
^
SET citus.shard_replication_factor TO 1;
-- non-distributed tables cannot have distributed partitions;
DROP TABLE partitioning_test_failure_2009;
ERROR: table "partitioning_test_failure_2009" does not exist
CREATE TABLE partitioning_test_failure_2009(id int, time date);
SELECT create_distributed_table('partitioning_test_failure_2009', 'id');
create_distributed_table
--------------------------
(1 row)
ALTER TABLE partitioning_test_failure ATTACH PARTITION partitioning_test_failure_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE partitioning_test_failure ATTACH PARTITION parti...
^
-- multi-level partitioning is not allowed
DROP TABLE partitioning_test_failure_2009;
CREATE TABLE partitioning_test_failure_2009 PARTITION OF partitioning_test_failure FOR VALUES FROM ('2009-01-01') TO ('2010-01-01') PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_failure_2009 PARTITION OF par...
^
SELECT create_distributed_table('partitioning_test_failure', 'id');
ERROR: relation "partitioning_test_failure" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test_failure',...
^
-- multi-level partitioning is not allowed in different order
DROP TABLE partitioning_test_failure_2009;
ERROR: table "partitioning_test_failure_2009" does not exist
SELECT create_distributed_table('partitioning_test_failure', 'id');
ERROR: relation "partitioning_test_failure" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test_failure',...
^
CREATE TABLE partitioning_test_failure_2009 PARTITION OF partitioning_test_failure FOR VALUES FROM ('2009-01-01') TO ('2010-01-01') PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_failure_2009 PARTITION OF par...
^
--
-- DMLs in distributed partitioned tables
--
-- test COPY
-- COPY data to partitioned table
COPY partitioning_test FROM STDIN WITH CSV;
ERROR: relation "partitioning_test" does not exist
9,2009-01-01
10,2010-01-01
11,2011-01-01
12,2012-01-01
\.
invalid command \.
-- COPY data to partition directly
COPY partitioning_test_2009 FROM STDIN WITH CSV;
ERROR: syntax error at or near "9"
LINE 1: 9,2009-01-01
^
13,2009-01-02
14,2009-01-03
\.
invalid command \.
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id >= 9 ORDER BY 1;
ERROR: syntax error at or near "13"
LINE 1: 13,2009-01-02
^
-- test INSERT
-- INSERT INTO the partitioned table
INSERT INTO partitioning_test VALUES(15, '2009-02-01');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES(15, '2009-02-01');
^
INSERT INTO partitioning_test VALUES(16, '2010-02-01');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES(16, '2010-02-01');
^
INSERT INTO partitioning_test VALUES(17, '2011-02-01');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES(17, '2011-02-01');
^
INSERT INTO partitioning_test VALUES(18, '2012-02-01');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES(18, '2012-02-01');
^
-- INSERT INTO the partitions directly table
INSERT INTO partitioning_test VALUES(19, '2009-02-02');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES(19, '2009-02-02');
^
INSERT INTO partitioning_test VALUES(20, '2010-02-02');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES(20, '2010-02-02');
^
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id >= 15 ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE id >= 15 ORDER BY 1;
^
-- test INSERT/SELECT
-- INSERT/SELECT from partition to partitioned table
INSERT INTO partitioning_test SELECT * FROM partitioning_test_2011;
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test SELECT * FROM partitioning_tes...
^
-- INSERT/SELECT from partitioned table to partition
INSERT INTO partitioning_test_2012 SELECT * FROM partitioning_test WHERE time >= '2012-01-01' AND time < '2013-01-01';
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test_2012 SELECT * FROM partitionin...
^
-- see the data is loaded to shards (rows in the given range should be duplicated)
SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2013-01-01' ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE time >= '2011-01-01' A...
^
-- test UPDATE
-- UPDATE partitioned table
UPDATE partitioning_test SET time = '2013-07-07' WHERE id = 7;
ERROR: relation "partitioning_test" does not exist
LINE 1: UPDATE partitioning_test SET time = '2013-07-07' WHERE id = ...
^
-- UPDATE partition directly
UPDATE partitioning_test_2013 SET time = '2013-08-08' WHERE id = 8;
-- see the data is updated
SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER...
^
-- UPDATE that tries to move a row to a non-existing partition (this should fail)
UPDATE partitioning_test SET time = '2020-07-07' WHERE id = 7;
ERROR: relation "partitioning_test" does not exist
LINE 1: UPDATE partitioning_test SET time = '2020-07-07' WHERE id = ...
^
-- UPDATE with subqueries on partitioned table
UPDATE
partitioning_test
SET
time = time + INTERVAL '1 day'
WHERE
id IN (SELECT id FROM partitioning_test WHERE id = 1);
ERROR: relation "partitioning_test" does not exist
LINE 2: partitioning_test
^
-- UPDATE with subqueries on partition
UPDATE
partitioning_test_2009
SET
time = time + INTERVAL '1 month'
WHERE
id IN (SELECT id FROM partitioning_test WHERE id = 2);
ERROR: relation "partitioning_test_2009" does not exist
LINE 2: partitioning_test_2009
^
-- see the data is updated
SELECT * FROM partitioning_test WHERE id = 1 OR id = 2 ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE id = 1 OR id = 2 ORDER...
^
-- test DELETE
-- DELETE from partitioned table
DELETE FROM partitioning_test WHERE id = 9;
ERROR: relation "partitioning_test" does not exist
LINE 1: DELETE FROM partitioning_test WHERE id = 9;
^
-- DELETE from partition directly
DELETE FROM partitioning_test_2010 WHERE id = 10;
ERROR: relation "partitioning_test_2010" does not exist
LINE 1: DELETE FROM partitioning_test_2010 WHERE id = 10;
^
-- see the data is deleted
SELECT * FROM partitioning_test WHERE id = 9 OR id = 10 ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE id = 9 OR id = 10 ORDE...
^
-- test master_modify_multiple_shards
-- master_modify_multiple_shards on partitioned table
SELECT master_modify_multiple_shards('UPDATE partitioning_test SET time = time + INTERVAL ''1 day''');
ERROR: relation "partitioning_test" does not exist
-- see rows are UPDATED
SELECT * FROM partitioning_test ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
^
-- master_modify_multiple_shards on partition directly
SELECT master_modify_multiple_shards('UPDATE partitioning_test_2009 SET time = time + INTERVAL ''1 day''');
ERROR: relation "partitioning_test_2009" does not exist
-- see rows are UPDATED
SELECT * FROM partitioning_test_2009 ORDER BY 1;
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: SELECT * FROM partitioning_test_2009 ORDER BY 1;
^
-- test master_modify_multiple_shards which fails in workers (updated value is outside of partition bounds)
SELECT master_modify_multiple_shards('UPDATE partitioning_test_2009 SET time = time + INTERVAL ''6 month''');
ERROR: relation "partitioning_test_2009" does not exist
--
-- DDL in distributed partitioned tables
--
-- test CREATE INDEX
-- CREATE INDEX on partitioned table - this will error out
CREATE INDEX partitioning_index ON partitioning_test(id);
ERROR: relation "partitioning_test" does not exist
-- CREATE INDEX on partition
CREATE INDEX partitioning_2009_index ON partitioning_test_2009(id);
ERROR: relation "partitioning_test_2009" does not exist
-- CREATE INDEX CONCURRENTLY on partition
CREATE INDEX CONCURRENTLY partitioned_2010_index ON partitioning_test_2010(id);
ERROR: relation "partitioning_test_2010" does not exist
-- see index is created
SELECT tablename, indexname FROM pg_indexes WHERE tablename LIKE 'partitioning_test%' ORDER BY indexname;
tablename | indexname
-----------+-----------
(0 rows)
-- test add COLUMN
-- add COLUMN to partitioned table
ALTER TABLE partitioning_test ADD new_column int;
ERROR: relation "partitioning_test" does not exist
-- add COLUMN to partition - this will error out
ALTER TABLE partitioning_test_2010 ADD new_column_2 int;
ERROR: relation "partitioning_test_2010" does not exist
-- see additional column is created
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test'::regclass ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT name, type FROM table_attrs WHERE relid = 'partitioni...
^
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test_2010'::regclass ORDER BY 1;
ERROR: relation "partitioning_test_2010" does not exist
LINE 1: SELECT name, type FROM table_attrs WHERE relid = 'partitioni...
^
-- test add PRIMARY KEY
-- add PRIMARY KEY to partitioned table - this will error out
ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_primary PRIMARY KEY (id);
ERROR: relation "partitioning_test" does not exist
-- ADD PRIMARY KEY to partition
ALTER TABLE partitioning_test_2009 ADD CONSTRAINT partitioning_2009_primary PRIMARY KEY (id);
ERROR: relation "partitioning_test_2009" does not exist
-- see PRIMARY KEY is created
SELECT
table_name,
constraint_name,
constraint_type
FROM
information_schema.table_constraints
WHERE
table_name = 'partitioning_test_2009' AND
constraint_name = 'partitioning_2009_primary';
table_name | constraint_name | constraint_type
------------+-----------------+-----------------
(0 rows)
-- test ADD FOREIGN CONSTRAINT
-- add FOREIGN CONSTRAINT to partitioned table -- this will error out
ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_foreign FOREIGN KEY (id) REFERENCES partitioning_test_2009 (id);
ERROR: relation "partitioning_test" does not exist
-- add FOREIGN CONSTRAINT to partition
INSERT INTO partitioning_test_2009 VALUES (5, '2009-06-06');
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: INSERT INTO partitioning_test_2009 VALUES (5, '2009-06-06');
^
INSERT INTO partitioning_test_2009 VALUES (6, '2009-07-07');
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: INSERT INTO partitioning_test_2009 VALUES (6, '2009-07-07');
^
INSERT INTO partitioning_test_2009 VALUES(12, '2009-02-01');
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: INSERT INTO partitioning_test_2009 VALUES(12, '2009-02-01');
^
INSERT INTO partitioning_test_2009 VALUES(18, '2009-02-01');
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: INSERT INTO partitioning_test_2009 VALUES(18, '2009-02-01');
^
ALTER TABLE partitioning_test_2012 ADD CONSTRAINT partitioning_2012_foreign FOREIGN KEY (id) REFERENCES partitioning_test_2009 (id) ON DELETE CASCADE;
ERROR: relation "partitioning_test_2009" does not exist
-- see FOREIGN KEY is created
SELECT "Constraint" FROM table_fkeys WHERE relid = 'partitioning_test_2012'::regclass ORDER BY 1;
Constraint
------------
(0 rows)
-- test ON DELETE CASCADE works
DELETE FROM partitioning_test_2009 WHERE id = 5;
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: DELETE FROM partitioning_test_2009 WHERE id = 5;
^
-- see that element is deleted from both partitions
SELECT * FROM partitioning_test_2009 WHERE id = 5 ORDER BY 1;
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: SELECT * FROM partitioning_test_2009 WHERE id = 5 ORDER BY 1...
^
SELECT * FROM partitioning_test_2012 WHERE id = 5 ORDER BY 1;
id | time
----+------------
5 | 06-06-2012
(1 row)
-- test DETACH partition
ALTER TABLE partitioning_test DETACH PARTITION partitioning_test_2009;
ERROR: syntax error at or near "DETACH"
LINE 1: ALTER TABLE partitioning_test DETACH PARTITION partitioning_...
^
-- see DETACHed partitions content is not accessible from partitioning_test;
SELECT * FROM partitioning_test WHERE time >= '2009-01-01' AND time < '2010-01-01' ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE time >= '2009-01-01' A...
^
--
-- Transaction tests
--
-- DDL in transaction
BEGIN;
ALTER TABLE partitioning_test ADD newer_column int;
ERROR: relation "partitioning_test" does not exist
-- see additional column is created
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test'::regclass ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
-- see rollback is successful
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test'::regclass ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT name, type FROM table_attrs WHERE relid = 'partitioni...
^
-- COPY in transaction
BEGIN;
COPY partitioning_test FROM STDIN WITH CSV;
ERROR: relation "partitioning_test" does not exist
22,2010-01-01,22
23,2011-01-01,23
24,2013-01-01,24
\.
invalid command \.
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id = 22 ORDER BY 1;
ERROR: syntax error at or near "22"
LINE 1: 22,2010-01-01,22
^
SELECT * FROM partitioning_test WHERE id = 23 ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
SELECT * FROM partitioning_test WHERE id = 24 ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
-- see rollback is successful
SELECT * FROM partitioning_test WHERE id >= 22 ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE id >= 22 ORDER BY 1;
^
-- DML in transaction
BEGIN;
-- INSERT in transaction
INSERT INTO partitioning_test VALUES(25, '2010-02-02');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES(25, '2010-02-02');
^
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id = 25 ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
-- INSERT/SELECT in transaction
INSERT INTO partitioning_test SELECT * FROM partitioning_test WHERE id = 25;
ERROR: current transaction is aborted, commands ignored until end of transaction block
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id = 25 ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
-- UPDATE in transaction
UPDATE partitioning_test SET time = '2010-10-10' WHERE id = 25;
ERROR: current transaction is aborted, commands ignored until end of transaction block
-- see the data is updated
SELECT * FROM partitioning_test WHERE id = 25 ORDER BY 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
-- perform operations on partition and partioned tables together
INSERT INTO partitioning_test VALUES(26, '2010-02-02', 26);
ERROR: current transaction is aborted, commands ignored until end of transaction block
INSERT INTO partitioning_test_2010 VALUES(26, '2010-02-02', 26);
ERROR: current transaction is aborted, commands ignored until end of transaction block
COPY partitioning_test FROM STDIN WITH CSV;
ERROR: current transaction is aborted, commands ignored until end of transaction block
26,2010-02-02,26
\.
invalid command \.
COPY partitioning_test_2010 FROM STDIN WITH CSV;
ERROR: syntax error at or near "26"
LINE 1: 26,2010-02-02,26
^
26,2010-02-02,26
\.
invalid command \.
-- see the data is loaded to shards (we should see 4 rows with same content)
SELECT * FROM partitioning_test WHERE id = 26 ORDER BY 1;
ERROR: syntax error at or near "26"
LINE 1: 26,2010-02-02,26
^
ROLLBACK;
-- see rollback is successful
SELECT * FROM partitioning_test WHERE id = 26 ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE id = 26 ORDER BY 1;
^
-- DETACH and DROP in a transaction
BEGIN;
ALTER TABLE partitioning_test DETACH PARTITION partitioning_test_2011;
ERROR: syntax error at or near "DETACH"
LINE 1: ALTER TABLE partitioning_test DETACH PARTITION partitioning_...
^
DROP TABLE partitioning_test_2011;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
-- see DROPed partitions content is not accessible
SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2012-01-01' ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE time >= '2011-01-01' A...
^
--
-- Misc tests
--
-- test TRUNCATE
-- test TRUNCATE partition
TRUNCATE partitioning_test_2012;
-- see partition is TRUNCATEd
SELECT * FROM partitioning_test_2012 ORDER BY 1;
id | time
----+------
(0 rows)
-- test TRUNCATE partitioned table
TRUNCATE partitioning_test;
ERROR: relation "partitioning_test" does not exist
-- see partitioned table is TRUNCATEd
SELECT * FROM partitioning_test ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
^
-- test DROP
-- test DROP partition
INSERT INTO partitioning_test_2010 VALUES(27, '2010-02-01');
ERROR: relation "partitioning_test_2010" does not exist
LINE 1: INSERT INTO partitioning_test_2010 VALUES(27, '2010-02-01');
^
DROP TABLE partitioning_test_2010;
ERROR: table "partitioning_test_2010" does not exist
-- see DROPped partitions content is not accessible from partitioning_test;
SELECT * FROM partitioning_test WHERE time >= '2010-01-01' AND time < '2011-01-01' ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test WHERE time >= '2010-01-01' A...
^
-- test DROP partitioned table
DROP TABLE partitioning_test;
ERROR: table "partitioning_test" does not exist
-- dropping the parent should CASCADE to the children as well
SELECT table_name FROM information_schema.tables WHERE table_name LIKE 'partitioning_test%' ORDER BY 1;
table_name
------------------------
partitioning_test_2012
partitioning_test_2013
(2 rows)
-- test distributing partitioned table colocated with non-partitioned table
CREATE TABLE partitioned_users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint) PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: ... int, value_2 int, value_3 float, value_4 bigint) PARTITION ...
^
CREATE TABLE partitioned_events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint) PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: ... int, value_2 int, value_3 float, value_4 bigint) PARTITION ...
^
SELECT create_distributed_table('partitioned_users_table', 'user_id', colocate_with => 'users_table');
ERROR: relation "partitioned_users_table" does not exist
LINE 1: SELECT create_distributed_table('partitioned_users_table', '...
^
SELECT create_distributed_table('partitioned_events_table', 'user_id', colocate_with => 'events_table');
ERROR: relation "partitioned_events_table" does not exist
LINE 1: SELECT create_distributed_table('partitioned_events_table', ...
^
-- INSERT/SELECT from regular table to partitioned table
CREATE TABLE partitioned_users_table_2009 PARTITION OF partitioned_users_table FOR VALUES FROM ('2014-01-01') TO ('2015-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioned_users_table_2009 PARTITION OF parti...
^
CREATE TABLE partitioned_events_table_2009 PARTITION OF partitioned_events_table FOR VALUES FROM ('2014-01-01') TO ('2015-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioned_events_table_2009 PARTITION OF part...
^
INSERT INTO partitioned_events_table SELECT * FROM events_table;
ERROR: relation "partitioned_events_table" does not exist
LINE 1: INSERT INTO partitioned_events_table SELECT * FROM events_ta...
^
INSERT INTO partitioned_users_table_2009 SELECT * FROM users_table;
ERROR: relation "partitioned_users_table_2009" does not exist
LINE 1: INSERT INTO partitioned_users_table_2009 SELECT * FROM users...
^
--
-- Complex JOINs, subqueries, UNIONs etc...
--
-- subquery with UNIONs on partitioned table
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
(SELECT *, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
(SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM(
(SELECT
"events"."user_id", "events"."time", 0 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15))
UNION
(SELECT
"events"."user_id", "events"."time", 1 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (15, 16, 17, 18, 19) )
UNION
(SELECT
"events"."user_id", "events"."time", 2 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) )
UNION
(SELECT
"events"."user_id", "events"."time", 3 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13))) t1
GROUP BY "t1"."user_id") AS t) "q"
) AS final_query
GROUP BY types
ORDER BY types;
ERROR: relation "partitioned_events_table" does not exist
LINE 14: partitioned_events_table as "events"
^
-- UNION and JOIN on both partitioned and regular tables
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
(SELECT
*, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
(SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT
*
FROM
(SELECT
"events"."time", 0 AS event, "events"."user_id"
FROM
partitioned_events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION
(SELECT *
FROM
(
SELECT * FROM
(
SELECT
max("events"."time"),
0 AS event,
"events"."user_id"
FROM
events_table as "events", users_table as "users"
WHERE
events.user_id = users.user_id AND
event_type IN (10, 11, 12, 13, 14, 15)
GROUP BY "events"."user_id"
) as events_subquery_5
) events_subquery_2)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 2 AS event, "events"."user_id"
FROM
partitioned_events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 3 AS event, "events"."user_id"
FROM
events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)
) t1
GROUP BY "t1"."user_id") AS t) "q"
INNER JOIN
(SELECT
"users"."user_id"
FROM
partitioned_users_table as "users"
WHERE
value_1 > 50 and value_1 < 70) AS t
ON (t.user_id = q.user_id)) as final_query
GROUP BY
types
ORDER BY
types;
ERROR: relation "partitioned_events_table" does not exist
LINE 18: partitioned_events_table as "events"
^
-- test LIST partitioning
CREATE TABLE list_partitioned_events_table (user_id int, time date, event_type int, value_2 int, value_3 float, value_4 bigint) PARTITION BY LIST (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: ... int, value_2 int, value_3 float, value_4 bigint) PARTITION ...
^
CREATE TABLE list_partitioned_events_table_2014_01_01_05 PARTITION OF list_partitioned_events_table FOR VALUES IN ('2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05');
ERROR: syntax error at or near "PARTITION"
LINE 1: ...TABLE list_partitioned_events_table_2014_01_01_05 PARTITION ...
^
CREATE TABLE list_partitioned_events_table_2014_01_06_10 PARTITION OF list_partitioned_events_table FOR VALUES IN ('2014-01-06', '2014-01-07', '2014-01-08', '2014-01-09', '2014-01-10');
ERROR: syntax error at or near "PARTITION"
LINE 1: ...TABLE list_partitioned_events_table_2014_01_06_10 PARTITION ...
^
CREATE TABLE list_partitioned_events_table_2014_01_11_15 PARTITION OF list_partitioned_events_table FOR VALUES IN ('2014-01-11', '2014-01-12', '2014-01-13', '2014-01-14', '2014-01-15');
ERROR: syntax error at or near "PARTITION"
LINE 1: ...TABLE list_partitioned_events_table_2014_01_11_15 PARTITION ...
^
-- test distributing partitioned table colocated with another partitioned table
SELECT create_distributed_table('list_partitioned_events_table', 'user_id', colocate_with => 'partitioned_events_table');
ERROR: relation "list_partitioned_events_table" does not exist
LINE 1: SELECT create_distributed_table('list_partitioned_events_tab...
^
-- INSERT/SELECT from partitioned table to partitioned table
INSERT INTO
list_partitioned_events_table
SELECT
user_id,
date_trunc('day', time) as time,
event_type,
value_2,
value_3,
value_4
FROM
events_table
WHERE
time >= '2014-01-01' AND
time <= '2014-01-15';
ERROR: relation "list_partitioned_events_table" does not exist
LINE 2: list_partitioned_events_table
^
-- LEFT JOINs used with INNER JOINs on range partitioned table, list partitioned table and non-partitioned table
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"list_partitioned_events_table"."time", "list_partitioned_events_table"."user_id" as event_user_id
FROM
list_partitioned_events_table as "list_partitioned_events_table"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
partitioned_users_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
LEFT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
partitioned_users_table as "users") "left_group_by_1"
ON ("left_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
GROUP BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
ERROR: relation "list_partitioned_events_table" does not exist
LINE 15: list_partitioned_events_table as "list_partitio...
^
--
-- Additional partitioning features
--
-- test multi column partitioning
CREATE TABLE multi_column_partitioning(c1 int, c2 int) PARTITION BY RANGE (c1, c2);
ERROR: syntax error at or near "PARTITION"
LINE 1: ...E TABLE multi_column_partitioning(c1 int, c2 int) PARTITION ...
^
CREATE TABLE multi_column_partitioning_0_0_10_0 PARTITION OF multi_column_partitioning FOR VALUES FROM (0, 0) TO (10, 0);
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE multi_column_partitioning_0_0_10_0 PARTITION OF...
^
SELECT create_distributed_table('multi_column_partitioning', 'c1');
ERROR: relation "multi_column_partitioning" does not exist
LINE 1: SELECT create_distributed_table('multi_column_partitioning',...
^
-- test INSERT to multi-column partitioned table
INSERT INTO multi_column_partitioning VALUES(1, 1);
ERROR: relation "multi_column_partitioning" does not exist
LINE 1: INSERT INTO multi_column_partitioning VALUES(1, 1);
^
INSERT INTO multi_column_partitioning_0_0_10_0 VALUES(5, -5);
ERROR: relation "multi_column_partitioning_0_0_10_0" does not exist
LINE 1: INSERT INTO multi_column_partitioning_0_0_10_0 VALUES(5, -5)...
^
-- test INSERT to multi-column partitioned table where no suitable partition exists
INSERT INTO multi_column_partitioning VALUES(10, 1);
ERROR: relation "multi_column_partitioning" does not exist
LINE 1: INSERT INTO multi_column_partitioning VALUES(10, 1);
^
-- test with MINVALUE/MAXVALUE
CREATE TABLE multi_column_partitioning_10_max_20_min PARTITION OF multi_column_partitioning FOR VALUES FROM (10, MAXVALUE) TO (20, MINVALUE);
ERROR: syntax error at or near "PARTITION"
LINE 1: ...ATE TABLE multi_column_partitioning_10_max_20_min PARTITION ...
^
-- test INSERT to partition with MINVALUE/MAXVALUE bounds
INSERT INTO multi_column_partitioning VALUES(11, -11);
ERROR: relation "multi_column_partitioning" does not exist
LINE 1: INSERT INTO multi_column_partitioning VALUES(11, -11);
^
INSERT INTO multi_column_partitioning_10_max_20_min VALUES(19, -19);
ERROR: relation "multi_column_partitioning_10_max_20_min" does not exist
LINE 1: INSERT INTO multi_column_partitioning_10_max_20_min VALUES(1...
^
-- test INSERT to multi-column partitioned table where no suitable partition exists
INSERT INTO multi_column_partitioning VALUES(20, -20);
ERROR: relation "multi_column_partitioning" does not exist
LINE 1: INSERT INTO multi_column_partitioning VALUES(20, -20);
^
-- see data is loaded to multi-column partitioned table
SELECT * FROM multi_column_partitioning;
ERROR: relation "multi_column_partitioning" does not exist
LINE 1: SELECT * FROM multi_column_partitioning;
^
DROP TABLE IF EXISTS partitioning_test_2012, partitioning_test_2013, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning;
NOTICE: table "partitioned_events_table" does not exist, skipping
NOTICE: table "partitioned_users_table" does not exist, skipping
NOTICE: table "list_partitioned_events_table" does not exist, skipping
NOTICE: table "multi_column_partitioning" does not exist, skipping

View File

@ -1393,7 +1393,7 @@ ALTER TABLE reference_table_ddl RENAME TO reference_table_ddl_test;
ERROR: renaming distributed tables is currently unsupported
ALTER TABLE reference_table_ddl SET WITH OIDS;
ERROR: alter table command is currently unsupported
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported.
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
-- now test reference tables against some helper UDFs that Citus provides
-- cannot delete / drop shards from a reference table
SELECT master_apply_delete_command('DELETE FROM reference_table_ddl');

View File

@ -23,11 +23,6 @@ test: multi_table_ddl
test: multi_name_lengths
test: multi_metadata_access
# ---
# Tests for partitioning support
# ---
test: multi_partitioning_utils
# ----------
# The following distributed tests depend on creating a partitioned table and
# uploading data to it.
@ -41,6 +36,12 @@ test: multi_behavioral_analytics_create_table
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries
test: multi_insert_select
# ---
# Tests for partitioning support
# ---
test: multi_partitioning_utils
test: multi_partitioning
# ----------
# Miscellaneous tests to check our query planning behavior
# ----------

View File

@ -332,7 +332,7 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.lineite
ALTER TABLE lineitem_alter ADD COLUMN int_column3 INTEGER,
ALTER COLUMN int_column1 SET STATISTICS 10;
ERROR: alter table command is currently unsupported
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported.
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
ALTER TABLE lineitem_alter DROP COLUMN int_column1, DROP COLUMN int_column2;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.lineitem_alter'::regclass;
Column | Type | Modifiers
@ -364,12 +364,12 @@ ERROR: cannot execute ALTER TABLE command involving partition column
-- Verify that we error out on unsupported statement types
ALTER TABLE lineitem_alter ALTER COLUMN l_orderkey SET STATISTICS 100;
ERROR: alter table command is currently unsupported
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported.
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
ALTER TABLE lineitem_alter DROP CONSTRAINT IF EXISTS non_existent_contraint;
NOTICE: constraint "non_existent_contraint" of relation "lineitem_alter" does not exist, skipping
ALTER TABLE lineitem_alter SET WITHOUT OIDS;
ERROR: alter table command is currently unsupported
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT and TYPE subcommands are supported.
DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CONSTRAINT, ATTACH|DETACH PARTITION and TYPE subcommands are supported.
-- Verify that we error out in case of postgres errors on supported statement
-- types
ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type;

View File

@ -0,0 +1,665 @@
--
-- Distributed Partitioned Table Tests
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
--
-- Distributed Partitioned Table Creation Tests
--
-- 1-) Distributing partitioned table
-- create partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
-- distribute partitioned table
SELECT create_distributed_table('partitioning_test', 'id');
-- see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
-- see partitioned table and its partitions are distributed
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
ORDER BY 1;
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
GROUP BY
logicalrelid
ORDER BY
1,2;
-- 2-) Creating partition of a distributed table
CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01');
-- new partition is automatically distributed as well
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
ORDER BY 1;
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2011')
GROUP BY
logicalrelid
ORDER BY
1,2;
-- 3-) Attaching non distributed table to a distributed table
CREATE TABLE partitioning_test_2012(id int, time date);
-- load some data
INSERT INTO partitioning_test_2012 VALUES (5, '2012-06-06');
INSERT INTO partitioning_test_2012 VALUES (6, '2012-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01');
-- attached partition is distributed as well
SELECT
logicalrelid
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
ORDER BY 1;
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2012')
GROUP BY
logicalrelid
ORDER BY
1,2;
-- see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
-- 4-) Attaching distributed table to distributed table
CREATE TABLE partitioning_test_2013(id int, time date);
SELECT create_distributed_table('partitioning_test_2013', 'id');
-- load some data
INSERT INTO partitioning_test_2013 VALUES (7, '2013-06-06');
INSERT INTO partitioning_test_2013 VALUES (8, '2013-07-07');
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2013 FOR VALUES FROM ('2013-01-01') TO ('2014-01-01');
-- see the data is loaded to shards
SELECT * FROM partitioning_test ORDER BY 1;
-- 5-) Failure cases while creating distributed partitioned tables
-- cannot distribute a partition if its parent is not distributed
CREATE TABLE partitioning_test_failure(id int, time date) PARTITION BY RANGE (time);
CREATE TABLE partitioning_test_failure_2009 PARTITION OF partitioning_test_failure FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
SELECT create_distributed_table('partitioning_test_failure_2009', 'id');
-- only hash distributed tables can have partitions
SELECT create_distributed_table('partitioning_test_failure', 'id', 'append');
SELECT create_distributed_table('partitioning_test_failure', 'id', 'range');
SELECT create_reference_table('partitioning_test_failure');
-- replication factor > 1 is not allowed in distributed partitioned tables
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('partitioning_test_failure', 'id');
SET citus.shard_replication_factor TO 1;
-- non-distributed tables cannot have distributed partitions;
DROP TABLE partitioning_test_failure_2009;
CREATE TABLE partitioning_test_failure_2009(id int, time date);
SELECT create_distributed_table('partitioning_test_failure_2009', 'id');
ALTER TABLE partitioning_test_failure ATTACH PARTITION partitioning_test_failure_2009 FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
-- multi-level partitioning is not allowed
DROP TABLE partitioning_test_failure_2009;
CREATE TABLE partitioning_test_failure_2009 PARTITION OF partitioning_test_failure FOR VALUES FROM ('2009-01-01') TO ('2010-01-01') PARTITION BY RANGE (time);
SELECT create_distributed_table('partitioning_test_failure', 'id');
-- multi-level partitioning is not allowed in different order
DROP TABLE partitioning_test_failure_2009;
SELECT create_distributed_table('partitioning_test_failure', 'id');
CREATE TABLE partitioning_test_failure_2009 PARTITION OF partitioning_test_failure FOR VALUES FROM ('2009-01-01') TO ('2010-01-01') PARTITION BY RANGE (time);
--
-- DMLs in distributed partitioned tables
--
-- test COPY
-- COPY data to partitioned table
COPY partitioning_test FROM STDIN WITH CSV;
9,2009-01-01
10,2010-01-01
11,2011-01-01
12,2012-01-01
\.
-- COPY data to partition directly
COPY partitioning_test_2009 FROM STDIN WITH CSV;
13,2009-01-02
14,2009-01-03
\.
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id >= 9 ORDER BY 1;
-- test INSERT
-- INSERT INTO the partitioned table
INSERT INTO partitioning_test VALUES(15, '2009-02-01');
INSERT INTO partitioning_test VALUES(16, '2010-02-01');
INSERT INTO partitioning_test VALUES(17, '2011-02-01');
INSERT INTO partitioning_test VALUES(18, '2012-02-01');
-- INSERT INTO the partitions directly table
INSERT INTO partitioning_test VALUES(19, '2009-02-02');
INSERT INTO partitioning_test VALUES(20, '2010-02-02');
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id >= 15 ORDER BY 1;
-- test INSERT/SELECT
-- INSERT/SELECT from partition to partitioned table
INSERT INTO partitioning_test SELECT * FROM partitioning_test_2011;
-- INSERT/SELECT from partitioned table to partition
INSERT INTO partitioning_test_2012 SELECT * FROM partitioning_test WHERE time >= '2012-01-01' AND time < '2013-01-01';
-- see the data is loaded to shards (rows in the given range should be duplicated)
SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2013-01-01' ORDER BY 1;
-- test UPDATE
-- UPDATE partitioned table
UPDATE partitioning_test SET time = '2013-07-07' WHERE id = 7;
-- UPDATE partition directly
UPDATE partitioning_test_2013 SET time = '2013-08-08' WHERE id = 8;
-- see the data is updated
SELECT * FROM partitioning_test WHERE id = 7 OR id = 8 ORDER BY 1;
-- UPDATE that tries to move a row to a non-existing partition (this should fail)
UPDATE partitioning_test SET time = '2020-07-07' WHERE id = 7;
-- UPDATE with subqueries on partitioned table
UPDATE
partitioning_test
SET
time = time + INTERVAL '1 day'
WHERE
id IN (SELECT id FROM partitioning_test WHERE id = 1);
-- UPDATE with subqueries on partition
UPDATE
partitioning_test_2009
SET
time = time + INTERVAL '1 month'
WHERE
id IN (SELECT id FROM partitioning_test WHERE id = 2);
-- see the data is updated
SELECT * FROM partitioning_test WHERE id = 1 OR id = 2 ORDER BY 1;
-- test DELETE
-- DELETE from partitioned table
DELETE FROM partitioning_test WHERE id = 9;
-- DELETE from partition directly
DELETE FROM partitioning_test_2010 WHERE id = 10;
-- see the data is deleted
SELECT * FROM partitioning_test WHERE id = 9 OR id = 10 ORDER BY 1;
-- test master_modify_multiple_shards
-- master_modify_multiple_shards on partitioned table
SELECT master_modify_multiple_shards('UPDATE partitioning_test SET time = time + INTERVAL ''1 day''');
-- see rows are UPDATED
SELECT * FROM partitioning_test ORDER BY 1;
-- master_modify_multiple_shards on partition directly
SELECT master_modify_multiple_shards('UPDATE partitioning_test_2009 SET time = time + INTERVAL ''1 day''');
-- see rows are UPDATED
SELECT * FROM partitioning_test_2009 ORDER BY 1;
-- test master_modify_multiple_shards which fails in workers (updated value is outside of partition bounds)
SELECT master_modify_multiple_shards('UPDATE partitioning_test_2009 SET time = time + INTERVAL ''6 month''');
--
-- DDL in distributed partitioned tables
--
-- test CREATE INDEX
-- CREATE INDEX on partitioned table - this will error out
CREATE INDEX partitioning_index ON partitioning_test(id);
-- CREATE INDEX on partition
CREATE INDEX partitioning_2009_index ON partitioning_test_2009(id);
-- CREATE INDEX CONCURRENTLY on partition
CREATE INDEX CONCURRENTLY partitioned_2010_index ON partitioning_test_2010(id);
-- see index is created
SELECT tablename, indexname FROM pg_indexes WHERE tablename LIKE 'partitioning_test%' ORDER BY indexname;
-- test add COLUMN
-- add COLUMN to partitioned table
ALTER TABLE partitioning_test ADD new_column int;
-- add COLUMN to partition - this will error out
ALTER TABLE partitioning_test_2010 ADD new_column_2 int;
-- see additional column is created
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test'::regclass ORDER BY 1;
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test_2010'::regclass ORDER BY 1;
-- test add PRIMARY KEY
-- add PRIMARY KEY to partitioned table - this will error out
ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_primary PRIMARY KEY (id);
-- ADD PRIMARY KEY to partition
ALTER TABLE partitioning_test_2009 ADD CONSTRAINT partitioning_2009_primary PRIMARY KEY (id);
-- see PRIMARY KEY is created
SELECT
table_name,
constraint_name,
constraint_type
FROM
information_schema.table_constraints
WHERE
table_name = 'partitioning_test_2009' AND
constraint_name = 'partitioning_2009_primary';
-- test ADD FOREIGN CONSTRAINT
-- add FOREIGN CONSTRAINT to partitioned table -- this will error out
ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_foreign FOREIGN KEY (id) REFERENCES partitioning_test_2009 (id);
-- add FOREIGN CONSTRAINT to partition
INSERT INTO partitioning_test_2009 VALUES (5, '2009-06-06');
INSERT INTO partitioning_test_2009 VALUES (6, '2009-07-07');
INSERT INTO partitioning_test_2009 VALUES(12, '2009-02-01');
INSERT INTO partitioning_test_2009 VALUES(18, '2009-02-01');
ALTER TABLE partitioning_test_2012 ADD CONSTRAINT partitioning_2012_foreign FOREIGN KEY (id) REFERENCES partitioning_test_2009 (id) ON DELETE CASCADE;
-- see FOREIGN KEY is created
SELECT "Constraint" FROM table_fkeys WHERE relid = 'partitioning_test_2012'::regclass ORDER BY 1;
-- test ON DELETE CASCADE works
DELETE FROM partitioning_test_2009 WHERE id = 5;
-- see that element is deleted from both partitions
SELECT * FROM partitioning_test_2009 WHERE id = 5 ORDER BY 1;
SELECT * FROM partitioning_test_2012 WHERE id = 5 ORDER BY 1;
-- test DETACH partition
ALTER TABLE partitioning_test DETACH PARTITION partitioning_test_2009;
-- see DETACHed partitions content is not accessible from partitioning_test;
SELECT * FROM partitioning_test WHERE time >= '2009-01-01' AND time < '2010-01-01' ORDER BY 1;
--
-- Transaction tests
--
-- DDL in transaction
BEGIN;
ALTER TABLE partitioning_test ADD newer_column int;
-- see additional column is created
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test'::regclass ORDER BY 1;
ROLLBACK;
-- see rollback is successful
SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test'::regclass ORDER BY 1;
-- COPY in transaction
BEGIN;
COPY partitioning_test FROM STDIN WITH CSV;
22,2010-01-01,22
23,2011-01-01,23
24,2013-01-01,24
\.
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id = 22 ORDER BY 1;
SELECT * FROM partitioning_test WHERE id = 23 ORDER BY 1;
SELECT * FROM partitioning_test WHERE id = 24 ORDER BY 1;
ROLLBACK;
-- see rollback is successful
SELECT * FROM partitioning_test WHERE id >= 22 ORDER BY 1;
-- DML in transaction
BEGIN;
-- INSERT in transaction
INSERT INTO partitioning_test VALUES(25, '2010-02-02');
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id = 25 ORDER BY 1;
-- INSERT/SELECT in transaction
INSERT INTO partitioning_test SELECT * FROM partitioning_test WHERE id = 25;
-- see the data is loaded to shards
SELECT * FROM partitioning_test WHERE id = 25 ORDER BY 1;
-- UPDATE in transaction
UPDATE partitioning_test SET time = '2010-10-10' WHERE id = 25;
-- see the data is updated
SELECT * FROM partitioning_test WHERE id = 25 ORDER BY 1;
-- perform operations on partition and partioned tables together
INSERT INTO partitioning_test VALUES(26, '2010-02-02', 26);
INSERT INTO partitioning_test_2010 VALUES(26, '2010-02-02', 26);
COPY partitioning_test FROM STDIN WITH CSV;
26,2010-02-02,26
\.
COPY partitioning_test_2010 FROM STDIN WITH CSV;
26,2010-02-02,26
\.
-- see the data is loaded to shards (we should see 4 rows with same content)
SELECT * FROM partitioning_test WHERE id = 26 ORDER BY 1;
ROLLBACK;
-- see rollback is successful
SELECT * FROM partitioning_test WHERE id = 26 ORDER BY 1;
-- DETACH and DROP in a transaction
BEGIN;
ALTER TABLE partitioning_test DETACH PARTITION partitioning_test_2011;
DROP TABLE partitioning_test_2011;
COMMIT;
-- see DROPed partitions content is not accessible
SELECT * FROM partitioning_test WHERE time >= '2011-01-01' AND time < '2012-01-01' ORDER BY 1;
--
-- Misc tests
--
-- test TRUNCATE
-- test TRUNCATE partition
TRUNCATE partitioning_test_2012;
-- see partition is TRUNCATEd
SELECT * FROM partitioning_test_2012 ORDER BY 1;
-- test TRUNCATE partitioned table
TRUNCATE partitioning_test;
-- see partitioned table is TRUNCATEd
SELECT * FROM partitioning_test ORDER BY 1;
-- test DROP
-- test DROP partition
INSERT INTO partitioning_test_2010 VALUES(27, '2010-02-01');
DROP TABLE partitioning_test_2010;
-- see DROPped partitions content is not accessible from partitioning_test;
SELECT * FROM partitioning_test WHERE time >= '2010-01-01' AND time < '2011-01-01' ORDER BY 1;
-- test DROP partitioned table
DROP TABLE partitioning_test;
-- dropping the parent should CASCADE to the children as well
SELECT table_name FROM information_schema.tables WHERE table_name LIKE 'partitioning_test%' ORDER BY 1;
-- test distributing partitioned table colocated with non-partitioned table
CREATE TABLE partitioned_users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint) PARTITION BY RANGE (time);
CREATE TABLE partitioned_events_table (user_id int, time timestamp, event_type int, value_2 int, value_3 float, value_4 bigint) PARTITION BY RANGE (time);
SELECT create_distributed_table('partitioned_users_table', 'user_id', colocate_with => 'users_table');
SELECT create_distributed_table('partitioned_events_table', 'user_id', colocate_with => 'events_table');
-- INSERT/SELECT from regular table to partitioned table
CREATE TABLE partitioned_users_table_2009 PARTITION OF partitioned_users_table FOR VALUES FROM ('2014-01-01') TO ('2015-01-01');
CREATE TABLE partitioned_events_table_2009 PARTITION OF partitioned_events_table FOR VALUES FROM ('2014-01-01') TO ('2015-01-01');
INSERT INTO partitioned_events_table SELECT * FROM events_table;
INSERT INTO partitioned_users_table_2009 SELECT * FROM users_table;
--
-- Complex JOINs, subqueries, UNIONs etc...
--
-- subquery with UNIONs on partitioned table
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
(SELECT *, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
(SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM(
(SELECT
"events"."user_id", "events"."time", 0 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15))
UNION
(SELECT
"events"."user_id", "events"."time", 1 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (15, 16, 17, 18, 19) )
UNION
(SELECT
"events"."user_id", "events"."time", 2 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) )
UNION
(SELECT
"events"."user_id", "events"."time", 3 AS event
FROM
partitioned_events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13))) t1
GROUP BY "t1"."user_id") AS t) "q"
) AS final_query
GROUP BY types
ORDER BY types;
-- UNION and JOIN on both partitioned and regular tables
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
(SELECT
*, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
(SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT
*
FROM
(SELECT
"events"."time", 0 AS event, "events"."user_id"
FROM
partitioned_events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION
(SELECT *
FROM
(
SELECT * FROM
(
SELECT
max("events"."time"),
0 AS event,
"events"."user_id"
FROM
events_table as "events", users_table as "users"
WHERE
events.user_id = users.user_id AND
event_type IN (10, 11, 12, 13, 14, 15)
GROUP BY "events"."user_id"
) as events_subquery_5
) events_subquery_2)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 2 AS event, "events"."user_id"
FROM
partitioned_events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 3 AS event, "events"."user_id"
FROM
events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)
) t1
GROUP BY "t1"."user_id") AS t) "q"
INNER JOIN
(SELECT
"users"."user_id"
FROM
partitioned_users_table as "users"
WHERE
value_1 > 50 and value_1 < 70) AS t
ON (t.user_id = q.user_id)) as final_query
GROUP BY
types
ORDER BY
types;
-- test LIST partitioning
CREATE TABLE list_partitioned_events_table (user_id int, time date, event_type int, value_2 int, value_3 float, value_4 bigint) PARTITION BY LIST (time);
CREATE TABLE list_partitioned_events_table_2014_01_01_05 PARTITION OF list_partitioned_events_table FOR VALUES IN ('2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05');
CREATE TABLE list_partitioned_events_table_2014_01_06_10 PARTITION OF list_partitioned_events_table FOR VALUES IN ('2014-01-06', '2014-01-07', '2014-01-08', '2014-01-09', '2014-01-10');
CREATE TABLE list_partitioned_events_table_2014_01_11_15 PARTITION OF list_partitioned_events_table FOR VALUES IN ('2014-01-11', '2014-01-12', '2014-01-13', '2014-01-14', '2014-01-15');
-- test distributing partitioned table colocated with another partitioned table
SELECT create_distributed_table('list_partitioned_events_table', 'user_id', colocate_with => 'partitioned_events_table');
-- INSERT/SELECT from partitioned table to partitioned table
INSERT INTO
list_partitioned_events_table
SELECT
user_id,
date_trunc('day', time) as time,
event_type,
value_2,
value_3,
value_4
FROM
events_table
WHERE
time >= '2014-01-01' AND
time <= '2014-01-15';
-- LEFT JOINs used with INNER JOINs on range partitioned table, list partitioned table and non-partitioned table
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"list_partitioned_events_table"."time", "list_partitioned_events_table"."user_id" as event_user_id
FROM
list_partitioned_events_table as "list_partitioned_events_table"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
partitioned_users_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
LEFT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
partitioned_users_table as "users") "left_group_by_1"
ON ("left_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
GROUP BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
--
-- Additional partitioning features
--
-- test multi column partitioning
CREATE TABLE multi_column_partitioning(c1 int, c2 int) PARTITION BY RANGE (c1, c2);
CREATE TABLE multi_column_partitioning_0_0_10_0 PARTITION OF multi_column_partitioning FOR VALUES FROM (0, 0) TO (10, 0);
SELECT create_distributed_table('multi_column_partitioning', 'c1');
-- test INSERT to multi-column partitioned table
INSERT INTO multi_column_partitioning VALUES(1, 1);
INSERT INTO multi_column_partitioning_0_0_10_0 VALUES(5, -5);
-- test INSERT to multi-column partitioned table where no suitable partition exists
INSERT INTO multi_column_partitioning VALUES(10, 1);
-- test with MINVALUE/MAXVALUE
CREATE TABLE multi_column_partitioning_10_max_20_min PARTITION OF multi_column_partitioning FOR VALUES FROM (10, MAXVALUE) TO (20, MINVALUE);
-- test INSERT to partition with MINVALUE/MAXVALUE bounds
INSERT INTO multi_column_partitioning VALUES(11, -11);
INSERT INTO multi_column_partitioning_10_max_20_min VALUES(19, -19);
-- test INSERT to multi-column partitioned table where no suitable partition exists
INSERT INTO multi_column_partitioning VALUES(20, -20);
-- see data is loaded to multi-column partitioned table
SELECT * FROM multi_column_partitioning;
DROP TABLE IF EXISTS partitioning_test_2012, partitioning_test_2013, partitioned_events_table, partitioned_users_table, list_partitioned_events_table, multi_column_partitioning;