pull/1467/merge
Önder Kalacı 2017-07-14 11:30:08 +00:00 committed by GitHub
commit e9f2211805
14 changed files with 855 additions and 67 deletions

View File

@ -29,6 +29,7 @@
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/trigger.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/distribution_column.h"
#include "distributed/master_metadata_utility.h"
@ -38,6 +39,7 @@
#include "distributed/multi_copy.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_utility.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
@ -76,11 +78,9 @@ static char LookupDistributionMethod(Oid distributionMethodOid);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber);
static bool LocalTableEmpty(Oid tableId);
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName,
int shardCount, int replicationFactor);
static Oid ColumnType(Oid relationId, char *columnName);
static void CopyLocalDataIntoShards(Oid relationId);
static void CopyLocalDataIntoShards(Oid destinationDistributedRelationId, List *
sourceLocalRelationList);
static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
#if (PG_VERSION_NUM >= 100000)
static bool RelationUsesIdentityColumns(TupleDesc relationDesc);
@ -280,7 +280,7 @@ CreateReferenceTable(Oid relationId)
/* copy over data for regular relations */
if (relationKind == RELKIND_RELATION)
{
CopyLocalDataIntoShards(relationId);
CopyLocalDataIntoShards(relationId, list_make1_oid(relationId));
}
}
@ -339,9 +339,9 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
"(OIDS) option in their definitions.")));
}
/* verify target relation is either regular or foreign table */
relationKind = relation->rd_rel->relkind;
if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE)
if (!SupportedRelationKind(relation))
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot distribute relation: %s",
@ -351,13 +351,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
}
#if (PG_VERSION_NUM >= 100000)
if (relation->rd_rel->relispartition)
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot distribute relation: %s", relationName),
errdetail("Distributing partition tables is unsupported.")));
}
if (RelationUsesIdentityColumns(relationDesc))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -367,6 +360,20 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
}
#endif
/* partitions cannot be distributed unless thier parent is already partitioned */
if (PartitionTable(relationId) && !IsDistributedTable(PartitionParentOid(relationId)))
{
char *parentRelationName = get_rel_name(PartitionParentOid(relationId));
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errmsg("cannot distributed relation \"%s\" which is partition"
" of \"%s\"", relationName, parentRelationName),
errdetail("Citus does not support partitioning among local "
"tables and distributed tables"),
errhint("First distribute the partitioned table \"%s\"",
parentRelationName)));
}
/* check that table is empty if that is required */
if (requireEmpty && !LocalTableEmpty(relationId))
{
@ -608,7 +615,7 @@ CreateTruncateTrigger(Oid relationId)
/*
* CreateHashDistributedTable creates a hash distributed table.
*/
static void
void
CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName, int shardCount,
int replicationFactor)
@ -664,7 +671,11 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
/* relax empty table requirement for regular (non-foreign) tables */
relationKind = get_rel_relkind(relationId);
#if (PG_VERSION_NUM >= 100000)
if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE)
#else
if (relationKind == RELKIND_RELATION)
#endif
{
requireEmpty = false;
}
@ -688,10 +699,53 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
}
/* copy over data for regular relations */
if (relationKind == RELKIND_RELATION)
/*
* Copy over data for regular relations. Note that here we skip the partitions
* and copy them via the partitioned table below.
*/
if (relationKind == RELKIND_RELATION && !PartitionTable(relationId))
{
CopyLocalDataIntoShards(relationId);
CopyLocalDataIntoShards(relationId, list_make1_oid(relationId));
}
/* we also need to check for partitioned tables */
if (PartitionedTable(relationId))
{
List *partitionList = PartitionList(relationId);
char *parentRelationName = get_rel_name(relationId);
ListCell *partitionCell = NULL;
/* partitioned distributed tables are only supported with replication */
if (replicationFactor != 1)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot distribute relation \"%s\"",
parentRelationName),
errdetail("Relation \"%s\" has partitions and "
"distributing partitioned tables "
"with shard replication factor greater "
"than 1 is not supported.", parentRelationName)));
}
/*
* For each partition, create a co-located hash distributed table. Note that
* we pick the parent tables distribution key as the distribution key for
* the partitions.
*/
foreach(partitionCell, partitionList)
{
Oid partitionOid = lfirst_oid(partitionCell);
CreateHashDistributedTable(partitionOid, distributionColumnName,
parentRelationName, shardCount,
replicationFactor);
}
/*
* After converting all the partition tables into distributed tables, now lets
* copy the existing data from the local partitions.
*/
CopyLocalDataIntoShards(relationId, partitionList);
}
heap_close(pgDistColocation, NoLock);
@ -742,7 +796,7 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
/*
* CopyLocalDataIntoShards copies data from the local table, which is hidden
* CopyLocalDataIntoShards copies data from list of local tables, which are hidden
* after converting it to a distributed table, into the shards of the distributed
* table.
*
@ -758,33 +812,68 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
* want to read from the local table. To keep it simple, we perform a heap scan
* directly on the table.
*
* Any writes on the table that are started during this operation will be handled
* as distributed queries once the current transaction commits. SELECTs will
* continue to read from the local table until the current transaction commits,
* after which new SELECTs will be handled as distributed queries.
* Any writes on both the source tables and target table that are started
* during this operation will be handled as distributed queries once the
* current transaction commits. SELECTs will continue to read from the
* local table until the current transaction commits, after which new
* SELECTs will be handled as distributed queries.
*
* After copying local data into the distributed table, the local data remains
* in place and should be truncated at a later time.
*
* Note that it is allowed that destinationDistributedRelationId also appears on
* sourceOidList.
*/
static void
CopyLocalDataIntoShards(Oid distributedRelationId)
CopyLocalDataIntoShards(Oid destinationDistributedRelationId, List *sourceOidList)
{
DestReceiver *copyDest = NULL;
List *columnNameList = NIL;
Relation distributedRelation = NULL;
List *sourceRelationList = NULL;
TupleDesc tupleDescriptor = NULL;
bool stopOnFailure = true;
EState *estate = NULL;
HeapScanDesc scan = NULL;
HeapTuple tuple = NULL;
ExprContext *econtext = NULL;
MemoryContext oldContext = NULL;
TupleTableSlot *slot = NULL;
uint64 rowsCopied = 0;
ListCell *sourceOidCell = NULL;
ListCell *sourceRelationCell = NULL;
/* take an ExclusiveLock to block all operations except SELECT */
distributedRelation = heap_open(distributedRelationId, ExclusiveLock);
distributedRelation = heap_open(destinationDistributedRelationId, ExclusiveLock);
/* take ExclusiveLock on all source relations as well */
foreach(sourceOidCell, sourceOidList)
{
Oid sourceRelationId = lfirst_oid(sourceOidCell);
Relation localRelation = heap_open(sourceRelationId, ExclusiveLock);
/*
* This check currently cannot be exercised by any code path. However,
* it should be here as a precaution in case the support is expanded.
*/
if (list_length(sourceOidList) > 1 &&
PartitionParentOid(sourceRelationId) != destinationDistributedRelationId)
{
NameData localRelationNameData = localRelation->rd_rel->relname;
char *localRelationName = pstrdup(NameStr(localRelationNameData));
NameData distributedRelationNameData = localRelation->rd_rel->relname;
char *distributedRelationName = pstrdup(NameStr(distributedRelationNameData));
ereport(ERROR, (errmsg("Local relation \"%s\" cannot be copied into "
"distributed relation \"%s\"", localRelationName,
distributedRelationName),
errdetail("Relations do not have partitioning hierarcy")));
}
sourceRelationList = lappend(sourceRelationList, localRelation);
}
/*
* All writes have finished, make sure that we can see them by using the
@ -808,15 +897,21 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
econtext->ecxt_scantuple = slot;
copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
(DestReceiver *) CreateCitusCopyDestReceiver(destinationDistributedRelationId,
columnNameList, estate,
stopOnFailure);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor);
/* iterate on the local relations, copy each into the destination relation */
foreach(sourceRelationCell, sourceRelationList)
{
Relation localRelation = (Relation) lfirst(sourceRelationCell);
bool printNoticeMessage = true;
/* begin reading from local table */
scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
HeapScanDesc scan = heap_beginscan(localRelation, GetActiveSnapshot(), 0, NULL);
oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@ -832,8 +927,11 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
/* make sure we roll back on cancellation */
CHECK_FOR_INTERRUPTS();
if (rowsCopied == 0)
if (printNoticeMessage)
{
/* we only want to print this once per relation */
printNoticeMessage = false;
ereport(NOTICE, (errmsg("Copying data from local table...")));
}
@ -845,16 +943,20 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
}
}
if (rowsCopied % 1000000 != 0)
{
ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied)));
}
MemoryContextSwitchTo(oldContext);
/* finish reading from the local table */
heap_endscan(scan);
/* keep the lock */
heap_close(localRelation, NoLock);
}
if (rowsCopied % 1000000 != 0)
{
ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied)));
}
/* finish writing into the shards */
copyDest->rShutdown(copyDest);

View File

@ -42,6 +42,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
@ -512,6 +513,80 @@ multi_ProcessUtility(PlannedStmt *pstmt,
}
}
#if (PG_VERSION_NUM >= 100000)
if (IsA(parsetree, CreateStmt))
{
CreateStmt *createStatement = (CreateStmt *) parsetree;
/* if a partition is being created */
if (createStatement->inhRelations != NIL && createStatement->partbound != NULL)
{
RangeVar *parentRelation = linitial(createStatement->inhRelations);
char *parentSchemaName = parentRelation->schemaname ?
parentRelation->schemaname : "public";
Oid parentId = get_relname_relid(parentRelation->relname, get_namespace_oid(
parentSchemaName, false));
char *schemaName = createStatement->relation->schemaname ?
createStatement->relation->schemaname : "public";
Oid relationId = get_relname_relid(createStatement->relation->relname,
get_namespace_oid(schemaName, false));
/* if the table is being attached to a distribtued table, it should be distributed as well */
if (IsDistributedTable(parentId))
{
Var *parentPartitionKey = DistPartitionKey(parentId);
char *parentPartitionKeyStr =
get_relid_attribute_name(parentId,
parentPartitionKey->varattno);
CreateHashDistributedTable(relationId, parentPartitionKeyStr,
get_rel_name(parentId), 0, 0);
}
}
}
else if (IsA(parsetree, AlterTableStmt))
{
AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree;
List *commandList = alterTableStatement->cmds;
ListCell *commandCell = NULL;
foreach(commandCell, commandList)
{
AlterTableCmd *alterTableCommand = (AlterTableCmd *) lfirst(commandCell);
AlterTableType alterTableType = alterTableCommand->subtype;
if (alterTableType == AT_AttachPartition)
{
PartitionCmd *partitionCommand = (PartitionCmd *) alterTableCommand->def;
char *relationName = partitionCommand->name->relname;
char *schemaName = partitionCommand->name->schemaname ?
partitionCommand->name->schemaname : "public";
Oid relationId = get_relname_relid(relationName,
get_namespace_oid(schemaName, false));
Oid parentId = PartitionParentOid(relationId);
if (IsDistributedTable(parentId))
{
Var *parentPartitionKey = DistPartitionKey(parentId);
char *parentPartitionKeyStr =
get_relid_attribute_name(parentId,
parentPartitionKey->varattno);
CreateHashDistributedTable(relationId, parentPartitionKeyStr,
get_rel_name(parentId), 0, 0);
}
}
}
}
#endif
/* TODO: fold VACUUM's processing into the above block */
if (IsA(parsetree, VacuumStmt))
{
@ -1035,6 +1110,30 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
constraint->skip_validation = true;
}
}
#if (PG_VERSION_NUM >= 100000)
else if (alterTableType == AT_AttachPartition)
{
PartitionCmd *partitionCommand = (PartitionCmd *) command->def;
char *relationName = partitionCommand->name->relname;
char *schemaName = partitionCommand->name->schemaname ?
partitionCommand->name->schemaname : "public";
Oid relationId = get_relname_relid(relationName,
get_namespace_oid(schemaName, false));
Oid parentId = leftRelationId;
/*
* Do not generate tasks if relation is not distributed and the parent
* is distributed. Because, we'll manually convert the relation into
* distribtued relation and co-locate with its parent.
*/
if (!IsDistributedTable(relationId) && IsDistributedTable(parentId))
{
return NIL;
}
}
#endif
}
ddlJob = palloc0(sizeof(DDLJob));
@ -1783,6 +1882,9 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
case AT_DropConstraint:
case AT_EnableTrigAll:
case AT_DisableTrigAll:
#if (PG_VERSION_NUM >= 100000)
case AT_AttachPartition:
#endif
{
/*
* We will not perform any special check for ALTER TABLE DROP CONSTRAINT

View File

@ -31,6 +31,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/reference_table_utils.h"
@ -251,6 +252,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
List *targetTableForeignConstraintCommands = NIL;
ListCell *sourceShardCell = NULL;
bool includeSequenceDefaults = false;
char *alterTableAttachPartitionCommand = NULL;
/* make sure that tables are hash partitioned */
CheckHashPartitionedTable(targetRelationId);
@ -288,6 +290,13 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
targetTableDDLEvents = GetTableDDLEvents(targetRelationId, includeSequenceDefaults);
targetTableForeignConstraintCommands = GetTableForeignConstraintCommands(
targetRelationId);
if (PartitionTable(targetRelationId))
{
alterTableAttachPartitionCommand =
GenerateAlterTableAttachPartitionCommand(targetRelationId);
}
targetShardStorageType = ShardStorageType(targetRelationId);
foreach(sourceShardCell, sourceShardIntervalList)
@ -315,7 +324,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
sourceNodePort, sourceShardIndex, newShardId,
targetTableRelationOwner,
targetTableDDLEvents,
targetTableForeignConstraintCommands);
targetTableForeignConstraintCommands,
alterTableAttachPartitionCommand);
if (created)
{
const RelayFileState shardState = FILE_FINALIZED;

View File

@ -736,7 +736,11 @@ ShardStorageType(Oid relationId)
char shardStorageType = 0;
char relationType = get_rel_relkind(relationId);
#if (PG_VERSION_NUM >= 100000)
if (relationType == RELKIND_RELATION || relationType == RELKIND_PARTITIONED_TABLE)
#else
if (relationType == RELKIND_RELATION)
#endif
{
shardStorageType = SHARD_STORAGE_TABLE;
}

View File

@ -24,6 +24,9 @@
#include "commands/tablecmds.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#if (PG_VERSION_NUM >= 100000)
#include "catalog/partition.h"
#endif
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/multi_client_executor.h"
@ -31,6 +34,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"
@ -396,10 +400,12 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
relationId);
int shardIndex = -1; /* not used in this code path */
bool created = false;
char *alterTableAttachPartitionCommand = NULL;
created = WorkerCreateShard(relationId, nodeName, nodePort, shardIndex,
shardId, newPlacementOwner, ddlEventList,
foreignConstraintCommandList);
foreignConstraintCommandList,
alterTableAttachPartitionCommand);
if (created)
{
const RelayFileState shardState = FILE_FINALIZED;
@ -438,7 +444,8 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
bool
WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
int shardIndex, uint64 shardId, char *newShardOwner,
List *ddlCommandList, List *foreignConstraintCommandList)
List *ddlCommandList, List *foreignConstraintCommandList,
char *alterTableAttachPartitionCommand)
{
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
@ -531,6 +538,45 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
}
}
/*
* If the shard is created for a partition, send the command to create the
* partitioning hierarcy on the shard.
*/
if (alterTableAttachPartitionCommand != NULL)
{
Oid parentRelationId = PartitionParentOid(relationId);
uint64 correspondingParentShardId = InvalidOid;
StringInfo applyAttachPartitionCommand = makeStringInfo();
List *queryResultList = NIL;
Oid parentSchemaId = InvalidOid;
char *parentSchemaName = NULL;
char *escapedParentSchemaName = NULL;
char *escapedCommand = NULL;
Assert(PartitionTable(relationId));
parentSchemaId = get_rel_namespace(parentRelationId);
parentSchemaName = get_namespace_name(parentSchemaId);
escapedParentSchemaName = quote_literal_cstr(parentSchemaName);
escapedCommand = quote_literal_cstr(alterTableAttachPartitionCommand);
correspondingParentShardId = ColocatedShardIdInRelation(parentRelationId,
shardIndex);
appendStringInfo(applyAttachPartitionCommand,
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, correspondingParentShardId,
escapedParentSchemaName, shardId, escapedSchemaName,
escapedCommand);
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner,
applyAttachPartitionCommand);
if (queryResultList == NIL)
{
shardCreated = false;
}
}
return shardCreated;
}

View File

@ -59,7 +59,6 @@
static void AppendOptionListToString(StringInfo stringData, List *options);
static bool SupportedRelationKind(Relation relation);
static const char * convert_aclright_to_string(int aclright);
@ -494,7 +493,7 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
* SupportedRelationKind returns true if the given relation is supported as a
* distributed relation.
*/
static bool
bool
SupportedRelationKind(Relation relation)
{
char relationKind = relation->rd_rel->relkind;

View File

@ -197,6 +197,26 @@ PartitionList(Oid parentRelationId)
}
/*
* Wrapper around get_partition_parent
* *
* Note: Because this function assumes that the relation whose OID is passed
* as an argument will have precisely one parent, it should only be called
* when it is known that the relation is a partition.
*/
Oid
PartitionParentOid(Oid partitionOid)
{
Oid partitionParentOid = InvalidOid;
#if (PG_VERSION_NUM >= 100000)
partitionParentOid = get_partition_parent(partitionOid);
#endif
return partitionParentOid;
}
/*
* GenerateDetachPartitionCommand gets a partition table and returns
* "ALTER TABLE parent_table DETACH PARTITION partitionName" command.

View File

@ -34,6 +34,7 @@ extern char * pg_get_serverdef_string(Oid tableRelationId);
extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation);
extern bool SupportedRelationKind(Relation relation);
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid,
int64 shardid, StringInfo buffer);
@ -48,5 +49,9 @@ extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid,
extern char * generate_relation_name(Oid relid, List *namespaces);
extern char * generate_qualified_relation_name(Oid relid);
/* TODO: THIS SHOULD NOT BE HERE */
extern void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName,
int shardCount, int replicationFactor);
#endif /* CITUS_RULEUTILS_H */

View File

@ -116,7 +116,8 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId);
extern void CreateReferenceTableShard(Oid distributedTableId);
extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
int shardIndex, uint64 shardId, char *newShardOwner,
List *ddlCommandList, List *foreignConstraintCommadList);
List *ddlCommandList, List *foreignConstraintCommadList,
char *alterTableAttachPartitionCommand);
extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
extern void CheckHashPartitionedTable(Oid distributedTableId);
extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName,

View File

@ -16,6 +16,7 @@ extern bool PartitionTable(Oid relationId);
extern bool IsChildTable(Oid relationId);
extern bool IsParentTable(Oid relationId);
extern List * PartitionList(Oid parentRelationId);
extern Oid PartitionParentOid(Oid partitionOid);
extern char * GenerateDetachPartitionCommand(Oid partitionTableId);
extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId);
extern char * GeneratePartitioningInformation(Oid tableId);

View File

@ -0,0 +1,191 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000;
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- this should error out given that parent of the partition is not distributed
SELECT create_distributed_table('partitioning_test_2010', 'id');
ERROR: cannot distributed relation "partitioning_test_2010" which is partition of "partitioning_test"
DETAIL: Citus does not support partitioning among local tables and distributed tables
HINT: First distribute the partitioned table "partitioning_test"
-- this should suceed
SELECT create_distributed_table('partitioning_test', 'id');
NOTICE: Copying data from local table...
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
-- check the data
SELECT * FROM partitioning_test ORDER BY 1;
id | time
----+------------
1 | 06-06-2009
2 | 07-07-2010
3 | 09-09-2009
4 | 03-03-2010
(4 rows)
-- check the metadata
SELECT
*
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
ORDER BY 1;
logicalrelid | partmethod | partkey | colocationid | repmodel
------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
partitioning_test | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
partitioning_test_2009 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
partitioning_test_2010 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
(3 rows)
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | count
------------------------+-------
partitioning_test | 4
partitioning_test_2009 | 4
partitioning_test_2010 | 4
(3 rows)
SELECT
nodename, nodeport, count(*)
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') )
GROUP BY
nodename, nodeport
ORDER BY
1,2,3;
nodename | nodeport | count
-----------+----------+-------
localhost | 57637 | 6
localhost | 57638 | 6
(2 rows)
-- now create a partition and see that it also becomes a distributed table
CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01');
SELECT
*
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011')
ORDER BY 1;
logicalrelid | partmethod | partkey | colocationid | repmodel
------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
partitioning_test | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
partitioning_test_2009 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
partitioning_test_2010 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
partitioning_test_2011 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
(4 rows)
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011')
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | count
------------------------+-------
partitioning_test | 4
partitioning_test_2009 | 4
partitioning_test_2010 | 4
partitioning_test_2011 | 4
(4 rows)
SELECT
nodename, nodeport, count(*)
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') )
GROUP BY
nodename, nodeport
ORDER BY
1,2,3;
nodename | nodeport | count
-----------+----------+-------
localhost | 57637 | 8
localhost | 57638 | 8
(2 rows)
-- citus can also support ALTER TABLE .. ATTACH PARTITION
-- even if the partition is not distributed
CREATE TABLE partitioning_test_2012(id int, time date);
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01');
SELECT
*
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012')
ORDER BY 1;
logicalrelid | partmethod | partkey | colocationid | repmodel
------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
partitioning_test | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
partitioning_test_2009 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
partitioning_test_2010 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
partitioning_test_2011 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
partitioning_test_2012 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
(5 rows)
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012')
GROUP BY
logicalrelid
ORDER BY
1,2;
logicalrelid | count
------------------------+-------
partitioning_test | 4
partitioning_test_2009 | 4
partitioning_test_2010 | 4
partitioning_test_2011 | 4
partitioning_test_2012 | 4
(5 rows)
SELECT
nodename, nodeport, count(*)
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') )
GROUP BY
nodename, nodeport
ORDER BY
1,2,3;
nodename | nodeport | count
-----------+----------+-------
localhost | 57637 | 10
localhost | 57638 | 10
(2 rows)
-- dropping the parent should CASCADE to the children as well
DROP TABLE partitioning_test;
\d+ partitioning_test*
-- set the colocationid sequence back to 1 to make sure
-- that this file does not break other tests
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;

View File

@ -0,0 +1,177 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000;
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ...
^
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin...
^
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin...
^
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES (1, '2009-06-06');
^
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
ERROR: relation "partitioning_test" does not exist
LINE 1: INSERT INTO partitioning_test VALUES (2, '2010-07-07');
^
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
ERROR: relation "partitioning_test_2009" does not exist
LINE 1: INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
^
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
ERROR: relation "partitioning_test_2010" does not exist
LINE 1: INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
^
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- this should error out given that parent of the partition is not distributed
SELECT create_distributed_table('partitioning_test_2010', 'id');
ERROR: relation "partitioning_test_2010" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test_2010', 'i...
^
-- this should suceed
SELECT create_distributed_table('partitioning_test', 'id');
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT create_distributed_table('partitioning_test', 'id');
^
-- check the data
SELECT * FROM partitioning_test ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
^
-- check the metadata
SELECT
*
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
^
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
GROUP BY
logicalrelid
ORDER BY
1,2;
ERROR: relation "partitioning_test" does not exist
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
^
SELECT
nodename, nodeport, count(*)
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') )
GROUP BY
nodename, nodeport
ORDER BY
1,2,3;
ERROR: relation "partitioning_test" does not exist
LINE 6: ...shardid FROM pg_dist_shard WHERE logicalrelid IN ('partition...
^
-- now create a partition and see that it also becomes a distributed table
CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01');
ERROR: syntax error at or near "PARTITION"
LINE 1: CREATE TABLE partitioning_test_2011 PARTITION OF partitionin...
^
SELECT
*
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011')
ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
^
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011')
GROUP BY
logicalrelid
ORDER BY
1,2;
ERROR: relation "partitioning_test" does not exist
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
^
SELECT
nodename, nodeport, count(*)
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') )
GROUP BY
nodename, nodeport
ORDER BY
1,2,3;
ERROR: relation "partitioning_test" does not exist
LINE 6: ...shardid FROM pg_dist_shard WHERE logicalrelid IN ('partition...
^
-- citus can also support ALTER TABLE .. ATTACH PARTITION
-- even if the partition is not distributed
CREATE TABLE partitioning_test_2012(id int, time date);
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01');
ERROR: syntax error at or near "ATTACH"
LINE 1: ALTER TABLE partitioning_test ATTACH PARTITION partitioning_...
^
SELECT
*
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012')
ORDER BY 1;
ERROR: relation "partitioning_test" does not exist
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
^
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012')
GROUP BY
logicalrelid
ORDER BY
1,2;
ERROR: relation "partitioning_test" does not exist
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
^
SELECT
nodename, nodeport, count(*)
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') )
GROUP BY
nodename, nodeport
ORDER BY
1,2,3;
ERROR: relation "partitioning_test" does not exist
LINE 6: ...shardid FROM pg_dist_shard WHERE logicalrelid IN ('partition...
^
-- dropping the parent should CASCADE to the children as well
DROP TABLE partitioning_test;
ERROR: table "partitioning_test" does not exist
\d+ partitioning_test*
Table "public.partitioning_test_2012"
Column | Type | Modifiers | Storage | Stats target | Description
--------+---------+-----------+---------+--------------+-------------
id | integer | | plain | |
time | date | | plain | |
-- set the colocationid sequence back to 1 to make sure
-- that this file does not break other tests
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;

View File

@ -26,7 +26,7 @@ test: multi_metadata_access
# ---
# Tests for partitioning support
# ---
test: multi_partitioning_utils
test: multi_partitioning_utils multi_partitioning
# ----------
# The following distributed tests depend on creating a partitioned table and

View File

@ -0,0 +1,130 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000;
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
-- create its partitions
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- this should error out given that parent of the partition is not distributed
SELECT create_distributed_table('partitioning_test_2010', 'id');
-- this should suceed
SELECT create_distributed_table('partitioning_test', 'id');
-- check the data
SELECT * FROM partitioning_test ORDER BY 1;
-- check the metadata
SELECT
*
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
ORDER BY 1;
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
GROUP BY
logicalrelid
ORDER BY
1,2;
SELECT
nodename, nodeport, count(*)
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') )
GROUP BY
nodename, nodeport
ORDER BY
1,2,3;
-- now create a partition and see that it also becomes a distributed table
CREATE TABLE partitioning_test_2011 PARTITION OF partitioning_test FOR VALUES FROM ('2011-01-01') TO ('2012-01-01');
SELECT
*
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011')
ORDER BY 1;
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011')
GROUP BY
logicalrelid
ORDER BY
1,2;
SELECT
nodename, nodeport, count(*)
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011') )
GROUP BY
nodename, nodeport
ORDER BY
1,2,3;
-- citus can also support ALTER TABLE .. ATTACH PARTITION
-- even if the partition is not distributed
CREATE TABLE partitioning_test_2012(id int, time date);
ALTER TABLE partitioning_test ATTACH PARTITION partitioning_test_2012 FOR VALUES FROM ('2012-01-01') TO ('2013-01-01');
SELECT
*
FROM
pg_dist_partition
WHERE
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012')
ORDER BY 1;
SELECT
logicalrelid, count(*)
FROM pg_dist_shard
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012')
GROUP BY
logicalrelid
ORDER BY
1,2;
SELECT
nodename, nodeport, count(*)
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010', 'partitioning_test_2011', 'partitioning_test_2012') )
GROUP BY
nodename, nodeport
ORDER BY
1,2,3;
-- dropping the parent should CASCADE to the children as well
DROP TABLE partitioning_test;
\d+ partitioning_test*
-- set the colocationid sequence back to 1 to make sure
-- that this file does not break other tests
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;