mirror of https://github.com/citusdata/citus.git
Merge 41bb6fb3be
into 9d4e586457
commit
bb5d2e0e73
|
@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
||||||
5.2-1 5.2-2 5.2-3 5.2-4 \
|
5.2-1 5.2-2 5.2-3 5.2-4 \
|
||||||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
||||||
6.1-1 6.1-2 6.1-3
|
6.1-1 6.1-2 6.1-3 6.1-4
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -101,6 +101,8 @@ $(EXTENSION)--6.1-2.sql: $(EXTENSION)--6.1-1.sql $(EXTENSION)--6.1-1--6.1-2.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql
|
$(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--6.1-4.sql: $(EXTENSION)--6.1-3.sql $(EXTENSION)--6.1-3--6.1-4.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
/* citus--6.1-3--6.1-4.sql */
|
||||||
|
|
||||||
|
SET search_path = 'pg_catalog';
|
||||||
|
|
||||||
|
ALTER TABLE pg_dist_colocation ADD COLUMN defaultgroup BOOLEAN;
|
||||||
|
|
||||||
|
UPDATE pg_dist_colocation SET defaultgroup = TRUE;
|
||||||
|
|
||||||
|
DROP INDEX pg_dist_colocation_configuration_index;
|
||||||
|
|
||||||
|
CREATE INDEX pg_dist_colocation_configuration_index
|
||||||
|
ON pg_dist_colocation USING btree(shardcount, replicationfactor, distributioncolumntype, defaultgroup);
|
||||||
|
|
||||||
|
DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type);
|
||||||
|
|
||||||
|
CREATE FUNCTION create_distributed_table(table_name regclass,
|
||||||
|
distribution_column text,
|
||||||
|
distribution_type citus.distribution_type DEFAULT 'hash',
|
||||||
|
colocate_with text DEFAULT 'default')
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$create_distributed_table$$;
|
||||||
|
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
|
||||||
|
distribution_column text,
|
||||||
|
distribution_type citus.distribution_type,
|
||||||
|
colocate_with text)
|
||||||
|
IS 'creates a distributed table';
|
||||||
|
|
||||||
|
CREATE FUNCTION mark_colocation_group_default(colocation_id integer)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE plpgsql
|
||||||
|
SET search_path = pg_catalog
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
shard_count integer;
|
||||||
|
replication_factor integer;
|
||||||
|
distribution_column_type oid;
|
||||||
|
BEGIN
|
||||||
|
-- get colocation group configuration
|
||||||
|
SELECT
|
||||||
|
shardcount,
|
||||||
|
replicationfactor,
|
||||||
|
distributioncolumntype
|
||||||
|
INTO
|
||||||
|
shard_count,
|
||||||
|
replication_factor,
|
||||||
|
distribution_column_type
|
||||||
|
FROM
|
||||||
|
pg_dist_colocation
|
||||||
|
WHERE
|
||||||
|
colocationid = colocation_id;
|
||||||
|
|
||||||
|
-- set all defaults to false
|
||||||
|
UPDATE
|
||||||
|
pg_dist_colocation
|
||||||
|
SET
|
||||||
|
defaultgroup = false
|
||||||
|
WHERE
|
||||||
|
shardcount = shard_count AND
|
||||||
|
replicationfactor = replication_factor AND
|
||||||
|
distributioncolumntype = distribution_column_type;
|
||||||
|
|
||||||
|
-- set new default colocation group
|
||||||
|
UPDATE
|
||||||
|
pg_dist_colocation
|
||||||
|
SET
|
||||||
|
defaultgroup = true
|
||||||
|
WHERE
|
||||||
|
colocationid = colocation_id;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
|
COMMENT ON FUNCTION mark_colocation_group_default(colocation_id integer)
|
||||||
|
IS 'marks given colocation group as default';
|
||||||
|
|
||||||
|
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '6.1-3'
|
default_version = '6.1-4'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -9,7 +9,6 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
#include "miscadmin.h"
|
|
||||||
|
|
||||||
#include "access/genam.h"
|
#include "access/genam.h"
|
||||||
#include "access/hash.h"
|
#include "access/hash.h"
|
||||||
|
@ -33,7 +32,6 @@
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
#include "commands/extension.h"
|
#include "commands/extension.h"
|
||||||
#include "commands/sequence.h"
|
|
||||||
#include "commands/trigger.h"
|
#include "commands/trigger.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/distribution_column.h"
|
#include "distributed/distribution_column.h"
|
||||||
|
@ -41,7 +39,6 @@
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_logical_planner.h"
|
#include "distributed/multi_logical_planner.h"
|
||||||
#include "distributed/pg_dist_colocation.h"
|
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
#include "executor/spi.h"
|
#include "executor/spi.h"
|
||||||
#include "nodes/execnodes.h"
|
#include "nodes/execnodes.h"
|
||||||
|
@ -63,6 +60,7 @@
|
||||||
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
char distributionMethod, uint32 colocationId);
|
char distributionMethod, uint32 colocationId);
|
||||||
static char LookupDistributionMethod(Oid distributionMethodOid);
|
static char LookupDistributionMethod(Oid distributionMethodOid);
|
||||||
|
static Oid ColumnType(Oid relationId, char *columnName);
|
||||||
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
|
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
|
||||||
Node *distributionKey);
|
Node *distributionKey);
|
||||||
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
|
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
|
||||||
|
@ -76,10 +74,8 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation,
|
||||||
uint32 colocationId);
|
uint32 colocationId);
|
||||||
static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||||
Var *distributionColumn, uint32 colocationId);
|
Var *distributionColumn, uint32 colocationId);
|
||||||
static uint32 ColocationId(int shardCount, int replicationFactor,
|
|
||||||
Oid distributionColumnType);
|
|
||||||
static uint32 GetNextColocationId(void);
|
|
||||||
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
|
char *colocateWithTableName,
|
||||||
int shardCount, int replicationFactor);
|
int shardCount, int replicationFactor);
|
||||||
|
|
||||||
|
|
||||||
|
@ -123,10 +119,29 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
text *distributionColumnText = PG_GETARG_TEXT_P(1);
|
text *distributionColumnText = PG_GETARG_TEXT_P(1);
|
||||||
Oid distributionMethodOid = PG_GETARG_OID(2);
|
Oid distributionMethodOid = PG_GETARG_OID(2);
|
||||||
|
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
|
||||||
|
|
||||||
char *distributionColumnName = text_to_cstring(distributionColumnText);
|
char *distributionColumnName = text_to_cstring(distributionColumnText);
|
||||||
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
||||||
|
|
||||||
|
/* check if we try to colocate with hash distributed tables */
|
||||||
|
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
|
||||||
|
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
|
||||||
|
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
|
||||||
|
{
|
||||||
|
Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText);
|
||||||
|
char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid);
|
||||||
|
|
||||||
|
if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH ||
|
||||||
|
distributionMethod != DISTRIBUTE_BY_HASH)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot distribute relation"),
|
||||||
|
errdetail("Currently, colocate_with option is only supported "
|
||||||
|
"for hash distributed tables.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* if distribution method is not hash, just create partition metadata */
|
/* if distribution method is not hash, just create partition metadata */
|
||||||
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
||||||
{
|
{
|
||||||
|
@ -136,7 +151,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* use configuration values for shard count and shard replication factor */
|
/* use configuration values for shard count and shard replication factor */
|
||||||
CreateHashDistributedTable(relationId, distributionColumnName, ShardCount,
|
CreateHashDistributedTable(relationId, distributionColumnName,
|
||||||
|
colocateWithTableName, ShardCount,
|
||||||
ShardReplicationFactor);
|
ShardReplicationFactor);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
|
@ -154,6 +170,7 @@ create_reference_table(PG_FUNCTION_ARGS)
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
int shardCount = 1;
|
int shardCount = 1;
|
||||||
AttrNumber firstColumnAttrNumber = 1;
|
AttrNumber firstColumnAttrNumber = 1;
|
||||||
|
char *colocateWithTableName = "default";
|
||||||
|
|
||||||
char *firstColumnName = get_attname(relationId, firstColumnAttrNumber);
|
char *firstColumnName = get_attname(relationId, firstColumnAttrNumber);
|
||||||
if (firstColumnName == NULL)
|
if (firstColumnName == NULL)
|
||||||
|
@ -164,8 +181,8 @@ create_reference_table(PG_FUNCTION_ARGS)
|
||||||
"least one column", relationName)));
|
"least one column", relationName)));
|
||||||
}
|
}
|
||||||
|
|
||||||
CreateHashDistributedTable(relationId, firstColumnName, shardCount,
|
CreateHashDistributedTable(relationId, firstColumnName, colocateWithTableName,
|
||||||
ShardReplicationFactor);
|
shardCount, ShardReplicationFactor);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -725,6 +742,19 @@ LookupDistributionMethod(Oid distributionMethodOid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnType returns the column type of the given column.
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
ColumnType(Oid relationId, char *columnName)
|
||||||
|
{
|
||||||
|
AttrNumber columnIndex = get_attnum(relationId, columnName);
|
||||||
|
Oid columnType = get_atttype(relationId, columnIndex);
|
||||||
|
|
||||||
|
return columnType;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SupportFunctionForColumn locates a support function given a column, an access method,
|
* SupportFunctionForColumn locates a support function given a column, an access method,
|
||||||
* and and id of a support function. This function returns InvalidOid if there is no
|
* and and id of a support function. This function returns InvalidOid if there is no
|
||||||
|
@ -848,148 +878,23 @@ CreateTruncateTrigger(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ColocationId searches pg_dist_colocation for shard count, replication factor
|
|
||||||
* and distribution column type. If a matching entry is found, it returns the
|
|
||||||
* colocation id, otherwise it returns INVALID_COLOCATION_ID.
|
|
||||||
*/
|
|
||||||
static uint32
|
|
||||||
ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
|
|
||||||
{
|
|
||||||
uint32 colocationId = INVALID_COLOCATION_ID;
|
|
||||||
HeapTuple colocationTuple = NULL;
|
|
||||||
SysScanDesc scanDescriptor;
|
|
||||||
const int scanKeyCount = 3;
|
|
||||||
ScanKeyData scanKey[scanKeyCount];
|
|
||||||
bool indexOK = true;
|
|
||||||
|
|
||||||
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
|
|
||||||
|
|
||||||
/* set scan arguments */
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
|
|
||||||
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount));
|
|
||||||
ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
|
|
||||||
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
|
|
||||||
ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
|
|
||||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
|
|
||||||
|
|
||||||
scanDescriptor = systable_beginscan(pgDistColocation,
|
|
||||||
DistColocationConfigurationIndexId(),
|
|
||||||
indexOK, NULL, scanKeyCount, scanKey);
|
|
||||||
|
|
||||||
colocationTuple = systable_getnext(scanDescriptor);
|
|
||||||
if (HeapTupleIsValid(colocationTuple))
|
|
||||||
{
|
|
||||||
Form_pg_dist_colocation colocationForm =
|
|
||||||
(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);
|
|
||||||
|
|
||||||
colocationId = colocationForm->colocationid;
|
|
||||||
}
|
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
|
||||||
heap_close(pgDistColocation, AccessShareLock);
|
|
||||||
|
|
||||||
return colocationId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CreateColocationGroup creates a new colocation id and writes it into
|
|
||||||
* pg_dist_colocation with the given configuration. It also returns the created
|
|
||||||
* colocation id.
|
|
||||||
*/
|
|
||||||
uint32
|
|
||||||
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType)
|
|
||||||
{
|
|
||||||
uint32 colocationId = GetNextColocationId();
|
|
||||||
Relation pgDistColocation = NULL;
|
|
||||||
TupleDesc tupleDescriptor = NULL;
|
|
||||||
HeapTuple heapTuple = NULL;
|
|
||||||
Datum values[Natts_pg_dist_colocation];
|
|
||||||
bool isNulls[Natts_pg_dist_colocation];
|
|
||||||
|
|
||||||
/* form new colocation tuple */
|
|
||||||
memset(values, 0, sizeof(values));
|
|
||||||
memset(isNulls, false, sizeof(isNulls));
|
|
||||||
|
|
||||||
values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId);
|
|
||||||
values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount);
|
|
||||||
values[Anum_pg_dist_colocation_replicationfactor - 1] =
|
|
||||||
UInt32GetDatum(replicationFactor);
|
|
||||||
values[Anum_pg_dist_colocation_distributioncolumntype - 1] =
|
|
||||||
ObjectIdGetDatum(distributionColumnType);
|
|
||||||
|
|
||||||
/* open colocation relation and insert the new tuple */
|
|
||||||
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
|
|
||||||
|
|
||||||
tupleDescriptor = RelationGetDescr(pgDistColocation);
|
|
||||||
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
|
||||||
|
|
||||||
simple_heap_insert(pgDistColocation, heapTuple);
|
|
||||||
CatalogUpdateIndexes(pgDistColocation, heapTuple);
|
|
||||||
|
|
||||||
/* increment the counter so that next command can see the row */
|
|
||||||
CommandCounterIncrement();
|
|
||||||
heap_close(pgDistColocation, RowExclusiveLock);
|
|
||||||
|
|
||||||
return colocationId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GetNextColocationId allocates and returns a unique colocationId for the
|
|
||||||
* colocation group to be created. This allocation occurs both in shared memory
|
|
||||||
* and in write ahead logs; writing to logs avoids the risk of having
|
|
||||||
* colocationId collisions.
|
|
||||||
*
|
|
||||||
* Please note that the caller is still responsible for finalizing colocationId
|
|
||||||
* with the master node. Further note that this function relies on an internal
|
|
||||||
* sequence created in initdb to generate unique identifiers.
|
|
||||||
*/
|
|
||||||
static uint32
|
|
||||||
GetNextColocationId()
|
|
||||||
{
|
|
||||||
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
|
|
||||||
Oid sequenceId = ResolveRelationId(sequenceName);
|
|
||||||
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
|
|
||||||
Oid savedUserId = InvalidOid;
|
|
||||||
int savedSecurityContext = 0;
|
|
||||||
Datum colocationIdDatum = 0;
|
|
||||||
uint32 colocationId = INVALID_COLOCATION_ID;
|
|
||||||
|
|
||||||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
|
||||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
|
||||||
|
|
||||||
/* generate new and unique colocation id from sequence */
|
|
||||||
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
|
|
||||||
|
|
||||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
|
||||||
|
|
||||||
colocationId = DatumGetUInt32(colocationIdDatum);
|
|
||||||
|
|
||||||
return colocationId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateHashDistributedTable creates a hash distributed table with given
|
* CreateHashDistributedTable creates a hash distributed table with given
|
||||||
* shard count and shard replication factor.
|
* shard count and shard replication factor.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
|
char *colocateWithTableName,
|
||||||
int shardCount, int replicationFactor)
|
int shardCount, int replicationFactor)
|
||||||
{
|
{
|
||||||
Relation distributedRelation = NULL;
|
Relation distributedRelation = NULL;
|
||||||
Relation pgDistColocation = NULL;
|
Relation pgDistColocation = NULL;
|
||||||
Var *distributionColumn = NULL;
|
|
||||||
Oid distributionColumnType = 0;
|
|
||||||
uint32 colocationId = INVALID_COLOCATION_ID;
|
uint32 colocationId = INVALID_COLOCATION_ID;
|
||||||
|
Oid colocationTableId = InvalidOid;
|
||||||
|
Oid distributionColumnType = InvalidOid;
|
||||||
|
|
||||||
/* get distribution column type */
|
/* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */
|
||||||
distributedRelation = relation_open(relationId, AccessShareLock);
|
distributedRelation = relation_open(relationId, AccessShareLock);
|
||||||
distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
|
|
||||||
distributionColumnName);
|
|
||||||
distributionColumnType = distributionColumn->vartype;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get an exclusive lock on the colocation system catalog. Therefore, we
|
* Get an exclusive lock on the colocation system catalog. Therefore, we
|
||||||
|
@ -998,35 +903,66 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
*/
|
*/
|
||||||
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
|
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
|
||||||
|
|
||||||
/* check for existing colocations */
|
/* get distribution column data type */
|
||||||
colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType);
|
distributionColumnType = ColumnType(relationId, distributionColumnName);
|
||||||
|
|
||||||
/*
|
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
|
||||||
* If there is a colocation group for the current configuration, get a
|
|
||||||
* colocated table from the group and use its shards as a reference to
|
|
||||||
* create new shards. Otherwise, create a new colocation group and create
|
|
||||||
* shards with the default round robin algorithm.
|
|
||||||
*/
|
|
||||||
if (colocationId != INVALID_COLOCATION_ID)
|
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
/* check for default colocation group */
|
||||||
|
colocationId = DefaultColocationGroupId(shardCount, replicationFactor,
|
||||||
Oid colocatedTableId = ColocatedTableId(colocationId);
|
distributionColumnType);
|
||||||
ConvertToDistributedTable(relationId, distributionColumnName,
|
if (colocationId == INVALID_COLOCATION_ID)
|
||||||
DISTRIBUTE_BY_HASH, colocationId);
|
{
|
||||||
|
bool defaultColocationGroup = true;
|
||||||
CreateColocatedShards(relationId, colocatedTableId);
|
colocationId = CreateColocationGroup(shardCount, replicationFactor,
|
||||||
ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
|
distributionColumnType,
|
||||||
relationName, colocationId)));
|
defaultColocationGroup);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
colocationTableId = ColocatedTableId(colocationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0)
|
||||||
|
{
|
||||||
|
bool defaultColocationGroup = false;
|
||||||
colocationId = CreateColocationGroup(shardCount, replicationFactor,
|
colocationId = CreateColocationGroup(shardCount, replicationFactor,
|
||||||
distributionColumnType);
|
distributionColumnType,
|
||||||
ConvertToDistributedTable(relationId, distributionColumnName,
|
defaultColocationGroup);
|
||||||
DISTRIBUTE_BY_HASH, colocationId);
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Var *colocationTablePartitionColumn = NULL;
|
||||||
|
Oid colocationTablePartitionColumnType = InvalidOid;
|
||||||
|
|
||||||
/* use the default way to create shards */
|
/* get colocation group of the target table */
|
||||||
|
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
|
||||||
|
colocationTableId = ResolveRelationId(colocateWithTableNameText);
|
||||||
|
|
||||||
|
colocationId = TableColocationId(colocationTableId);
|
||||||
|
|
||||||
|
colocationTablePartitionColumn = PartitionKey(colocationTableId);
|
||||||
|
colocationTablePartitionColumnType = colocationTablePartitionColumn->vartype;
|
||||||
|
|
||||||
|
if (colocationTablePartitionColumnType != distributionColumnType)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot colocate with %s", colocateWithTableName),
|
||||||
|
errdetail("Distribution column types are different.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* create distributed table metadata */
|
||||||
|
ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
|
||||||
|
colocationId);
|
||||||
|
|
||||||
|
/* create shards */
|
||||||
|
if (colocationTableId != InvalidOid)
|
||||||
|
{
|
||||||
|
CreateColocatedShards(relationId, colocationTableId);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
|
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,20 +10,24 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
#include "access/genam.h"
|
#include "access/genam.h"
|
||||||
#include "access/heapam.h"
|
#include "access/heapam.h"
|
||||||
#include "access/htup_details.h"
|
#include "access/htup_details.h"
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "catalog/indexing.h"
|
#include "catalog/indexing.h"
|
||||||
|
#include "commands/sequence.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_logical_planner.h"
|
#include "distributed/multi_logical_planner.h"
|
||||||
|
#include "distributed/pg_dist_colocation.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
#include "utils/fmgroids.h"
|
#include "utils/fmgroids.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
@ -36,7 +40,10 @@ static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval,
|
||||||
ShardInterval *rightShardInterval);
|
ShardInterval *rightShardInterval);
|
||||||
static int CompareShardPlacementsByNode(const void *leftElement,
|
static int CompareShardPlacementsByNode(const void *leftElement,
|
||||||
const void *rightElement);
|
const void *rightElement);
|
||||||
|
static uint32 GetNextColocationId(void);
|
||||||
static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId);
|
static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId);
|
||||||
|
static List * ColocationGroupTableList(Oid colocationId);
|
||||||
|
static void DeleteColocationGroup(uint32 colocationId);
|
||||||
|
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
|
@ -86,11 +93,13 @@ static void
|
||||||
MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
|
MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
|
||||||
{
|
{
|
||||||
uint32 sourceColocationId = INVALID_COLOCATION_ID;
|
uint32 sourceColocationId = INVALID_COLOCATION_ID;
|
||||||
|
uint32 targetColocationId = INVALID_COLOCATION_ID;
|
||||||
Relation pgDistColocation = NULL;
|
Relation pgDistColocation = NULL;
|
||||||
Var *sourceDistributionColumn = NULL;
|
Var *sourceDistributionColumn = NULL;
|
||||||
Var *targetDistributionColumn = NULL;
|
Var *targetDistributionColumn = NULL;
|
||||||
Oid sourceDistributionColumnType = InvalidOid;
|
Oid sourceDistributionColumnType = InvalidOid;
|
||||||
Oid targetDistributionColumnType = InvalidOid;
|
Oid targetDistributionColumnType = InvalidOid;
|
||||||
|
bool defaultColocationGroup = false;
|
||||||
|
|
||||||
CheckHashPartitionedTable(sourceRelationId);
|
CheckHashPartitionedTable(sourceRelationId);
|
||||||
CheckHashPartitionedTable(targetRelationId);
|
CheckHashPartitionedTable(targetRelationId);
|
||||||
|
@ -132,15 +141,39 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
|
||||||
{
|
{
|
||||||
uint32 shardCount = ShardIntervalCount(sourceRelationId);
|
uint32 shardCount = ShardIntervalCount(sourceRelationId);
|
||||||
uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId);
|
uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId);
|
||||||
|
uint32 defaultColocationId = INVALID_COLOCATION_ID;
|
||||||
|
|
||||||
|
/* check if there is a default colocation group */
|
||||||
|
defaultColocationId = DefaultColocationGroupId(shardCount, shardReplicationFactor,
|
||||||
|
sourceDistributionColumnType);
|
||||||
|
if (defaultColocationId == INVALID_COLOCATION_ID)
|
||||||
|
{
|
||||||
|
defaultColocationGroup = true;
|
||||||
|
}
|
||||||
|
|
||||||
sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
|
sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
|
||||||
sourceDistributionColumnType);
|
sourceDistributionColumnType,
|
||||||
|
defaultColocationGroup);
|
||||||
UpdateRelationColocationGroup(sourceRelationId, sourceColocationId);
|
UpdateRelationColocationGroup(sourceRelationId, sourceColocationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
targetColocationId = TableColocationId(targetRelationId);
|
||||||
|
|
||||||
/* finally set colocation group for the target relation */
|
/* finally set colocation group for the target relation */
|
||||||
UpdateRelationColocationGroup(targetRelationId, sourceColocationId);
|
UpdateRelationColocationGroup(targetRelationId, sourceColocationId);
|
||||||
|
|
||||||
|
/* if there is not any remaining table in the colocation group, delete it */
|
||||||
|
if (targetColocationId != INVALID_COLOCATION_ID)
|
||||||
|
{
|
||||||
|
List *colocatedTableList = ColocationGroupTableList(targetColocationId);
|
||||||
|
int colocatedTableCount = list_length(colocatedTableList);
|
||||||
|
|
||||||
|
if (colocatedTableCount == 0)
|
||||||
|
{
|
||||||
|
DeleteColocationGroup(targetColocationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
heap_close(pgDistColocation, NoLock);
|
heap_close(pgDistColocation, NoLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -339,6 +372,150 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TableColocationId function returns co-location id of given table. This function
|
||||||
|
* errors out if given table is not distributed.
|
||||||
|
*/
|
||||||
|
uint32
|
||||||
|
TableColocationId(Oid distributedTableId)
|
||||||
|
{
|
||||||
|
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
||||||
|
|
||||||
|
return cacheEntry->colocationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DefaultColocationId searches pg_dist_colocation for the default colocation group
|
||||||
|
* with the given configuration: shard count, replication factor and distribution
|
||||||
|
* column type. If a matching entry is found, it returns the colocation id,
|
||||||
|
* otherwise it returns INVALID_COLOCATION_ID.
|
||||||
|
*/
|
||||||
|
uint32
|
||||||
|
DefaultColocationGroupId(int shardCount, int replicationFactor,
|
||||||
|
Oid distributionColumnType)
|
||||||
|
{
|
||||||
|
uint32 colocationId = INVALID_COLOCATION_ID;
|
||||||
|
HeapTuple colocationTuple = NULL;
|
||||||
|
SysScanDesc scanDescriptor;
|
||||||
|
const int scanKeyCount = 4;
|
||||||
|
ScanKeyData scanKey[scanKeyCount];
|
||||||
|
bool indexOK = true;
|
||||||
|
bool defaultColocationGroup = true;
|
||||||
|
|
||||||
|
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
|
||||||
|
|
||||||
|
/* set scan arguments */
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
|
||||||
|
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount));
|
||||||
|
ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
|
||||||
|
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
|
||||||
|
ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
|
||||||
|
ScanKeyInit(&scanKey[3], Anum_pg_dist_colocation_defaultgroup,
|
||||||
|
BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(defaultColocationGroup));
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan(pgDistColocation,
|
||||||
|
DistColocationConfigurationIndexId(),
|
||||||
|
indexOK, NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
|
colocationTuple = systable_getnext(scanDescriptor);
|
||||||
|
if (HeapTupleIsValid(colocationTuple))
|
||||||
|
{
|
||||||
|
Form_pg_dist_colocation colocationForm =
|
||||||
|
(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);
|
||||||
|
|
||||||
|
colocationId = colocationForm->colocationid;
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
heap_close(pgDistColocation, AccessShareLock);
|
||||||
|
|
||||||
|
return colocationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateColocationGroup creates a new colocation id and writes it into
|
||||||
|
* pg_dist_colocation with the given configuration. It also returns the created
|
||||||
|
* colocation id.
|
||||||
|
*/
|
||||||
|
uint32
|
||||||
|
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType,
|
||||||
|
bool defaultColocationGroup)
|
||||||
|
{
|
||||||
|
uint32 colocationId = GetNextColocationId();
|
||||||
|
Relation pgDistColocation = NULL;
|
||||||
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
Datum values[Natts_pg_dist_colocation];
|
||||||
|
bool isNulls[Natts_pg_dist_colocation];
|
||||||
|
|
||||||
|
/* form new colocation tuple */
|
||||||
|
memset(values, 0, sizeof(values));
|
||||||
|
memset(isNulls, false, sizeof(isNulls));
|
||||||
|
|
||||||
|
values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId);
|
||||||
|
values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount);
|
||||||
|
values[Anum_pg_dist_colocation_replicationfactor - 1] =
|
||||||
|
UInt32GetDatum(replicationFactor);
|
||||||
|
values[Anum_pg_dist_colocation_distributioncolumntype - 1] =
|
||||||
|
ObjectIdGetDatum(distributionColumnType);
|
||||||
|
values[Anum_pg_dist_colocation_defaultgroup - 1] =
|
||||||
|
BoolGetDatum(defaultColocationGroup);
|
||||||
|
|
||||||
|
/* open colocation relation and insert the new tuple */
|
||||||
|
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
|
||||||
|
|
||||||
|
tupleDescriptor = RelationGetDescr(pgDistColocation);
|
||||||
|
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
||||||
|
|
||||||
|
simple_heap_insert(pgDistColocation, heapTuple);
|
||||||
|
CatalogUpdateIndexes(pgDistColocation, heapTuple);
|
||||||
|
|
||||||
|
/* increment the counter so that next command can see the row */
|
||||||
|
CommandCounterIncrement();
|
||||||
|
heap_close(pgDistColocation, RowExclusiveLock);
|
||||||
|
|
||||||
|
return colocationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetNextColocationId allocates and returns a unique colocationId for the
|
||||||
|
* colocation group to be created. This allocation occurs both in shared memory
|
||||||
|
* and in write ahead logs; writing to logs avoids the risk of having
|
||||||
|
* colocationId collisions.
|
||||||
|
*
|
||||||
|
* Please note that the caller is still responsible for finalizing colocationId
|
||||||
|
* with the master node. Further note that this function relies on an internal
|
||||||
|
* sequence created in initdb to generate unique identifiers.
|
||||||
|
*/
|
||||||
|
static uint32
|
||||||
|
GetNextColocationId(void)
|
||||||
|
{
|
||||||
|
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
|
||||||
|
Oid sequenceId = ResolveRelationId(sequenceName);
|
||||||
|
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
|
||||||
|
Oid savedUserId = InvalidOid;
|
||||||
|
int savedSecurityContext = 0;
|
||||||
|
Datum colocationIdDatum = 0;
|
||||||
|
uint32 colocationId = INVALID_COLOCATION_ID;
|
||||||
|
|
||||||
|
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||||
|
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||||
|
|
||||||
|
/* generate new and unique colocation id from sequence */
|
||||||
|
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
|
||||||
|
|
||||||
|
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||||
|
|
||||||
|
colocationId = DatumGetUInt32(colocationIdDatum);
|
||||||
|
|
||||||
|
return colocationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UpdateRelationColocationGroup updates colocation group in pg_dist_partition
|
* UpdateRelationColocationGroup updates colocation group in pg_dist_partition
|
||||||
* for the given relation.
|
* for the given relation.
|
||||||
|
@ -396,19 +573,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* TableColocationId function returns co-location id of given table. This function
|
|
||||||
* errors out if given table is not distributed.
|
|
||||||
*/
|
|
||||||
uint32
|
|
||||||
TableColocationId(Oid distributedTableId)
|
|
||||||
{
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
|
||||||
|
|
||||||
return cacheEntry->colocationId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TablesColocated function checks whether given two tables are co-located and
|
* TablesColocated function checks whether given two tables are co-located and
|
||||||
* returns true if they are co-located. A table is always co-located with itself.
|
* returns true if they are co-located. A table is always co-located with itself.
|
||||||
|
@ -475,6 +639,30 @@ ColocatedTableList(Oid distributedTableId)
|
||||||
uint32 tableColocationId = TableColocationId(distributedTableId);
|
uint32 tableColocationId = TableColocationId(distributedTableId);
|
||||||
List *colocatedTableList = NIL;
|
List *colocatedTableList = NIL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If distribution type of the table is not hash, the table is only co-located
|
||||||
|
* with itself.
|
||||||
|
*/
|
||||||
|
if (tableColocationId == INVALID_COLOCATION_ID)
|
||||||
|
{
|
||||||
|
colocatedTableList = lappend_oid(colocatedTableList, distributedTableId);
|
||||||
|
return colocatedTableList;
|
||||||
|
}
|
||||||
|
|
||||||
|
colocatedTableList = ColocationGroupTableList(tableColocationId);
|
||||||
|
|
||||||
|
return colocatedTableList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColocationGroupTableList returns the list of tables in the given colocation
|
||||||
|
* group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
ColocationGroupTableList(Oid colocationId)
|
||||||
|
{
|
||||||
|
List *colocatedTableList = NIL;
|
||||||
Relation pgDistPartition = NULL;
|
Relation pgDistPartition = NULL;
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
SysScanDesc scanDescriptor = NULL;
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
@ -487,14 +675,13 @@ ColocatedTableList(Oid distributedTableId)
|
||||||
* If distribution type of the table is not hash, the table is only co-located
|
* If distribution type of the table is not hash, the table is only co-located
|
||||||
* with itself.
|
* with itself.
|
||||||
*/
|
*/
|
||||||
if (tableColocationId == INVALID_COLOCATION_ID)
|
if (colocationId == INVALID_COLOCATION_ID)
|
||||||
{
|
{
|
||||||
colocatedTableList = lappend_oid(colocatedTableList, distributedTableId);
|
return NIL;
|
||||||
return colocatedTableList;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
|
||||||
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId));
|
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));
|
||||||
|
|
||||||
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
|
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
|
||||||
tupleDescriptor = RelationGetDescr(pgDistPartition);
|
tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
@ -633,3 +820,44 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex)
|
||||||
|
|
||||||
return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId;
|
return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DeleteColocationGroup deletes the colocation group from pg_dist_colocation.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
DeleteColocationGroup(uint32 colocationId)
|
||||||
|
{
|
||||||
|
Relation pgDistColocation = NULL;
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
int scanKeyCount = 1;
|
||||||
|
ScanKeyData scanKey[scanKeyCount];
|
||||||
|
bool indexOK = false;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
|
||||||
|
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid,
|
||||||
|
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId));
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK,
|
||||||
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
if (!HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not find valid entry for colocation group %d",
|
||||||
|
colocationId)));
|
||||||
|
}
|
||||||
|
|
||||||
|
simple_heap_delete(pgDistColocation, &(heapTuple->t_self));
|
||||||
|
|
||||||
|
CatalogUpdateIndexes(pgDistColocation, heapTuple);
|
||||||
|
CitusInvalidateRelcacheByRelid(DistColocationRelationId());
|
||||||
|
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
heap_close(pgDistColocation, RowExclusiveLock);
|
||||||
|
|
||||||
|
/* increment the counter so that next command can see the row */
|
||||||
|
CommandCounterIncrement();
|
||||||
|
}
|
||||||
|
|
|
@ -260,8 +260,9 @@ DistributedTableCacheEntry(Oid distributedRelationId)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("relation %u is not distributed",
|
char *relationName = get_rel_name(distributedRelationId);
|
||||||
distributedRelationId)));
|
ereport(ERROR, (errmsg("relation %s is not distributed",
|
||||||
|
relationName)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,11 @@
|
||||||
#define INVALID_COLOCATION_ID 0
|
#define INVALID_COLOCATION_ID 0
|
||||||
|
|
||||||
extern uint32 TableColocationId(Oid distributedTableId);
|
extern uint32 TableColocationId(Oid distributedTableId);
|
||||||
|
extern uint32 DefaultColocationGroupId(int shardCount, int replicationFactor,
|
||||||
|
Oid distributionColumnType);
|
||||||
|
extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
|
||||||
|
Oid distributionColumnType,
|
||||||
|
bool defaultColocationGroup);
|
||||||
extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId);
|
extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId);
|
||||||
extern bool ShardsColocated(ShardInterval *leftShardInterval,
|
extern bool ShardsColocated(ShardInterval *leftShardInterval,
|
||||||
ShardInterval *rightShardInterval);
|
ShardInterval *rightShardInterval);
|
||||||
|
@ -25,8 +30,6 @@ extern List * ColocatedTableList(Oid distributedTableId);
|
||||||
extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
|
extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
|
||||||
extern Oid ColocatedTableId(Oid colocationId);
|
extern Oid ColocatedTableId(Oid colocationId);
|
||||||
extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
|
extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
|
||||||
extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
|
|
||||||
Oid distributionColumnType);
|
|
||||||
|
|
||||||
|
|
||||||
#endif /* COLOCATION_UTILS_H_ */
|
#endif /* COLOCATION_UTILS_H_ */
|
||||||
|
|
|
@ -22,6 +22,7 @@ typedef struct FormData_pg_dist_colocation
|
||||||
uint32 shardcount;
|
uint32 shardcount;
|
||||||
uint32 replicationfactor;
|
uint32 replicationfactor;
|
||||||
Oid distributioncolumntype;
|
Oid distributioncolumntype;
|
||||||
|
bool defaultgroup;
|
||||||
} FormData_pg_dist_colocation;
|
} FormData_pg_dist_colocation;
|
||||||
|
|
||||||
/* ----------------
|
/* ----------------
|
||||||
|
@ -35,11 +36,12 @@ typedef FormData_pg_dist_colocation *Form_pg_dist_colocation;
|
||||||
* compiler constants for pg_dist_colocation
|
* compiler constants for pg_dist_colocation
|
||||||
* ----------------
|
* ----------------
|
||||||
*/
|
*/
|
||||||
#define Natts_pg_dist_colocation 4
|
#define Natts_pg_dist_colocation 5
|
||||||
#define Anum_pg_dist_colocation_colocationid 1
|
#define Anum_pg_dist_colocation_colocationid 1
|
||||||
#define Anum_pg_dist_colocation_shardcount 2
|
#define Anum_pg_dist_colocation_shardcount 2
|
||||||
#define Anum_pg_dist_colocation_replicationfactor 3
|
#define Anum_pg_dist_colocation_replicationfactor 3
|
||||||
#define Anum_pg_dist_colocation_distributioncolumntype 4
|
#define Anum_pg_dist_colocation_distributioncolumntype 4
|
||||||
|
#define Anum_pg_dist_colocation_defaultgroup 5
|
||||||
|
|
||||||
#define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq"
|
#define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq"
|
||||||
|
|
||||||
|
|
|
@ -432,12 +432,12 @@ NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
|
||||||
SELECT * FROM pg_dist_colocation
|
SELECT * FROM pg_dist_colocation
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY colocationid;
|
ORDER BY colocationid;
|
||||||
colocationid | shardcount | replicationfactor | distributioncolumntype
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
--------------+------------+-------------------+------------------------
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
1 | 2 | 2 | 23
|
1 | 2 | 2 | 23 | t
|
||||||
2 | 2 | 1 | 23
|
2 | 2 | 1 | 23 | t
|
||||||
3 | 2 | 2 | 25
|
3 | 2 | 2 | 25 | t
|
||||||
4 | 4 | 2 | 23
|
4 | 4 | 2 | 23 | t
|
||||||
(4 rows)
|
(4 rows)
|
||||||
|
|
||||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
|
@ -459,16 +459,16 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
-- check effects of dropping tables
|
-- check effects of dropping tables
|
||||||
DROP TABLE table1_groupA;
|
DROP TABLE table1_groupA;
|
||||||
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
|
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
|
||||||
colocationid | shardcount | replicationfactor | distributioncolumntype
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
--------------+------------+-------------------+------------------------
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
1 | 2 | 2 | 23
|
1 | 2 | 2 | 23 | t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- dropping all tables in a colocation group also deletes the colocation group
|
-- dropping all tables in a colocation group also deletes the colocation group
|
||||||
DROP TABLE table2_groupA;
|
DROP TABLE table2_groupA;
|
||||||
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
|
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
|
||||||
colocationid | shardcount | replicationfactor | distributioncolumntype
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
--------------+------------+-------------------+------------------------
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
-- create dropped colocation group again
|
-- create dropped colocation group again
|
||||||
|
@ -504,6 +504,118 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- test colocate_with option
|
||||||
|
CREATE TABLE table1_group_none_1 ( id int );
|
||||||
|
SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE table2_group_none_1 ( id int );
|
||||||
|
SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE table1_group_none_2 ( id int );
|
||||||
|
SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE table4_groupE ( id int );
|
||||||
|
SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.shard_count = 3;
|
||||||
|
-- check that this new configuration does not have a default group
|
||||||
|
CREATE TABLE table1_group_none_3 ( id int );
|
||||||
|
SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- a new table does not use a non-default group
|
||||||
|
CREATE TABLE table1_group_default ( id int );
|
||||||
|
SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- check metadata
|
||||||
|
SELECT * FROM pg_dist_colocation
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid;
|
||||||
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
|
2 | 2 | 1 | 23 | t
|
||||||
|
3 | 2 | 2 | 25 | t
|
||||||
|
4 | 4 | 2 | 23 | t
|
||||||
|
5 | 2 | 2 | 23 | t
|
||||||
|
6 | 2 | 2 | 23 | f
|
||||||
|
7 | 2 | 2 | 23 | f
|
||||||
|
8 | 3 | 2 | 23 | f
|
||||||
|
9 | 3 | 2 | 23 | t
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid, logicalrelid;
|
||||||
|
logicalrelid | colocationid
|
||||||
|
----------------------------------+--------------
|
||||||
|
table1_groupb | 2
|
||||||
|
table2_groupb | 2
|
||||||
|
table1_groupc | 3
|
||||||
|
table2_groupc | 3
|
||||||
|
table1_groupd | 4
|
||||||
|
table2_groupd | 4
|
||||||
|
table3_groupd | 4
|
||||||
|
table1_groupe | 5
|
||||||
|
table2_groupe | 5
|
||||||
|
table3_groupe | 5
|
||||||
|
schema_collocation.table4_groupe | 5
|
||||||
|
table4_groupe | 5
|
||||||
|
table1_group_none_1 | 6
|
||||||
|
table2_group_none_1 | 6
|
||||||
|
table1_group_none_2 | 7
|
||||||
|
table1_group_none_3 | 8
|
||||||
|
table1_group_default | 9
|
||||||
|
(17 rows)
|
||||||
|
|
||||||
|
-- check failing colocate_with options
|
||||||
|
CREATE TABLE table_postgresql( id int );
|
||||||
|
CREATE TABLE table_failing ( id int );
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append');
|
||||||
|
ERROR: cannot distribute relation
|
||||||
|
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE');
|
||||||
|
ERROR: cannot distribute relation
|
||||||
|
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql');
|
||||||
|
ERROR: relation table_postgresql is not distributed
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table');
|
||||||
|
ERROR: relation "no_table" does not exist
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => '');
|
||||||
|
ERROR: invalid name syntax
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL);
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- check with different distribution column types
|
||||||
|
CREATE TABLE table_bigint ( id bigint );
|
||||||
|
SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE');
|
||||||
|
ERROR: cannot colocate with table1_groupE
|
||||||
|
DETAIL: Distribution column types are different.
|
||||||
-- check worker table schemas
|
-- check worker table schemas
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d table3_groupE_1300050
|
\d table3_groupE_1300050
|
||||||
|
@ -538,14 +650,74 @@ SELECT create_reference_table('table2_groupF');
|
||||||
SELECT * FROM pg_dist_colocation
|
SELECT * FROM pg_dist_colocation
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY colocationid;
|
ORDER BY colocationid;
|
||||||
colocationid | shardcount | replicationfactor | distributioncolumntype
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
--------------+------------+-------------------+------------------------
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
2 | 2 | 1 | 23
|
2 | 2 | 1 | 23 | t
|
||||||
3 | 2 | 2 | 25
|
3 | 2 | 2 | 25 | t
|
||||||
4 | 4 | 2 | 23
|
4 | 4 | 2 | 23 | t
|
||||||
5 | 2 | 2 | 23
|
5 | 2 | 2 | 23 | t
|
||||||
6 | 1 | 2 | 23
|
6 | 2 | 2 | 23 | f
|
||||||
(5 rows)
|
7 | 2 | 2 | 23 | f
|
||||||
|
8 | 3 | 2 | 23 | f
|
||||||
|
9 | 3 | 2 | 23 | t
|
||||||
|
10 | 1 | 2 | 23 | t
|
||||||
|
(9 rows)
|
||||||
|
|
||||||
|
-- test mark_colocation_group_default()
|
||||||
|
SELECT mark_colocation_group_default(7);
|
||||||
|
mark_colocation_group_default
|
||||||
|
-------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT mark_colocation_group_default(8);
|
||||||
|
mark_colocation_group_default
|
||||||
|
-------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- check metadata after mark_colocation_group_default() is run
|
||||||
|
SELECT * FROM pg_dist_colocation
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid;
|
||||||
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
|
2 | 2 | 1 | 23 | t
|
||||||
|
3 | 2 | 2 | 25 | t
|
||||||
|
4 | 4 | 2 | 23 | t
|
||||||
|
5 | 2 | 2 | 23 | f
|
||||||
|
6 | 2 | 2 | 23 | f
|
||||||
|
7 | 2 | 2 | 23 | t
|
||||||
|
8 | 3 | 2 | 23 | t
|
||||||
|
9 | 3 | 2 | 23 | f
|
||||||
|
10 | 1 | 2 | 23 | t
|
||||||
|
(9 rows)
|
||||||
|
|
||||||
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid, logicalrelid;
|
||||||
|
logicalrelid | colocationid
|
||||||
|
----------------------------------+--------------
|
||||||
|
table1_groupb | 2
|
||||||
|
table2_groupb | 2
|
||||||
|
table1_groupc | 3
|
||||||
|
table2_groupc | 3
|
||||||
|
table1_groupd | 4
|
||||||
|
table2_groupd | 4
|
||||||
|
table3_groupd | 4
|
||||||
|
table1_groupe | 5
|
||||||
|
table2_groupe | 5
|
||||||
|
table3_groupe | 5
|
||||||
|
schema_collocation.table4_groupe | 5
|
||||||
|
table4_groupe | 5
|
||||||
|
table1_group_none_1 | 6
|
||||||
|
table2_group_none_1 | 6
|
||||||
|
table1_group_none_2 | 7
|
||||||
|
table1_group_none_3 | 8
|
||||||
|
table1_group_default | 9
|
||||||
|
table1_groupf | 10
|
||||||
|
table2_groupf | 10
|
||||||
|
(19 rows)
|
||||||
|
|
||||||
-- cross check with internal colocation API
|
-- cross check with internal colocation API
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -564,7 +736,7 @@ ORDER BY
|
||||||
table1,
|
table1,
|
||||||
table2;
|
table2;
|
||||||
table1 | table2 | colocated
|
table1 | table2 | colocated
|
||||||
---------------+----------------------------------+-----------
|
----------------------------------+----------------------------------+-----------
|
||||||
table1_group1 | table2_group1 | t
|
table1_group1 | table2_group1 | t
|
||||||
table1_groupb | table2_groupb | t
|
table1_groupb | table2_groupb | t
|
||||||
table1_groupc | table2_groupc | t
|
table1_groupc | table2_groupc | t
|
||||||
|
@ -574,11 +746,16 @@ ORDER BY
|
||||||
table1_groupe | table2_groupe | t
|
table1_groupe | table2_groupe | t
|
||||||
table1_groupe | table3_groupe | t
|
table1_groupe | table3_groupe | t
|
||||||
table1_groupe | schema_collocation.table4_groupe | t
|
table1_groupe | schema_collocation.table4_groupe | t
|
||||||
|
table1_groupe | table4_groupe | t
|
||||||
table2_groupe | table3_groupe | t
|
table2_groupe | table3_groupe | t
|
||||||
table2_groupe | schema_collocation.table4_groupe | t
|
table2_groupe | schema_collocation.table4_groupe | t
|
||||||
|
table2_groupe | table4_groupe | t
|
||||||
table3_groupe | schema_collocation.table4_groupe | t
|
table3_groupe | schema_collocation.table4_groupe | t
|
||||||
|
table3_groupe | table4_groupe | t
|
||||||
|
schema_collocation.table4_groupe | table4_groupe | t
|
||||||
|
table1_group_none_1 | table2_group_none_1 | t
|
||||||
table1_groupf | table2_groupf | t
|
table1_groupf | table2_groupf | t
|
||||||
(13 rows)
|
(18 rows)
|
||||||
|
|
||||||
-- check created shards
|
-- check created shards
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -653,11 +830,39 @@ ORDER BY
|
||||||
schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1
|
schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1
|
||||||
schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647
|
schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647
|
||||||
schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647
|
schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647
|
||||||
table1_groupf | 1300054 | t | 57637 | -2147483648 | 2147483647
|
table1_group_none_1 | 1300054 | t | 57637 | -2147483648 | -1
|
||||||
table1_groupf | 1300054 | t | 57638 | -2147483648 | 2147483647
|
table1_group_none_1 | 1300054 | t | 57638 | -2147483648 | -1
|
||||||
table2_groupf | 1300055 | t | 57638 | -2147483648 | 2147483647
|
table1_group_none_1 | 1300055 | t | 57638 | 0 | 2147483647
|
||||||
table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647
|
table1_group_none_1 | 1300055 | t | 57637 | 0 | 2147483647
|
||||||
(56 rows)
|
table2_group_none_1 | 1300056 | t | 57638 | -2147483648 | -1
|
||||||
|
table2_group_none_1 | 1300056 | t | 57637 | -2147483648 | -1
|
||||||
|
table2_group_none_1 | 1300057 | t | 57637 | 0 | 2147483647
|
||||||
|
table2_group_none_1 | 1300057 | t | 57638 | 0 | 2147483647
|
||||||
|
table1_group_none_2 | 1300058 | t | 57637 | -2147483648 | -1
|
||||||
|
table1_group_none_2 | 1300058 | t | 57638 | -2147483648 | -1
|
||||||
|
table1_group_none_2 | 1300059 | t | 57638 | 0 | 2147483647
|
||||||
|
table1_group_none_2 | 1300059 | t | 57637 | 0 | 2147483647
|
||||||
|
table4_groupe | 1300060 | t | 57637 | -2147483648 | -1
|
||||||
|
table4_groupe | 1300060 | t | 57638 | -2147483648 | -1
|
||||||
|
table4_groupe | 1300061 | t | 57638 | 0 | 2147483647
|
||||||
|
table4_groupe | 1300061 | t | 57637 | 0 | 2147483647
|
||||||
|
table1_group_none_3 | 1300062 | t | 57637 | -2147483648 | -715827884
|
||||||
|
table1_group_none_3 | 1300062 | t | 57638 | -2147483648 | -715827884
|
||||||
|
table1_group_none_3 | 1300063 | t | 57638 | -715827883 | 715827881
|
||||||
|
table1_group_none_3 | 1300063 | t | 57637 | -715827883 | 715827881
|
||||||
|
table1_group_none_3 | 1300064 | t | 57637 | 715827882 | 2147483647
|
||||||
|
table1_group_none_3 | 1300064 | t | 57638 | 715827882 | 2147483647
|
||||||
|
table1_group_default | 1300065 | t | 57637 | -2147483648 | -715827884
|
||||||
|
table1_group_default | 1300065 | t | 57638 | -2147483648 | -715827884
|
||||||
|
table1_group_default | 1300066 | t | 57638 | -715827883 | 715827881
|
||||||
|
table1_group_default | 1300066 | t | 57637 | -715827883 | 715827881
|
||||||
|
table1_group_default | 1300067 | t | 57637 | 715827882 | 2147483647
|
||||||
|
table1_group_default | 1300067 | t | 57638 | 715827882 | 2147483647
|
||||||
|
table1_groupf | 1300068 | t | 57637 | -2147483648 | 2147483647
|
||||||
|
table1_groupf | 1300068 | t | 57638 | -2147483648 | 2147483647
|
||||||
|
table2_groupf | 1300069 | t | 57638 | -2147483648 | 2147483647
|
||||||
|
table2_groupf | 1300069 | t | 57637 | -2147483648 | 2147483647
|
||||||
|
(84 rows)
|
||||||
|
|
||||||
-- reset colocation ids to test mark_tables_colocated
|
-- reset colocation ids to test mark_tables_colocated
|
||||||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;
|
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;
|
||||||
|
@ -669,13 +874,13 @@ UPDATE pg_dist_partition SET colocationid = 0
|
||||||
SELECT * FROM pg_dist_colocation
|
SELECT * FROM pg_dist_colocation
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY colocationid;
|
ORDER BY colocationid;
|
||||||
colocationid | shardcount | replicationfactor | distributioncolumntype
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
--------------+------------+-------------------+------------------------
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY logicalrelid;
|
ORDER BY colocationid, logicalrelid;
|
||||||
logicalrelid | colocationid
|
logicalrelid | colocationid
|
||||||
--------------+--------------
|
--------------+--------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
@ -700,13 +905,13 @@ DETAIL: Shard counts don't match for table1_groupb and table1_groupd.
|
||||||
SELECT * FROM pg_dist_colocation
|
SELECT * FROM pg_dist_colocation
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY colocationid;
|
ORDER BY colocationid;
|
||||||
colocationid | shardcount | replicationfactor | distributioncolumntype
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
--------------+------------+-------------------+------------------------
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY logicalrelid;
|
ORDER BY colocationid, logicalrelid;
|
||||||
logicalrelid | colocationid
|
logicalrelid | colocationid
|
||||||
--------------+--------------
|
--------------+--------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
@ -749,18 +954,27 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.shard_count = 2;
|
||||||
|
CREATE TABLE table5_groupE ( id int );
|
||||||
|
SELECT create_distributed_table('table5_groupE', 'id', colocate_with => 'NONE');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- check metadata to see colocation groups are created successfully
|
-- check metadata to see colocation groups are created successfully
|
||||||
SELECT * FROM pg_dist_colocation
|
SELECT * FROM pg_dist_colocation
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY colocationid;
|
ORDER BY colocationid;
|
||||||
colocationid | shardcount | replicationfactor | distributioncolumntype
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
--------------+------------+-------------------+------------------------
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
2 | 2 | 1 | 23
|
2 | 2 | 1 | 23 | t
|
||||||
3 | 2 | 2 | 25
|
3 | 2 | 2 | 25 | t
|
||||||
4 | 4 | 2 | 23
|
4 | 4 | 2 | 23 | t
|
||||||
5 | 2 | 2 | 23
|
5 | 2 | 2 | 23 | t
|
||||||
6 | 1 | 2 | 23
|
6 | 1 | 2 | 23 | t
|
||||||
(5 rows)
|
7 | 2 | 2 | 23 | f
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
@ -778,5 +992,45 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
table3_groupe | 5
|
table3_groupe | 5
|
||||||
table1_groupf | 6
|
table1_groupf | 6
|
||||||
table2_groupf | 6
|
table2_groupf | 6
|
||||||
(11 rows)
|
table5_groupe | 7
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- move the only table in colocation group 7 to colocation group 5
|
||||||
|
SELECT mark_tables_colocated('table1_groupE', ARRAY['table5_groupE']);
|
||||||
|
mark_tables_colocated
|
||||||
|
-----------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- check metadata to see that unused colocation group is deleted
|
||||||
|
SELECT * FROM pg_dist_colocation
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid;
|
||||||
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
|
2 | 2 | 1 | 23 | t
|
||||||
|
3 | 2 | 2 | 25 | t
|
||||||
|
4 | 4 | 2 | 23 | t
|
||||||
|
5 | 2 | 2 | 23 | t
|
||||||
|
6 | 1 | 2 | 23 | t
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid, logicalrelid;
|
||||||
|
logicalrelid | colocationid
|
||||||
|
---------------+--------------
|
||||||
|
table1_groupb | 2
|
||||||
|
table2_groupb | 2
|
||||||
|
table1_groupc | 3
|
||||||
|
table2_groupc | 3
|
||||||
|
table1_groupd | 4
|
||||||
|
table2_groupd | 4
|
||||||
|
table1_groupe | 5
|
||||||
|
table2_groupe | 5
|
||||||
|
table3_groupe | 5
|
||||||
|
table5_groupe | 5
|
||||||
|
table1_groupf | 6
|
||||||
|
table2_groupf | 6
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
|
|
@ -156,7 +156,7 @@ SELECT partition_type('events_hash');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT partition_type('pg_type');
|
SELECT partition_type('pg_type');
|
||||||
ERROR: relation 1247 is not distributed
|
ERROR: relation pg_type is not distributed
|
||||||
-- should see true for events_hash, false for others
|
-- should see true for events_hash, false for others
|
||||||
SELECT is_distributed_table('events_hash');
|
SELECT is_distributed_table('events_hash');
|
||||||
is_distributed_table
|
is_distributed_table
|
||||||
|
|
|
@ -61,6 +61,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-18';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-1';
|
ALTER EXTENSION citus UPDATE TO '6.1-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-2';
|
ALTER EXTENSION citus UPDATE TO '6.1-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-3';
|
ALTER EXTENSION citus UPDATE TO '6.1-3';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.1-4';
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
SELECT COUNT(*)
|
SELECT COUNT(*)
|
||||||
FROM pg_depend AS pgd,
|
FROM pg_depend AS pgd,
|
||||||
|
|
|
@ -238,8 +238,8 @@ Indexes:
|
||||||
|
|
||||||
-- Check that pg_dist_colocation is not synced
|
-- Check that pg_dist_colocation is not synced
|
||||||
SELECT * FROM pg_dist_colocation ORDER BY colocationid;
|
SELECT * FROM pg_dist_colocation ORDER BY colocationid;
|
||||||
colocationid | shardcount | replicationfactor | distributioncolumntype
|
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
|
||||||
--------------+------------+-------------------+------------------------
|
--------------+------------+-------------------+------------------------+--------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
-- Make sure that truncate trigger has been set for the MX table on worker
|
-- Make sure that truncate trigger has been set for the MX table on worker
|
||||||
|
|
|
@ -241,6 +241,53 @@ CREATE SCHEMA schema_collocation;
|
||||||
CREATE TABLE schema_collocation.table4_groupE ( id int );
|
CREATE TABLE schema_collocation.table4_groupE ( id int );
|
||||||
SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
|
SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
|
||||||
|
|
||||||
|
-- test colocate_with option
|
||||||
|
CREATE TABLE table1_group_none_1 ( id int );
|
||||||
|
SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none');
|
||||||
|
|
||||||
|
CREATE TABLE table2_group_none_1 ( id int );
|
||||||
|
SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1');
|
||||||
|
|
||||||
|
CREATE TABLE table1_group_none_2 ( id int );
|
||||||
|
SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none');
|
||||||
|
|
||||||
|
CREATE TABLE table4_groupE ( id int );
|
||||||
|
SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default');
|
||||||
|
|
||||||
|
SET citus.shard_count = 3;
|
||||||
|
|
||||||
|
-- check that this new configuration does not have a default group
|
||||||
|
CREATE TABLE table1_group_none_3 ( id int );
|
||||||
|
SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE');
|
||||||
|
|
||||||
|
-- a new table does not use a non-default group
|
||||||
|
CREATE TABLE table1_group_default ( id int );
|
||||||
|
SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT');
|
||||||
|
|
||||||
|
-- check metadata
|
||||||
|
SELECT * FROM pg_dist_colocation
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid;
|
||||||
|
|
||||||
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid, logicalrelid;
|
||||||
|
|
||||||
|
-- check failing colocate_with options
|
||||||
|
CREATE TABLE table_postgresql( id int );
|
||||||
|
CREATE TABLE table_failing ( id int );
|
||||||
|
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append');
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE');
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql');
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table');
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => '');
|
||||||
|
SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL);
|
||||||
|
|
||||||
|
-- check with different distribution column types
|
||||||
|
CREATE TABLE table_bigint ( id bigint );
|
||||||
|
SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE');
|
||||||
|
|
||||||
-- check worker table schemas
|
-- check worker table schemas
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d table3_groupE_1300050
|
\d table3_groupE_1300050
|
||||||
|
@ -259,6 +306,19 @@ SELECT * FROM pg_dist_colocation
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY colocationid;
|
ORDER BY colocationid;
|
||||||
|
|
||||||
|
-- test mark_colocation_group_default()
|
||||||
|
SELECT mark_colocation_group_default(7);
|
||||||
|
SELECT mark_colocation_group_default(8);
|
||||||
|
|
||||||
|
-- check metadata after mark_colocation_group_default() is run
|
||||||
|
SELECT * FROM pg_dist_colocation
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid;
|
||||||
|
|
||||||
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid, logicalrelid;
|
||||||
|
|
||||||
-- cross check with internal colocation API
|
-- cross check with internal colocation API
|
||||||
SELECT
|
SELECT
|
||||||
p1.logicalrelid::regclass AS table1,
|
p1.logicalrelid::regclass AS table1,
|
||||||
|
@ -310,7 +370,7 @@ SELECT * FROM pg_dist_colocation
|
||||||
|
|
||||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY logicalrelid;
|
ORDER BY colocationid, logicalrelid;
|
||||||
|
|
||||||
-- first check failing cases
|
-- first check failing cases
|
||||||
SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']);
|
SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']);
|
||||||
|
@ -326,7 +386,7 @@ SELECT * FROM pg_dist_colocation
|
||||||
|
|
||||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY logicalrelid;
|
ORDER BY colocationid, logicalrelid;
|
||||||
|
|
||||||
-- check successfully cololated tables
|
-- check successfully cololated tables
|
||||||
SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']);
|
SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']);
|
||||||
|
@ -338,6 +398,11 @@ SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']);
|
||||||
-- check to colocate with itself
|
-- check to colocate with itself
|
||||||
SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
|
SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
|
||||||
|
|
||||||
|
SET citus.shard_count = 2;
|
||||||
|
|
||||||
|
CREATE TABLE table5_groupE ( id int );
|
||||||
|
SELECT create_distributed_table('table5_groupE', 'id', colocate_with => 'NONE');
|
||||||
|
|
||||||
-- check metadata to see colocation groups are created successfully
|
-- check metadata to see colocation groups are created successfully
|
||||||
SELECT * FROM pg_dist_colocation
|
SELECT * FROM pg_dist_colocation
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
@ -346,3 +411,15 @@ SELECT * FROM pg_dist_colocation
|
||||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
ORDER BY logicalrelid;
|
ORDER BY logicalrelid;
|
||||||
|
|
||||||
|
-- move the only table in colocation group 7 to colocation group 5
|
||||||
|
SELECT mark_tables_colocated('table1_groupE', ARRAY['table5_groupE']);
|
||||||
|
|
||||||
|
-- check metadata to see that unused colocation group is deleted
|
||||||
|
SELECT * FROM pg_dist_colocation
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid;
|
||||||
|
|
||||||
|
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||||
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
ORDER BY colocationid, logicalrelid;
|
||||||
|
|
|
@ -61,6 +61,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-18';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-1';
|
ALTER EXTENSION citus UPDATE TO '6.1-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-2';
|
ALTER EXTENSION citus UPDATE TO '6.1-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-3';
|
ALTER EXTENSION citus UPDATE TO '6.1-3';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.1-4';
|
||||||
|
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
SELECT COUNT(*)
|
SELECT COUNT(*)
|
||||||
|
|
Loading…
Reference in New Issue