mirror of https://github.com/citusdata/citus.git
First pass of creating distributed tables
parent
58947e0dcf
commit
c594a056ae
|
@ -29,6 +29,7 @@
|
|||
#include "commands/defrem.h"
|
||||
#include "commands/extension.h"
|
||||
#include "commands/trigger.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
|
@ -38,6 +39,7 @@
|
|||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_logical_planner.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/pg_dist_colocation.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/reference_table_utils.h"
|
||||
|
@ -80,7 +82,8 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN
|
|||
char *colocateWithTableName,
|
||||
int shardCount, int replicationFactor);
|
||||
static Oid ColumnType(Oid relationId, char *columnName);
|
||||
static void CopyLocalDataIntoShards(Oid relationId);
|
||||
static void CopyLocalDataIntoShards(Oid destinationDistributedRelationId, List *
|
||||
sourceLocalRelationList);
|
||||
static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
static bool RelationUsesIdentityColumns(TupleDesc relationDesc);
|
||||
|
@ -280,7 +283,7 @@ CreateReferenceTable(Oid relationId)
|
|||
/* copy over data for regular relations */
|
||||
if (relationKind == RELKIND_RELATION)
|
||||
{
|
||||
CopyLocalDataIntoShards(relationId);
|
||||
CopyLocalDataIntoShards(relationId, list_make1_oid(relationId));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -339,9 +342,9 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
|||
"(OIDS) option in their definitions.")));
|
||||
}
|
||||
|
||||
/* verify target relation is either regular or foreign table */
|
||||
relationKind = relation->rd_rel->relkind;
|
||||
if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE)
|
||||
|
||||
if (!SupportedRelationKind(relation))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("cannot distribute relation: %s",
|
||||
|
@ -351,13 +354,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
|||
}
|
||||
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
if (relation->rd_rel->relispartition)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("cannot distribute relation: %s", relationName),
|
||||
errdetail("Distributing partition tables is unsupported.")));
|
||||
}
|
||||
|
||||
if (RelationUsesIdentityColumns(relationDesc))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
|
@ -367,6 +363,20 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
|||
}
|
||||
#endif
|
||||
|
||||
/* partitions cannot be distributed unless thier parent is already partitioned */
|
||||
if (PartitionTable(relationId) && !IsDistributedTable(PartitionParentOid(relationId)))
|
||||
{
|
||||
char *parentRelationName = get_rel_name(PartitionParentOid(relationId));
|
||||
char *relationName = get_rel_name(relationId);
|
||||
|
||||
ereport(ERROR, (errmsg("cannot distributed relation \"%s\" which is partition"
|
||||
" of \"%s\"", relationName, parentRelationName),
|
||||
errdetail("Citus does not support partitioning among local "
|
||||
"tables and distributed tables"),
|
||||
errhint("First distribute the partitioned table \"%s\"",
|
||||
parentRelationName)));
|
||||
}
|
||||
|
||||
/* check that table is empty if that is required */
|
||||
if (requireEmpty && !LocalTableEmpty(relationId))
|
||||
{
|
||||
|
@ -664,7 +674,11 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
|||
|
||||
/* relax empty table requirement for regular (non-foreign) tables */
|
||||
relationKind = get_rel_relkind(relationId);
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE)
|
||||
#else
|
||||
if (relationKind == RELKIND_RELATION)
|
||||
#endif
|
||||
{
|
||||
requireEmpty = false;
|
||||
}
|
||||
|
@ -688,10 +702,53 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
|||
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
|
||||
}
|
||||
|
||||
/* copy over data for regular relations */
|
||||
if (relationKind == RELKIND_RELATION)
|
||||
/*
|
||||
* Copy over data for regular relations. Note that here we skip the partitions
|
||||
* and copy them via the partitioned table below.
|
||||
*/
|
||||
if (relationKind == RELKIND_RELATION && !PartitionTable(relationId))
|
||||
{
|
||||
CopyLocalDataIntoShards(relationId);
|
||||
CopyLocalDataIntoShards(relationId, list_make1_oid(relationId));
|
||||
}
|
||||
|
||||
/* we also need to check for partitioned tables */
|
||||
if (PartitionedTable(relationId))
|
||||
{
|
||||
List *partitionList = PartitionList(relationId);
|
||||
char *parentRelationName = get_rel_name(relationId);
|
||||
ListCell *partitionCell = NULL;
|
||||
|
||||
/* partitioned distributed tables are only supported with replication */
|
||||
if (replicationFactor != 1)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||
errmsg("cannot distribute relation \"%s\"",
|
||||
parentRelationName),
|
||||
errdetail("Relation \"%s\" has partitions and "
|
||||
"distributing partitioned tables "
|
||||
"with shard replication factor greater "
|
||||
"than 1 is not supported.", parentRelationName)));
|
||||
}
|
||||
|
||||
/*
|
||||
* For each partition, create a co-located hash distributed table. Note that
|
||||
* we pick the parent tables distribution key as the distribution key for
|
||||
* the partitions.
|
||||
*/
|
||||
foreach(partitionCell, partitionList)
|
||||
{
|
||||
Oid partitionOid = lfirst_oid(partitionCell);
|
||||
|
||||
CreateHashDistributedTable(partitionOid, distributionColumnName,
|
||||
parentRelationName, shardCount,
|
||||
replicationFactor);
|
||||
}
|
||||
|
||||
/*
|
||||
* After converting all the partition tables into distributed tables, now lets
|
||||
* copy the existing data from the local partitions.
|
||||
*/
|
||||
CopyLocalDataIntoShards(relationId, partitionList);
|
||||
}
|
||||
|
||||
heap_close(pgDistColocation, NoLock);
|
||||
|
@ -742,7 +799,7 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
|
|||
|
||||
|
||||
/*
|
||||
* CopyLocalDataIntoShards copies data from the local table, which is hidden
|
||||
* CopyLocalDataIntoShards copies data from list of local tables, which are hidden
|
||||
* after converting it to a distributed table, into the shards of the distributed
|
||||
* table.
|
||||
*
|
||||
|
@ -758,33 +815,68 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
|
|||
* want to read from the local table. To keep it simple, we perform a heap scan
|
||||
* directly on the table.
|
||||
*
|
||||
* Any writes on the table that are started during this operation will be handled
|
||||
* as distributed queries once the current transaction commits. SELECTs will
|
||||
* continue to read from the local table until the current transaction commits,
|
||||
* after which new SELECTs will be handled as distributed queries.
|
||||
* Any writes on both the source tables and target table that are started
|
||||
* during this operation will be handled as distributed queries once the
|
||||
* current transaction commits. SELECTs will continue to read from the
|
||||
* local table until the current transaction commits, after which new
|
||||
* SELECTs will be handled as distributed queries.
|
||||
*
|
||||
* After copying local data into the distributed table, the local data remains
|
||||
* in place and should be truncated at a later time.
|
||||
*
|
||||
* Note that it is allowed that destinationDistributedRelationId also appears on
|
||||
* sourceOidList.
|
||||
*/
|
||||
static void
|
||||
CopyLocalDataIntoShards(Oid distributedRelationId)
|
||||
CopyLocalDataIntoShards(Oid destinationDistributedRelationId, List *sourceOidList)
|
||||
{
|
||||
DestReceiver *copyDest = NULL;
|
||||
List *columnNameList = NIL;
|
||||
Relation distributedRelation = NULL;
|
||||
List *sourceRelationList = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
bool stopOnFailure = true;
|
||||
|
||||
EState *estate = NULL;
|
||||
HeapScanDesc scan = NULL;
|
||||
HeapTuple tuple = NULL;
|
||||
ExprContext *econtext = NULL;
|
||||
MemoryContext oldContext = NULL;
|
||||
TupleTableSlot *slot = NULL;
|
||||
uint64 rowsCopied = 0;
|
||||
|
||||
ListCell *sourceOidCell = NULL;
|
||||
ListCell *sourceRelationCell = NULL;
|
||||
|
||||
/* take an ExclusiveLock to block all operations except SELECT */
|
||||
distributedRelation = heap_open(distributedRelationId, ExclusiveLock);
|
||||
distributedRelation = heap_open(destinationDistributedRelationId, ExclusiveLock);
|
||||
|
||||
/* take ExclusiveLock on all source relations as well */
|
||||
foreach(sourceOidCell, sourceOidList)
|
||||
{
|
||||
Oid sourceRelationId = lfirst_oid(sourceOidCell);
|
||||
Relation localRelation = heap_open(sourceRelationId, ExclusiveLock);
|
||||
|
||||
/*
|
||||
* This check currently cannot be exercised by any code path. However,
|
||||
* it should be here as a precaution in case the support is expanded.
|
||||
*/
|
||||
if (list_length(sourceOidList) > 1 &&
|
||||
PartitionParentOid(sourceRelationId) != destinationDistributedRelationId)
|
||||
{
|
||||
NameData localRelationNameData = localRelation->rd_rel->relname;
|
||||
char *localRelationName = pstrdup(NameStr(localRelationNameData));
|
||||
|
||||
NameData distributedRelationNameData = localRelation->rd_rel->relname;
|
||||
char *distributedRelationName = pstrdup(NameStr(distributedRelationNameData));
|
||||
|
||||
ereport(ERROR, (errmsg("Local relation \"%s\" cannot be copied into "
|
||||
"distributed relation \"%s\"", localRelationName,
|
||||
distributedRelationName),
|
||||
errdetail("Relations do not have partitioning hierarcy")));
|
||||
}
|
||||
|
||||
sourceRelationList = lappend(sourceRelationList, localRelation);
|
||||
}
|
||||
|
||||
/*
|
||||
* All writes have finished, make sure that we can see them by using the
|
||||
|
@ -808,41 +900,59 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
|
|||
econtext->ecxt_scantuple = slot;
|
||||
|
||||
copyDest =
|
||||
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
|
||||
(DestReceiver *) CreateCitusCopyDestReceiver(destinationDistributedRelationId,
|
||||
columnNameList, estate,
|
||||
stopOnFailure);
|
||||
|
||||
/* initialise state for writing to shards, we'll open connections on demand */
|
||||
copyDest->rStartup(copyDest, 0, tupleDescriptor);
|
||||
|
||||
/* begin reading from local table */
|
||||
scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
|
||||
|
||||
oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
||||
|
||||
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
|
||||
/* iterate on the local relations, copy each into the destination relation */
|
||||
foreach(sourceRelationCell, sourceRelationList)
|
||||
{
|
||||
/* materialize tuple and send it to a shard */
|
||||
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
|
||||
copyDest->receiveSlot(slot, copyDest);
|
||||
Relation localRelation = (Relation) lfirst(sourceRelationCell);
|
||||
bool printNoticeMessage = true;
|
||||
|
||||
/* clear tuple memory */
|
||||
ResetPerTupleExprContext(estate);
|
||||
/* begin reading from local table */
|
||||
HeapScanDesc scan = heap_beginscan(localRelation, GetActiveSnapshot(), 0, NULL);
|
||||
|
||||
/* make sure we roll back on cancellation */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
||||
|
||||
if (rowsCopied == 0)
|
||||
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
|
||||
{
|
||||
ereport(NOTICE, (errmsg("Copying data from local table...")));
|
||||
/* materialize tuple and send it to a shard */
|
||||
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
|
||||
copyDest->receiveSlot(slot, copyDest);
|
||||
|
||||
/* clear tuple memory */
|
||||
ResetPerTupleExprContext(estate);
|
||||
|
||||
/* make sure we roll back on cancellation */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
if (printNoticeMessage)
|
||||
{
|
||||
/* we only want to print this once per relation */
|
||||
printNoticeMessage = false;
|
||||
|
||||
ereport(NOTICE, (errmsg("Copying data from local table...")));
|
||||
}
|
||||
|
||||
rowsCopied++;
|
||||
|
||||
if (rowsCopied % 1000000 == 0)
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied)));
|
||||
}
|
||||
}
|
||||
|
||||
rowsCopied++;
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
if (rowsCopied % 1000000 == 0)
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied)));
|
||||
}
|
||||
/* finish reading from the local table */
|
||||
heap_endscan(scan);
|
||||
|
||||
/* keep the lock */
|
||||
heap_close(localRelation, NoLock);
|
||||
}
|
||||
|
||||
if (rowsCopied % 1000000 != 0)
|
||||
|
@ -850,11 +960,6 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
|
|||
ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied)));
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
/* finish reading from the local table */
|
||||
heap_endscan(scan);
|
||||
|
||||
/* finish writing into the shards */
|
||||
copyDest->rShutdown(copyDest);
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/pg_dist_shard.h"
|
||||
#include "distributed/reference_table_utils.h"
|
||||
|
@ -251,6 +252,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
|
|||
List *targetTableForeignConstraintCommands = NIL;
|
||||
ListCell *sourceShardCell = NULL;
|
||||
bool includeSequenceDefaults = false;
|
||||
char *alterTableAttachPartitionCommand = NULL;
|
||||
|
||||
/* make sure that tables are hash partitioned */
|
||||
CheckHashPartitionedTable(targetRelationId);
|
||||
|
@ -288,6 +290,13 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
|
|||
targetTableDDLEvents = GetTableDDLEvents(targetRelationId, includeSequenceDefaults);
|
||||
targetTableForeignConstraintCommands = GetTableForeignConstraintCommands(
|
||||
targetRelationId);
|
||||
|
||||
if (PartitionTable(targetRelationId))
|
||||
{
|
||||
alterTableAttachPartitionCommand =
|
||||
GenerateAlterTableAttachPartitionCommand(targetRelationId);
|
||||
}
|
||||
|
||||
targetShardStorageType = ShardStorageType(targetRelationId);
|
||||
|
||||
foreach(sourceShardCell, sourceShardIntervalList)
|
||||
|
@ -315,7 +324,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
|
|||
sourceNodePort, sourceShardIndex, newShardId,
|
||||
targetTableRelationOwner,
|
||||
targetTableDDLEvents,
|
||||
targetTableForeignConstraintCommands);
|
||||
targetTableForeignConstraintCommands,
|
||||
alterTableAttachPartitionCommand);
|
||||
if (created)
|
||||
{
|
||||
const RelayFileState shardState = FILE_FINALIZED;
|
||||
|
|
|
@ -736,7 +736,11 @@ ShardStorageType(Oid relationId)
|
|||
char shardStorageType = 0;
|
||||
|
||||
char relationType = get_rel_relkind(relationId);
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
if (relationType == RELKIND_RELATION || relationType == RELKIND_PARTITIONED_TABLE)
|
||||
#else
|
||||
if (relationType == RELKIND_RELATION)
|
||||
#endif
|
||||
{
|
||||
shardStorageType = SHARD_STORAGE_TABLE;
|
||||
}
|
||||
|
|
|
@ -24,6 +24,9 @@
|
|||
#include "commands/tablecmds.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "catalog/namespace.h"
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
#include "catalog/partition.h"
|
||||
#endif
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
|
@ -31,6 +34,7 @@
|
|||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/pg_dist_shard.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
|
@ -396,10 +400,12 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
|
|||
relationId);
|
||||
int shardIndex = -1; /* not used in this code path */
|
||||
bool created = false;
|
||||
char *alterTableAttachPartitionCommand = NULL;
|
||||
|
||||
created = WorkerCreateShard(relationId, nodeName, nodePort, shardIndex,
|
||||
shardId, newPlacementOwner, ddlEventList,
|
||||
foreignConstraintCommandList);
|
||||
foreignConstraintCommandList,
|
||||
alterTableAttachPartitionCommand);
|
||||
if (created)
|
||||
{
|
||||
const RelayFileState shardState = FILE_FINALIZED;
|
||||
|
@ -438,7 +444,8 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
|
|||
bool
|
||||
WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
|
||||
int shardIndex, uint64 shardId, char *newShardOwner,
|
||||
List *ddlCommandList, List *foreignConstraintCommandList)
|
||||
List *ddlCommandList, List *foreignConstraintCommandList,
|
||||
char *alterTableAttachPartitionCommand)
|
||||
{
|
||||
Oid schemaId = get_rel_namespace(relationId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
|
@ -531,6 +538,45 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If the shard is created for a partition, send the command to create the
|
||||
* partitioning hierarcy on the shard.
|
||||
*/
|
||||
if (alterTableAttachPartitionCommand != NULL)
|
||||
{
|
||||
Oid parentRelationId = PartitionParentOid(relationId);
|
||||
uint64 correspondingParentShardId = InvalidOid;
|
||||
StringInfo applyAttachPartitionCommand = makeStringInfo();
|
||||
List *queryResultList = NIL;
|
||||
|
||||
Oid parentSchemaId = InvalidOid;
|
||||
char *parentSchemaName = NULL;
|
||||
char *escapedParentSchemaName = NULL;
|
||||
char *escapedCommand = NULL;
|
||||
|
||||
Assert(PartitionTable(relationId));
|
||||
|
||||
parentSchemaId = get_rel_namespace(parentRelationId);
|
||||
parentSchemaName = get_namespace_name(parentSchemaId);
|
||||
escapedParentSchemaName = quote_literal_cstr(parentSchemaName);
|
||||
escapedCommand = quote_literal_cstr(alterTableAttachPartitionCommand);
|
||||
|
||||
correspondingParentShardId = ColocatedShardIdInRelation(parentRelationId,
|
||||
shardIndex);
|
||||
|
||||
appendStringInfo(applyAttachPartitionCommand,
|
||||
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, correspondingParentShardId,
|
||||
escapedParentSchemaName, shardId, escapedSchemaName,
|
||||
escapedCommand);
|
||||
|
||||
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner,
|
||||
applyAttachPartitionCommand);
|
||||
if (queryResultList == NIL)
|
||||
{
|
||||
shardCreated = false;
|
||||
}
|
||||
}
|
||||
|
||||
return shardCreated;
|
||||
}
|
||||
|
||||
|
|
|
@ -59,7 +59,6 @@
|
|||
|
||||
|
||||
static void AppendOptionListToString(StringInfo stringData, List *options);
|
||||
static bool SupportedRelationKind(Relation relation);
|
||||
static const char * convert_aclright_to_string(int aclright);
|
||||
|
||||
|
||||
|
@ -494,7 +493,7 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
|
|||
* SupportedRelationKind returns true if the given relation is supported as a
|
||||
* distributed relation.
|
||||
*/
|
||||
static bool
|
||||
bool
|
||||
SupportedRelationKind(Relation relation)
|
||||
{
|
||||
char relationKind = relation->rd_rel->relkind;
|
||||
|
|
|
@ -197,6 +197,26 @@ PartitionList(Oid parentRelationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* Wrapper around get_partition_parent
|
||||
* *
|
||||
* Note: Because this function assumes that the relation whose OID is passed
|
||||
* as an argument will have precisely one parent, it should only be called
|
||||
* when it is known that the relation is a partition.
|
||||
*/
|
||||
Oid
|
||||
PartitionParentOid(Oid partitionOid)
|
||||
{
|
||||
Oid partitionParentOid = InvalidOid;
|
||||
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
partitionParentOid = get_partition_parent(partitionOid);
|
||||
#endif
|
||||
|
||||
return partitionParentOid;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateDetachPartitionCommand gets a partition table and returns
|
||||
* "ALTER TABLE parent_table DETACH PARTITION partitionName" command.
|
||||
|
|
|
@ -34,6 +34,7 @@ extern char * pg_get_serverdef_string(Oid tableRelationId);
|
|||
extern char * pg_get_sequencedef_string(Oid sequenceRelid);
|
||||
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
|
||||
extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation);
|
||||
extern bool SupportedRelationKind(Relation relation);
|
||||
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
|
||||
extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid,
|
||||
int64 shardid, StringInfo buffer);
|
||||
|
|
|
@ -116,7 +116,8 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId);
|
|||
extern void CreateReferenceTableShard(Oid distributedTableId);
|
||||
extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
|
||||
int shardIndex, uint64 shardId, char *newShardOwner,
|
||||
List *ddlCommandList, List *foreignConstraintCommadList);
|
||||
List *ddlCommandList, List *foreignConstraintCommadList,
|
||||
char *alterTableAttachPartitionCommand);
|
||||
extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
|
||||
extern void CheckHashPartitionedTable(Oid distributedTableId);
|
||||
extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName,
|
||||
|
|
|
@ -16,6 +16,7 @@ extern bool PartitionTable(Oid relationId);
|
|||
extern bool IsChildTable(Oid relationId);
|
||||
extern bool IsParentTable(Oid relationId);
|
||||
extern List * PartitionList(Oid parentRelationId);
|
||||
extern Oid PartitionParentOid(Oid partitionOid);
|
||||
extern char * GenerateDetachPartitionCommand(Oid partitionTableId);
|
||||
extern char * GenerateAlterTableAttachPartitionCommand(Oid partitionTableId);
|
||||
extern char * GeneratePartitioningInformation(Oid tableId);
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000;
|
||||
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||
|
||||
-- create its partitions
|
||||
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||
-- load some data and distribute tables
|
||||
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
|
||||
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
|
||||
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
|
||||
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
|
||||
SET citus.shard_count TO 4;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
-- this should error out given that parent of the partition is not distributed
|
||||
SELECT create_distributed_table('partitioning_test_2010', 'id');
|
||||
ERROR: cannot distributed relation "partitioning_test_2010" which is partition of "partitioning_test"
|
||||
DETAIL: Citus does not support partitioning among local tables and distributed tables
|
||||
HINT: First distribute the partitioned table "partitioning_test"
|
||||
-- this should suceed
|
||||
SELECT create_distributed_table('partitioning_test', 'id');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: Copying data from local table...
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- check the data
|
||||
SELECT * FROM partitioning_test ORDER BY 1;
|
||||
id | time
|
||||
----+------------
|
||||
1 | 06-06-2009
|
||||
2 | 07-07-2010
|
||||
3 | 09-09-2009
|
||||
4 | 03-03-2010
|
||||
(4 rows)
|
||||
|
||||
-- check the metadata
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
|
||||
ORDER BY 1;
|
||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
||||
------------------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
|
||||
partitioning_test | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
|
||||
partitioning_test_2009 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
|
||||
partitioning_test_2010 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 2 | c
|
||||
(3 rows)
|
||||
|
||||
SELECT
|
||||
logicalrelid, count(*)
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
|
||||
GROUP BY
|
||||
logicalrelid
|
||||
ORDER BY
|
||||
1,2;
|
||||
logicalrelid | count
|
||||
------------------------+-------
|
||||
partitioning_test | 4
|
||||
partitioning_test_2009 | 4
|
||||
partitioning_test_2010 | 4
|
||||
(3 rows)
|
||||
|
||||
SELECT
|
||||
nodename, nodeport, count(*)
|
||||
FROM
|
||||
pg_dist_shard_placement
|
||||
WHERE
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') )
|
||||
GROUP BY
|
||||
nodename, nodeport
|
||||
ORDER BY
|
||||
1,2,3;
|
||||
nodename | nodeport | count
|
||||
-----------+----------+-------
|
||||
localhost | 57637 | 6
|
||||
localhost | 57638 | 6
|
||||
(2 rows)
|
||||
|
||||
-- dropping the parent should CASCADE to the children as well
|
||||
DROP TABLE partitioning_test;
|
||||
\d+ partitioning_test*
|
|
@ -0,0 +1,88 @@
|
|||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000;
|
||||
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||
ERROR: syntax error at or near "PARTITION"
|
||||
LINE 1: CREATE TABLE partitioning_test(id int, time date) PARTITION ...
|
||||
^
|
||||
|
||||
-- create its partitions
|
||||
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||
ERROR: syntax error at or near "PARTITION"
|
||||
LINE 1: CREATE TABLE partitioning_test_2009 PARTITION OF partitionin...
|
||||
^
|
||||
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||
ERROR: syntax error at or near "PARTITION"
|
||||
LINE 1: CREATE TABLE partitioning_test_2010 PARTITION OF partitionin...
|
||||
^
|
||||
-- load some data and distribute tables
|
||||
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
|
||||
ERROR: relation "partitioning_test" does not exist
|
||||
LINE 1: INSERT INTO partitioning_test VALUES (1, '2009-06-06');
|
||||
^
|
||||
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
|
||||
ERROR: relation "partitioning_test" does not exist
|
||||
LINE 1: INSERT INTO partitioning_test VALUES (2, '2010-07-07');
|
||||
^
|
||||
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
|
||||
ERROR: relation "partitioning_test_2009" does not exist
|
||||
LINE 1: INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
|
||||
^
|
||||
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
|
||||
ERROR: relation "partitioning_test_2010" does not exist
|
||||
LINE 1: INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
|
||||
^
|
||||
SET citus.shard_count TO 4;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
-- this should error out given that parent of the partition is not distributed
|
||||
SELECT create_distributed_table('partitioning_test_2010', 'id');
|
||||
ERROR: relation "partitioning_test_2010" does not exist
|
||||
LINE 1: SELECT create_distributed_table('partitioning_test_2010', 'i...
|
||||
^
|
||||
-- this should suceed
|
||||
SELECT create_distributed_table('partitioning_test', 'id');
|
||||
ERROR: relation "partitioning_test" does not exist
|
||||
LINE 1: SELECT create_distributed_table('partitioning_test', 'id');
|
||||
^
|
||||
-- check the data
|
||||
SELECT * FROM partitioning_test ORDER BY 1;
|
||||
ERROR: relation "partitioning_test" does not exist
|
||||
LINE 1: SELECT * FROM partitioning_test ORDER BY 1;
|
||||
^
|
||||
-- check the metadata
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
|
||||
ORDER BY 1;
|
||||
ERROR: relation "partitioning_test" does not exist
|
||||
LINE 6: logicalrelid IN ('partitioning_test', 'partitioning_test_20...
|
||||
^
|
||||
SELECT
|
||||
logicalrelid, count(*)
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
|
||||
GROUP BY
|
||||
logicalrelid
|
||||
ORDER BY
|
||||
1,2;
|
||||
ERROR: relation "partitioning_test" does not exist
|
||||
LINE 4: WHERE logicalrelid IN ('partitioning_test', 'partitioning_t...
|
||||
^
|
||||
SELECT
|
||||
nodename, nodeport, count(*)
|
||||
FROM
|
||||
pg_dist_shard_placement
|
||||
WHERE
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') )
|
||||
GROUP BY
|
||||
nodename, nodeport
|
||||
ORDER BY
|
||||
1,2,3;
|
||||
ERROR: relation "partitioning_test" does not exist
|
||||
LINE 6: ...shardid FROM pg_dist_shard WHERE logicalrelid IN ('partition...
|
||||
^
|
||||
-- dropping the parent should CASCADE to the children as well
|
||||
DROP TABLE partitioning_test;
|
||||
ERROR: table "partitioning_test" does not exist
|
||||
\d+ partitioning_test*
|
|
@ -19,7 +19,7 @@ WHERE
|
|||
logicalrelid = 'reference_table_test'::regclass;
|
||||
partmethod | partkeyisnull | colocationid | repmodel
|
||||
------------+---------------+--------------+----------
|
||||
n | t | 1 | t
|
||||
n | t | 3 | t
|
||||
(1 row)
|
||||
|
||||
-- now see that shard min/max values are NULL
|
||||
|
|
|
@ -25,7 +25,7 @@ test: multi_metadata_access
|
|||
# ---
|
||||
# Tests for partitioning support
|
||||
# ---
|
||||
test: multi_partitioning_utils
|
||||
test: multi_partitioning_utils multi_partitioning
|
||||
|
||||
# ----------
|
||||
# The following distributed tests depend on creating a partitioned table and
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1660000;
|
||||
|
||||
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
|
||||
|
||||
-- create its partitions
|
||||
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
|
||||
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01');
|
||||
|
||||
-- load some data and distribute tables
|
||||
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
|
||||
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
|
||||
|
||||
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
|
||||
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
|
||||
|
||||
SET citus.shard_count TO 4;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
-- this should error out given that parent of the partition is not distributed
|
||||
SELECT create_distributed_table('partitioning_test_2010', 'id');
|
||||
|
||||
-- this should suceed
|
||||
SELECT create_distributed_table('partitioning_test', 'id');
|
||||
|
||||
-- check the data
|
||||
SELECT * FROM partitioning_test ORDER BY 1;
|
||||
|
||||
-- check the metadata
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
|
||||
ORDER BY 1;
|
||||
|
||||
SELECT
|
||||
logicalrelid, count(*)
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010')
|
||||
GROUP BY
|
||||
logicalrelid
|
||||
ORDER BY
|
||||
1,2;
|
||||
|
||||
SELECT
|
||||
nodename, nodeport, count(*)
|
||||
FROM
|
||||
pg_dist_shard_placement
|
||||
WHERE
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid IN ('partitioning_test', 'partitioning_test_2009', 'partitioning_test_2010') )
|
||||
GROUP BY
|
||||
nodename, nodeport
|
||||
ORDER BY
|
||||
1,2,3;
|
||||
|
||||
-- dropping the parent should CASCADE to the children as well
|
||||
DROP TABLE partitioning_test;
|
||||
|
||||
\d+ partitioning_test*
|
Loading…
Reference in New Issue