mirror of https://github.com/citusdata/citus.git
Support distributing by multiple columns
parent
2954fb0ee8
commit
5a57ee0878
|
@ -1072,7 +1072,8 @@ CreateDistributedTableLike(TableConversionState *con)
|
||||||
newShardCount = con->shardCount;
|
newShardCount = con->shardCount;
|
||||||
}
|
}
|
||||||
char partitionMethod = PartitionMethod(con->relationId);
|
char partitionMethod = PartitionMethod(con->relationId);
|
||||||
CreateDistributedTable(con->newRelationId, newDistributionKey, partitionMethod,
|
CreateDistributedTable(con->newRelationId, list_make1(newDistributionKey),
|
||||||
|
partitionMethod,
|
||||||
newShardCount, true, newColocateWith, false);
|
newShardCount, true, newColocateWith, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1012,7 +1012,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId)
|
||||||
uint32 colocationId = INVALID_COLOCATION_ID;
|
uint32 colocationId = INVALID_COLOCATION_ID;
|
||||||
Var *distributionColumn = NULL;
|
Var *distributionColumn = NULL;
|
||||||
InsertIntoPgDistPartition(citusLocalTableId, distributionMethod,
|
InsertIntoPgDistPartition(citusLocalTableId, distributionMethod,
|
||||||
distributionColumn, colocationId,
|
list_make1(distributionColumn), colocationId,
|
||||||
replicationModel);
|
replicationModel);
|
||||||
|
|
||||||
/* set shard storage type according to relation type */
|
/* set shard storage type according to relation type */
|
||||||
|
|
|
@ -36,6 +36,7 @@
|
||||||
#include "commands/sequence.h"
|
#include "commands/sequence.h"
|
||||||
#include "commands/tablecmds.h"
|
#include "commands/tablecmds.h"
|
||||||
#include "commands/trigger.h"
|
#include "commands/trigger.h"
|
||||||
|
#include "distributed/argutils.h"
|
||||||
#include "distributed/commands/multi_copy.h"
|
#include "distributed/commands/multi_copy.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
|
@ -126,10 +127,16 @@ static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
|
||||||
DestReceiver *copyDest,
|
DestReceiver *copyDest,
|
||||||
TupleTableSlot *slot,
|
TupleTableSlot *slot,
|
||||||
EState *estate);
|
EState *estate);
|
||||||
|
static void CreateDistributedTableModern(Oid relationId,
|
||||||
|
List *distributionColumnTexts,
|
||||||
|
Oid distributionMethodOid,
|
||||||
|
text *colocateWithTableNameText,
|
||||||
|
int32 *shardCountPtr);
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
PG_FUNCTION_INFO_V1(master_create_distributed_table);
|
PG_FUNCTION_INFO_V1(master_create_distributed_table);
|
||||||
PG_FUNCTION_INFO_V1(create_distributed_table);
|
PG_FUNCTION_INFO_V1(create_distributed_table);
|
||||||
|
PG_FUNCTION_INFO_V1(create_distributed_table_multi_column);
|
||||||
PG_FUNCTION_INFO_V1(create_reference_table);
|
PG_FUNCTION_INFO_V1(create_reference_table);
|
||||||
|
|
||||||
|
|
||||||
|
@ -172,7 +179,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
Assert(distributionColumn != NULL);
|
Assert(distributionColumn != NULL);
|
||||||
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
||||||
|
|
||||||
CreateDistributedTable(relationId, distributionColumn, distributionMethod,
|
CreateDistributedTable(relationId, list_make1(distributionColumn), distributionMethod,
|
||||||
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
|
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
|
||||||
|
|
||||||
relation_close(relation, NoLock);
|
relation_close(relation, NoLock);
|
||||||
|
@ -181,6 +188,43 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Datum
|
||||||
|
create_distributed_table_multi_column(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
PG_ENSURE_ARGNOTNULL(0, "table_name");
|
||||||
|
PG_ENSURE_ARGNOTNULL(1, "distribution_column");
|
||||||
|
PG_ENSURE_ARGNOTNULL(2, "distribution_type");
|
||||||
|
PG_ENSURE_ARGNOTNULL(3, "colocate_with");
|
||||||
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
|
ArrayType *distributionColumnArray = PG_GETARG_ARRAYTYPE_P(1);
|
||||||
|
Oid distributionMethodOid = PG_GETARG_OID(2);
|
||||||
|
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
|
||||||
|
int32 *shardCountPtr = NULL;
|
||||||
|
int32 shardCount = 0;
|
||||||
|
if (!PG_ARGISNULL(4))
|
||||||
|
{
|
||||||
|
shardCount = PG_GETARG_INT32(4);
|
||||||
|
shardCountPtr = &shardCount;
|
||||||
|
}
|
||||||
|
int distributionColumnCount = ArrayObjectCount(distributionColumnArray);
|
||||||
|
Datum *distributionColumnArrayDatum = DeconstructArrayObject(distributionColumnArray);
|
||||||
|
|
||||||
|
List *distributionColumnTextList = NIL;
|
||||||
|
for (int i = 0; i < distributionColumnCount; i++)
|
||||||
|
{
|
||||||
|
text *distributionColumnText = DatumGetTextP(distributionColumnArrayDatum[i]);
|
||||||
|
distributionColumnTextList = lappend(
|
||||||
|
distributionColumnTextList, distributionColumnText);
|
||||||
|
}
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
CreateDistributedTableModern(relationId, distributionColumnTextList,
|
||||||
|
distributionMethodOid, colocateWithTableNameText,
|
||||||
|
shardCountPtr);
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* create_distributed_table gets a table name, distribution column,
|
* create_distributed_table gets a table name, distribution column,
|
||||||
* distribution method and colocate_with option, then it creates a
|
* distribution method and colocate_with option, then it creates a
|
||||||
|
@ -195,17 +239,37 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
bool viaDeprecatedAPI = false;
|
|
||||||
|
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
text *distributionColumnText = PG_GETARG_TEXT_P(1);
|
text *distributionColumnText = PG_GETARG_TEXT_P(1);
|
||||||
Oid distributionMethodOid = PG_GETARG_OID(2);
|
Oid distributionMethodOid = PG_GETARG_OID(2);
|
||||||
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
|
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
|
||||||
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
|
int32 *shardCountPtr = NULL;
|
||||||
|
int32 shardCount = 0;
|
||||||
|
if (!PG_ARGISNULL(4))
|
||||||
|
{
|
||||||
|
shardCount = PG_GETARG_INT32(4);
|
||||||
|
shardCountPtr = &shardCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
CreateDistributedTableModern(relationId, list_make1(distributionColumnText),
|
||||||
|
distributionMethodOid, colocateWithTableNameText,
|
||||||
|
shardCountPtr);
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
CreateDistributedTableModern(Oid relationId,
|
||||||
|
List *distributionColumnTextList,
|
||||||
|
Oid distributionMethodOid,
|
||||||
|
text *colocateWithTableNameText,
|
||||||
|
int32 *shardCountPtr)
|
||||||
|
{
|
||||||
|
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
|
||||||
bool shardCountIsStrict = false;
|
bool shardCountIsStrict = false;
|
||||||
int shardCount = ShardCount;
|
int shardCount = ShardCount;
|
||||||
if (!PG_ARGISNULL(4))
|
if (shardCountPtr)
|
||||||
{
|
{
|
||||||
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
|
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
|
||||||
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
|
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
|
||||||
|
@ -214,7 +278,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
"and shard_count at the same time")));
|
"and shard_count at the same time")));
|
||||||
}
|
}
|
||||||
|
|
||||||
shardCount = PG_GETARG_INT32(4);
|
shardCount = *shardCountPtr;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* if shard_count parameter is given than we have to
|
* if shard_count parameter is given than we have to
|
||||||
|
@ -242,10 +306,18 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
relation_close(relation, NoLock);
|
relation_close(relation, NoLock);
|
||||||
|
|
||||||
char *distributionColumnName = text_to_cstring(distributionColumnText);
|
List *distributionColumnList = NIL;
|
||||||
Var *distributionColumn = BuildDistributionKeyFromColumnName(relation,
|
text *distributionColumnText = NULL;
|
||||||
distributionColumnName);
|
foreach_ptr(distributionColumnText, distributionColumnTextList)
|
||||||
Assert(distributionColumn != NULL);
|
{
|
||||||
|
char *distributionColumnName = text_to_cstring(distributionColumnText);
|
||||||
|
Var *distributionColumn = BuildDistributionKeyFromColumnName(
|
||||||
|
relation,
|
||||||
|
distributionColumnName);
|
||||||
|
Assert(distributionColumn != NULL);
|
||||||
|
distributionColumnList = lappend(distributionColumnList, distributionColumn);
|
||||||
|
}
|
||||||
|
|
||||||
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
||||||
|
|
||||||
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
|
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
|
||||||
|
@ -255,11 +327,12 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
shardCount, MAX_SHARD_COUNT)));
|
shardCount, MAX_SHARD_COUNT)));
|
||||||
}
|
}
|
||||||
|
|
||||||
CreateDistributedTable(relationId, distributionColumn, distributionMethod,
|
bool viaDeprecatedAPI = false;
|
||||||
|
|
||||||
|
CreateDistributedTable(relationId, distributionColumnList,
|
||||||
|
distributionMethod,
|
||||||
shardCount, shardCountIsStrict, colocateWithTableName,
|
shardCount, shardCountIsStrict, colocateWithTableName,
|
||||||
viaDeprecatedAPI);
|
viaDeprecatedAPI);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -311,7 +384,7 @@ create_reference_table(PG_FUNCTION_ARGS)
|
||||||
errdetail("There are no active worker nodes.")));
|
errdetail("There are no active worker nodes.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE,
|
CreateDistributedTable(relationId, list_make1(distributionColumn), DISTRIBUTE_BY_NONE,
|
||||||
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
|
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -368,7 +441,8 @@ EnsureRelationExists(Oid relationId)
|
||||||
* day, once we deprecate master_create_distribute_table completely.
|
* day, once we deprecate master_create_distribute_table completely.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod,
|
CreateDistributedTable(Oid relationId, List *distributionColumnList,
|
||||||
|
char distributionMethod,
|
||||||
int shardCount, bool shardCountIsStrict,
|
int shardCount, bool shardCountIsStrict,
|
||||||
char *colocateWithTableName, bool viaDeprecatedAPI)
|
char *colocateWithTableName, bool viaDeprecatedAPI)
|
||||||
{
|
{
|
||||||
|
@ -444,13 +518,15 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
||||||
* ColocationIdForNewTable assumes caller acquires lock on relationId. In our case,
|
* ColocationIdForNewTable assumes caller acquires lock on relationId. In our case,
|
||||||
* our caller already acquired lock on relationId.
|
* our caller already acquired lock on relationId.
|
||||||
*/
|
*/
|
||||||
uint32 colocationId = ColocationIdForNewTable(relationId, distributionColumn,
|
uint32 colocationId = ColocationIdForNewTable(relationId, linitial(
|
||||||
|
distributionColumnList),
|
||||||
distributionMethod, replicationModel,
|
distributionMethod, replicationModel,
|
||||||
shardCount, shardCountIsStrict,
|
shardCount, shardCountIsStrict,
|
||||||
colocateWithTableName,
|
colocateWithTableName,
|
||||||
viaDeprecatedAPI);
|
viaDeprecatedAPI);
|
||||||
|
|
||||||
EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
|
EnsureRelationCanBeDistributed(relationId, linitial(distributionColumnList),
|
||||||
|
distributionMethod,
|
||||||
colocationId, replicationModel, viaDeprecatedAPI);
|
colocationId, replicationModel, viaDeprecatedAPI);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -464,7 +540,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
||||||
Oid colocatedTableId = ColocatedTableId(colocationId);
|
Oid colocatedTableId = ColocatedTableId(colocationId);
|
||||||
|
|
||||||
/* create an entry for distributed table in pg_dist_partition */
|
/* create an entry for distributed table in pg_dist_partition */
|
||||||
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
|
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnList,
|
||||||
colocationId, replicationModel);
|
colocationId, replicationModel);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -538,7 +614,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
||||||
Oid partitionRelationId = InvalidOid;
|
Oid partitionRelationId = InvalidOid;
|
||||||
foreach_oid(partitionRelationId, partitionList)
|
foreach_oid(partitionRelationId, partitionList)
|
||||||
{
|
{
|
||||||
CreateDistributedTable(partitionRelationId, distributionColumn,
|
CreateDistributedTable(partitionRelationId, distributionColumnList,
|
||||||
distributionMethod, shardCount, false,
|
distributionMethod, shardCount, false,
|
||||||
colocateWithTableName, viaDeprecatedAPI);
|
colocateWithTableName, viaDeprecatedAPI);
|
||||||
}
|
}
|
||||||
|
|
|
@ -363,7 +363,7 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
|
||||||
SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(parentRelationId,
|
SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(parentRelationId,
|
||||||
relationId);
|
relationId);
|
||||||
|
|
||||||
CreateDistributedTable(relationId, parentDistributionColumn,
|
CreateDistributedTable(relationId, list_make1(parentDistributionColumn),
|
||||||
parentDistributionMethod, ShardCount, false,
|
parentDistributionMethod, ShardCount, false,
|
||||||
parentRelationName, viaDeprecatedAPI);
|
parentRelationName, viaDeprecatedAPI);
|
||||||
}
|
}
|
||||||
|
@ -440,7 +440,8 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
|
||||||
SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(
|
SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(
|
||||||
relationId, partitionRelationId);
|
relationId, partitionRelationId);
|
||||||
|
|
||||||
CreateDistributedTable(partitionRelationId, distributionColumn,
|
CreateDistributedTable(partitionRelationId,
|
||||||
|
list_make1(distributionColumn),
|
||||||
distributionMethod, ShardCount, false,
|
distributionMethod, ShardCount, false,
|
||||||
parentRelationName, viaDeprecatedAPI);
|
parentRelationName, viaDeprecatedAPI);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1733,11 +1733,9 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||||
Var *distributionColumn, uint32 colocationId,
|
List *distributionColumnList, uint32 colocationId,
|
||||||
char replicationModel)
|
char replicationModel)
|
||||||
{
|
{
|
||||||
char *distributionColumnString = NULL;
|
|
||||||
|
|
||||||
Datum newValues[Natts_pg_dist_partition];
|
Datum newValues[Natts_pg_dist_partition];
|
||||||
bool newNulls[Natts_pg_dist_partition];
|
bool newNulls[Natts_pg_dist_partition];
|
||||||
|
|
||||||
|
@ -1758,15 +1756,29 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||||
/* set partkey column to NULL for reference tables */
|
/* set partkey column to NULL for reference tables */
|
||||||
if (distributionMethod != DISTRIBUTE_BY_NONE)
|
if (distributionMethod != DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
distributionColumnString = nodeToString((Node *) distributionColumn);
|
Datum *distributionColumnDatumArray =
|
||||||
|
palloc0(list_length(distributionColumnList) * sizeof(Datum));
|
||||||
|
|
||||||
newValues[Anum_pg_dist_partition_partkey - 1] =
|
Node *distributionColumn;
|
||||||
CStringGetTextDatum(distributionColumnString);
|
int distributionColumnIndex = 0;
|
||||||
|
foreach_ptr(distributionColumn, distributionColumnList)
|
||||||
|
{
|
||||||
|
distributionColumnDatumArray[distributionColumnIndex] = CStringGetTextDatum(
|
||||||
|
nodeToString(distributionColumn));
|
||||||
|
distributionColumnIndex++;
|
||||||
|
}
|
||||||
|
newValues[Anum_pg_dist_partition_partkey - 1] = distributionColumnDatumArray[0];
|
||||||
|
ArrayType *distributionColumnArray = DatumArrayToArrayType(
|
||||||
|
distributionColumnDatumArray, list_length(distributionColumnList), TEXTOID);
|
||||||
|
newValues[Anum_pg_dist_partition_partkeys - 1] = PointerGetDatum(
|
||||||
|
distributionColumnArray);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
newValues[Anum_pg_dist_partition_partkey - 1] = PointerGetDatum(NULL);
|
newValues[Anum_pg_dist_partition_partkey - 1] = PointerGetDatum(NULL);
|
||||||
newNulls[Anum_pg_dist_partition_partkey - 1] = true;
|
newNulls[Anum_pg_dist_partition_partkey - 1] = true;
|
||||||
|
newValues[Anum_pg_dist_partition_partkeys - 1] = PointerGetDatum(NULL);
|
||||||
|
newNulls[Anum_pg_dist_partition_partkeys - 1] = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues,
|
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues,
|
||||||
|
|
|
@ -49,3 +49,9 @@ DROP TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger ON pg_catalog.p
|
||||||
DROP FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check();
|
DROP FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check();
|
||||||
|
|
||||||
#include "udfs/citus_cleanup_orphaned_shards/10.1-1.sql"
|
#include "udfs/citus_cleanup_orphaned_shards/10.1-1.sql"
|
||||||
|
|
||||||
|
#include "udfs/create_distributed_table/10.2-1.sql";
|
||||||
|
|
||||||
|
ALTER TABLE pg_catalog.pg_dist_partition ADD COLUMN partkeys text[];
|
||||||
|
UPDATE pg_catalog.pg_dist_partition SET partkeys = ARRAY[partkey] WHERE partkey IS NOT NULL;
|
||||||
|
-- TODO: Maybe drop partkey column
|
||||||
|
|
|
@ -85,3 +85,15 @@ CREATE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger
|
||||||
FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check();
|
FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check();
|
||||||
|
|
||||||
DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_shards();
|
DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_shards();
|
||||||
|
|
||||||
|
DROP FUNCTION create_distributed_table(table_name regclass,
|
||||||
|
distribution_columns text[],
|
||||||
|
distribution_type citus.distribution_type,
|
||||||
|
colocate_with text,
|
||||||
|
shard_count int);
|
||||||
|
-- TODO: Uncomment once moved to migration for 10.2
|
||||||
|
-- #include "../udfs/create_distributed_table/10.1-1.sql";
|
||||||
|
|
||||||
|
-- TODO: Check that no multi column distribution tables were created
|
||||||
|
ALTER TABLE pg_catalog.pg_dist_partition DROP COLUMN partkeys;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type, text, int);
|
||||||
|
CREATE OR REPLACE FUNCTION create_distributed_table(table_name regclass,
|
||||||
|
distribution_column text,
|
||||||
|
distribution_type citus.distribution_type DEFAULT 'hash',
|
||||||
|
colocate_with text DEFAULT 'default',
|
||||||
|
shard_count int DEFAULT NULL)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'MODULE_PATHNAME', $$create_distributed_table$$;
|
||||||
|
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
|
||||||
|
distribution_column text,
|
||||||
|
distribution_type citus.distribution_type,
|
||||||
|
colocate_with text,
|
||||||
|
shard_count int)
|
||||||
|
IS 'creates a distributed table';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION create_distributed_table(table_name regclass,
|
||||||
|
distribution_columns text[],
|
||||||
|
distribution_type citus.distribution_type DEFAULT 'hash',
|
||||||
|
colocate_with text DEFAULT 'default',
|
||||||
|
shard_count int DEFAULT NULL)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'MODULE_PATHNAME', $$create_distributed_table_multi_column$$;
|
||||||
|
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
|
||||||
|
distribution_columns text[],
|
||||||
|
distribution_type citus.distribution_type,
|
||||||
|
colocate_with text,
|
||||||
|
shard_count int)
|
||||||
|
IS 'creates a distributed table';
|
|
@ -1,4 +1,4 @@
|
||||||
DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type, text);
|
DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type, text, int);
|
||||||
CREATE OR REPLACE FUNCTION create_distributed_table(table_name regclass,
|
CREATE OR REPLACE FUNCTION create_distributed_table(table_name regclass,
|
||||||
distribution_column text,
|
distribution_column text,
|
||||||
distribution_type citus.distribution_type DEFAULT 'hash',
|
distribution_type citus.distribution_type DEFAULT 'hash',
|
||||||
|
@ -13,3 +13,18 @@ COMMENT ON FUNCTION create_distributed_table(table_name regclass,
|
||||||
colocate_with text,
|
colocate_with text,
|
||||||
shard_count int)
|
shard_count int)
|
||||||
IS 'creates a distributed table';
|
IS 'creates a distributed table';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION create_distributed_table(table_name regclass,
|
||||||
|
distribution_columns text[],
|
||||||
|
distribution_type citus.distribution_type DEFAULT 'hash',
|
||||||
|
colocate_with text DEFAULT 'default',
|
||||||
|
shard_count int DEFAULT NULL)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'MODULE_PATHNAME', $$create_distributed_table_multi_column$$;
|
||||||
|
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
|
||||||
|
distribution_columns text[],
|
||||||
|
distribution_type citus.distribution_type,
|
||||||
|
colocate_with text,
|
||||||
|
shard_count int)
|
||||||
|
IS 'creates a distributed table';
|
||||||
|
|
|
@ -233,7 +233,7 @@ extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
|
||||||
char shardState, uint64 shardLength,
|
char shardState, uint64 shardLength,
|
||||||
int32 groupId);
|
int32 groupId);
|
||||||
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||||
Var *distributionColumn, uint32 colocationId,
|
List *distributionColumnList, uint32 colocationId,
|
||||||
char replicationModel);
|
char replicationModel);
|
||||||
extern void DeletePartitionRow(Oid distributedRelationId);
|
extern void DeletePartitionRow(Oid distributedRelationId);
|
||||||
extern void DeleteShardRow(uint64 shardId);
|
extern void DeleteShardRow(uint64 shardId);
|
||||||
|
@ -242,7 +242,7 @@ extern void UpdatePartitionShardPlacementStates(ShardPlacement *parentShardPlace
|
||||||
extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement);
|
extern void MarkShardPlacementInactive(ShardPlacement *shardPlacement);
|
||||||
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
|
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
|
||||||
extern void DeleteShardPlacementRow(uint64 placementId);
|
extern void DeleteShardPlacementRow(uint64 placementId);
|
||||||
extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
|
extern void CreateDistributedTable(Oid relationId, List *distributionColumnList,
|
||||||
char distributionMethod, int shardCount,
|
char distributionMethod, int shardCount,
|
||||||
bool shardCountIsStrict, char *colocateWithTableName,
|
bool shardCountIsStrict, char *colocateWithTableName,
|
||||||
bool viaDeprecatedAPI);
|
bool viaDeprecatedAPI);
|
||||||
|
|
|
@ -27,6 +27,7 @@ typedef struct FormData_pg_dist_partition
|
||||||
text partkey; /* partition key expression */
|
text partkey; /* partition key expression */
|
||||||
uint32 colocationid; /* id of the co-location group of particular table belongs to */
|
uint32 colocationid; /* id of the co-location group of particular table belongs to */
|
||||||
char repmodel; /* replication model; see codes below */
|
char repmodel; /* replication model; see codes below */
|
||||||
|
ArrayType partkeys; /* partition key expressions */
|
||||||
#endif
|
#endif
|
||||||
} FormData_pg_dist_partition;
|
} FormData_pg_dist_partition;
|
||||||
|
|
||||||
|
@ -41,12 +42,13 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition;
|
||||||
* compiler constants for pg_dist_partitions
|
* compiler constants for pg_dist_partitions
|
||||||
* ----------------
|
* ----------------
|
||||||
*/
|
*/
|
||||||
#define Natts_pg_dist_partition 5
|
#define Natts_pg_dist_partition 6
|
||||||
#define Anum_pg_dist_partition_logicalrelid 1
|
#define Anum_pg_dist_partition_logicalrelid 1
|
||||||
#define Anum_pg_dist_partition_partmethod 2
|
#define Anum_pg_dist_partition_partmethod 2
|
||||||
#define Anum_pg_dist_partition_partkey 3
|
#define Anum_pg_dist_partition_partkey 3
|
||||||
#define Anum_pg_dist_partition_colocationid 4
|
#define Anum_pg_dist_partition_colocationid 4
|
||||||
#define Anum_pg_dist_partition_repmodel 5
|
#define Anum_pg_dist_partition_repmodel 5
|
||||||
|
#define Anum_pg_dist_partition_partkeys 6
|
||||||
|
|
||||||
/* valid values for partmethod include append, hash, and range */
|
/* valid values for partmethod include append, hash, and range */
|
||||||
#define DISTRIBUTE_BY_APPEND 'a'
|
#define DISTRIBUTE_BY_APPEND 'a'
|
||||||
|
|
|
@ -259,7 +259,7 @@ NOTICE: renaming the new table to columnar_citus_integration.table_option
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option'::regclass;
|
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option'::regclass;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
@ -578,7 +578,7 @@ NOTICE: renaming the new table to columnar_citus_integration.table_option
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option'::regclass;
|
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option'::regclass;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
@ -817,7 +817,7 @@ NOTICE: renaming the new table to columnar_citus_integration.table_option_refer
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option_reference'::regclass;
|
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option_reference'::regclass;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
@ -1050,7 +1050,7 @@ NOTICE: renaming the new table to columnar_citus_integration.table_option_citus
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option_citus_local'::regclass;
|
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option_citus_local'::regclass;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ ORDER BY 1, 2;
|
||||||
|
|
||||||
-- verify there are no tables that could prevent add/remove node operations
|
-- verify there are no tables that could prevent add/remove node operations
|
||||||
SELECT * FROM pg_dist_partition;
|
SELECT * FROM pg_dist_partition;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
CREATE SCHEMA multi_column_distribution;
|
||||||
|
SET search_path TO multi_column_distribution;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.next_shard_id TO 27905500;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 27905500;
|
||||||
|
create table t(id int, a int);
|
||||||
|
select create_distributed_table('t', ARRAY['id'], colocate_with := 'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
|
||||||
|
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905500 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}"}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
create table t2(id int, a int);
|
||||||
|
select create_distributed_table('t2', ARRAY['id', 'a'], colocate_with := 'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
|
||||||
|
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905500 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}"}
|
||||||
|
t2 | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 27905501 | s | {"{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}","{VAR :varno 1 :varattno 2 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 2 :location -1}"}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA multi_column_distribution CASCADE;
|
|
@ -639,12 +639,13 @@ SELECT * FROM print_extension_changes();
|
||||||
| function citus_cleanup_orphaned_shards()
|
| function citus_cleanup_orphaned_shards()
|
||||||
| function citus_local_disk_space_stats() record
|
| function citus_local_disk_space_stats() record
|
||||||
| function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void
|
| function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void
|
||||||
|
| function create_distributed_table(regclass,text[],citus.distribution_type,text,integer) void
|
||||||
| function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint)
|
| function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint)
|
||||||
| function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer)
|
| function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name,real) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer)
|
||||||
| function worker_partitioned_relation_size(regclass) bigint
|
| function worker_partitioned_relation_size(regclass) bigint
|
||||||
| function worker_partitioned_relation_total_size(regclass) bigint
|
| function worker_partitioned_relation_total_size(regclass) bigint
|
||||||
| function worker_partitioned_table_size(regclass) bigint
|
| function worker_partitioned_table_size(regclass) bigint
|
||||||
(15 rows)
|
(16 rows)
|
||||||
|
|
||||||
-- Test downgrade to 10.1-1 from 10.2-1
|
-- Test downgrade to 10.1-1 from 10.2-1
|
||||||
ALTER EXTENSION citus UPDATE TO '10.2-1';
|
ALTER EXTENSION citus UPDATE TO '10.2-1';
|
||||||
|
|
|
@ -60,7 +60,7 @@ DROP TABLE testtableddl;
|
||||||
RESET citus.shard_replication_factor;
|
RESET citus.shard_replication_factor;
|
||||||
-- ensure no metadata of distributed tables are remaining
|
-- ensure no metadata of distributed tables are remaining
|
||||||
SELECT * FROM pg_dist_partition;
|
SELECT * FROM pg_dist_partition;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
|
|
@ -1246,7 +1246,7 @@ NOTICE: renaming the new table to single_node.test_2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass;
|
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
logicalrelid | partmethod | partkey | colocationid | repmodel | partkeys
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
|
|
@ -109,6 +109,7 @@ ORDER BY 1;
|
||||||
function coord_combine_agg_sfunc(internal,oid,cstring,anyelement)
|
function coord_combine_agg_sfunc(internal,oid,cstring,anyelement)
|
||||||
function create_distributed_function(regprocedure,text,text)
|
function create_distributed_function(regprocedure,text,text)
|
||||||
function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
|
function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
|
||||||
|
function create_distributed_table(regclass,text[],citus.distribution_type,text,integer)
|
||||||
function create_intermediate_result(text,text)
|
function create_intermediate_result(text,text)
|
||||||
function create_reference_table(regclass)
|
function create_reference_table(regclass)
|
||||||
function distributed_tables_colocated(regclass,regclass)
|
function distributed_tables_colocated(regclass,regclass)
|
||||||
|
@ -248,5 +249,5 @@ ORDER BY 1;
|
||||||
view citus_worker_stat_activity
|
view citus_worker_stat_activity
|
||||||
view pg_dist_shard_placement
|
view pg_dist_shard_placement
|
||||||
view time_partitions
|
view time_partitions
|
||||||
(232 rows)
|
(233 rows)
|
||||||
|
|
||||||
|
|
|
@ -61,7 +61,7 @@ test: multi_explain hyperscale_tutorial partitioned_intermediate_results distrib
|
||||||
test: multi_basic_queries cross_join multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
test: multi_basic_queries cross_join multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
||||||
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
||||||
test: sql_procedure multi_function_in_join row_types materialized_view undistribute_table
|
test: sql_procedure multi_function_in_join row_types materialized_view undistribute_table
|
||||||
test: multi_subquery_in_where_reference_clause adaptive_executor propagate_set_commands geqo
|
test: multi_subquery_in_where_reference_clause adaptive_executor propagate_set_commands geqo multi_column_distribution
|
||||||
# this should be run alone as it gets too many clients
|
# this should be run alone as it gets too many clients
|
||||||
test: join_pushdown
|
test: join_pushdown
|
||||||
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc statement_cancel_error_message
|
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc statement_cancel_error_message
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
CREATE SCHEMA multi_column_distribution;
|
||||||
|
SET search_path TO multi_column_distribution;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.next_shard_id TO 27905500;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 27905500;
|
||||||
|
|
||||||
|
create table t(id int, a int);
|
||||||
|
select create_distributed_table('t', ARRAY['id'], colocate_with := 'none');
|
||||||
|
select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
|
||||||
|
|
||||||
|
create table t2(id int, a int);
|
||||||
|
select create_distributed_table('t2', ARRAY['id', 'a'], colocate_with := 'none');
|
||||||
|
select * from pg_dist_partition WHERE NOT (logicalrelid::text LIKE '%.%');
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA multi_column_distribution CASCADE;
|
Loading…
Reference in New Issue