mirror of https://github.com/citusdata/citus.git
Introduce citus_internal_update_relation_colocation
update_distributed_table_colocation can be called by the relation owner, and internally it updates pg_dist_partition. With this commit, update_distributed_table_colocation uses an internal UDF to access pg_dist_partition. As a result, this operation can now be done by regular users on MX.pull/5130/head
parent
ef6a8604ba
commit
482b8096e9
|
@ -103,8 +103,9 @@ static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);
|
||||||
|
|
||||||
|
|
||||||
static bool ShouldSkipMetadataChecks(void);
|
static bool ShouldSkipMetadataChecks(void);
|
||||||
static void EnsurePartitionMetadataIsSane(char distributionMethod, int colocationId,
|
static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod,
|
||||||
char replicationModel);
|
int colocationId, char replicationModel,
|
||||||
|
Var *distributionKey);
|
||||||
static void EnsureCoordinatorInitiatedOperation(void);
|
static void EnsureCoordinatorInitiatedOperation(void);
|
||||||
static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
|
static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
|
||||||
text *shardMinValue,
|
text *shardMinValue,
|
||||||
|
@ -128,6 +129,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
|
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
|
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
|
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
|
||||||
|
PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
|
||||||
|
|
||||||
|
|
||||||
static bool got_SIGTERM = false;
|
static bool got_SIGTERM = false;
|
||||||
|
@ -1020,10 +1022,9 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
|
||||||
{
|
{
|
||||||
StringInfo command = makeStringInfo();
|
StringInfo command = makeStringInfo();
|
||||||
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
||||||
appendStringInfo(command, "UPDATE pg_dist_partition "
|
appendStringInfo(command,
|
||||||
"SET colocationid = %d "
|
"SELECT citus_internal_update_relation_colocation(%s::regclass, %d)",
|
||||||
"WHERE logicalrelid = %s::regclass",
|
quote_literal_cstr(qualifiedRelationName), colocationId);
|
||||||
colocationId, quote_literal_cstr(qualifiedRelationName));
|
|
||||||
|
|
||||||
return command->data;
|
return command->data;
|
||||||
}
|
}
|
||||||
|
@ -2122,8 +2123,8 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS)
|
||||||
* metadata is not sane, the user can only affect its own tables.
|
* metadata is not sane, the user can only affect its own tables.
|
||||||
* Given that the user is owner of the table, we should allow.
|
* Given that the user is owner of the table, we should allow.
|
||||||
*/
|
*/
|
||||||
EnsurePartitionMetadataIsSane(distributionMethod, colocationId,
|
EnsurePartitionMetadataIsSane(relationId, distributionMethod, colocationId,
|
||||||
replicationModel);
|
replicationModel, distributionColumnVar);
|
||||||
}
|
}
|
||||||
|
|
||||||
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnVar,
|
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnVar,
|
||||||
|
@ -2138,8 +2139,8 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS)
|
||||||
* for inserting into pg_dist_partition metadata.
|
* for inserting into pg_dist_partition metadata.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
EnsurePartitionMetadataIsSane(char distributionMethod, int colocationId,
|
EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, int colocationId,
|
||||||
char replicationModel)
|
char replicationModel, Var *distributionColumnVar)
|
||||||
{
|
{
|
||||||
if (!(distributionMethod == DISTRIBUTE_BY_HASH ||
|
if (!(distributionMethod == DISTRIBUTE_BY_HASH ||
|
||||||
distributionMethod == DISTRIBUTE_BY_NONE))
|
distributionMethod == DISTRIBUTE_BY_NONE))
|
||||||
|
@ -2155,6 +2156,26 @@ EnsurePartitionMetadataIsSane(char distributionMethod, int colocationId,
|
||||||
errmsg("Metadata syncing is only allowed for valid "
|
errmsg("Metadata syncing is only allowed for valid "
|
||||||
"colocation id values.")));
|
"colocation id values.")));
|
||||||
}
|
}
|
||||||
|
else if (colocationId != INVALID_COLOCATION_ID &&
|
||||||
|
distributionMethod == DISTRIBUTE_BY_HASH)
|
||||||
|
{
|
||||||
|
int count = 1;
|
||||||
|
List *targetColocatedTableList =
|
||||||
|
ColocationGroupTableList(colocationId, count);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we have any colocated hash tables, ensure if they share the
|
||||||
|
* same distribution key properties.
|
||||||
|
*/
|
||||||
|
if (list_length(targetColocatedTableList) >= 1)
|
||||||
|
{
|
||||||
|
Oid targetRelationId = linitial_oid(targetColocatedTableList);
|
||||||
|
|
||||||
|
EnsureColumnTypeEquality(relationId, targetRelationId, distributionColumnVar,
|
||||||
|
DistPartitionKeyOrError(targetRelationId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (!(replicationModel == REPLICATION_MODEL_2PC ||
|
if (!(replicationModel == REPLICATION_MODEL_2PC ||
|
||||||
replicationModel == REPLICATION_MODEL_STREAMING ||
|
replicationModel == REPLICATION_MODEL_STREAMING ||
|
||||||
|
@ -2612,3 +2633,64 @@ citus_internal_delete_shard_metadata(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* citus_internal_update_relation_colocation is an internal UDF to
|
||||||
|
* delete a row in pg_dist_shard and corresponding placement rows
|
||||||
|
* from pg_dist_shard_placement.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
|
uint32 tagetColocationId = PG_GETARG_UINT32(1);
|
||||||
|
|
||||||
|
EnsureTableOwner(relationId);
|
||||||
|
|
||||||
|
if (!ShouldSkipMetadataChecks())
|
||||||
|
{
|
||||||
|
/* this UDF is not allowed allowed for executing as a separate command */
|
||||||
|
EnsureCoordinatorInitiatedOperation();
|
||||||
|
|
||||||
|
/* ensure that the table is in pg_dist_partition */
|
||||||
|
char partitionMethod = PartitionMethodViaCatalog(relationId);
|
||||||
|
if (partitionMethod == DISTRIBUTE_BY_INVALID)
|
||||||
|
{
|
||||||
|
/* connection from the coordinator operating on a shard */
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
errmsg("The relation \"%s\" does not have a valid "
|
||||||
|
"entry in pg_dist_partition.",
|
||||||
|
get_rel_name(relationId))));
|
||||||
|
}
|
||||||
|
else if (partitionMethod != DISTRIBUTE_BY_HASH)
|
||||||
|
{
|
||||||
|
/* connection from the coordinator operating on a shard */
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
errmsg("Updating colocation ids are only allowed for hash "
|
||||||
|
"distributed tables: %c", partitionMethod)));
|
||||||
|
}
|
||||||
|
|
||||||
|
int count = 1;
|
||||||
|
List *targetColocatedTableList =
|
||||||
|
ColocationGroupTableList(tagetColocationId, count);
|
||||||
|
|
||||||
|
if (list_length(targetColocatedTableList) == 0)
|
||||||
|
{
|
||||||
|
/* the table is colocated with none, so nothing to check */
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Oid targetRelationId = linitial_oid(targetColocatedTableList);
|
||||||
|
|
||||||
|
ErrorIfShardPlacementsNotColocated(relationId, targetRelationId);
|
||||||
|
CheckReplicationModel(relationId, targetRelationId);
|
||||||
|
CheckDistributionColumnType(relationId, targetRelationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool localOnly = true;
|
||||||
|
UpdateRelationColocationGroup(relationId, tagetColocationId, localOnly);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
|
@ -15,3 +15,4 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi
|
||||||
#include "udfs/citus_internal_add_placement_metadata/10.2-1.sql";
|
#include "udfs/citus_internal_add_placement_metadata/10.2-1.sql";
|
||||||
#include "udfs/citus_internal_update_placement_metadata/10.2-1.sql";
|
#include "udfs/citus_internal_update_placement_metadata/10.2-1.sql";
|
||||||
#include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql";
|
#include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql";
|
||||||
|
#include "udfs/citus_internal_update_relation_colocation/10.2-1.sql";
|
||||||
|
|
|
@ -16,6 +16,7 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "ch
|
||||||
DROP FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, integer, bigint, integer, bigint);
|
DROP FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, integer, bigint, integer, bigint);
|
||||||
DROP FUNCTION pg_catalog.citus_internal_update_placement_metadata(bigint, integer, integer);
|
DROP FUNCTION pg_catalog.citus_internal_update_placement_metadata(bigint, integer, integer);
|
||||||
DROP FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint);
|
DROP FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint);
|
||||||
|
DROP FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, integer);
|
||||||
|
|
||||||
REVOKE ALL ON FUNCTION pg_catalog.worker_record_sequence_dependency(regclass,regclass,name) FROM PUBLIC;
|
REVOKE ALL ON FUNCTION pg_catalog.worker_record_sequence_dependency(regclass,regclass,name) FROM PUBLIC;
|
||||||
ALTER TABLE pg_catalog.pg_dist_placement DROP CONSTRAINT placement_shardid_groupid_unique_index;
|
ALTER TABLE pg_catalog.pg_dist_placement DROP CONSTRAINT placement_shardid_groupid_unique_index;
|
||||||
|
|
7
src/backend/distributed/sql/udfs/citus_internal_update_relation_colocation/10.2-1.sql
generated
Normal file
7
src/backend/distributed/sql/udfs/citus_internal_update_relation_colocation/10.2-1.sql
generated
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_relation_colocation(relation_id Oid, target_colocation_id int)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME';
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, int) IS
|
||||||
|
'Updates colocationId field of pg_dist_partition for the relation_id';
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_relation_colocation(relation_id Oid, target_colocation_id int)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME';
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, int) IS
|
||||||
|
'Updates colocationId field of pg_dist_partition for the relation_id';
|
||||||
|
|
|
@ -42,15 +42,12 @@
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId);
|
static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId);
|
||||||
static void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId);
|
|
||||||
static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval,
|
static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval,
|
||||||
ShardInterval *rightShardInterval);
|
ShardInterval *rightShardInterval);
|
||||||
static bool HashPartitionedShardIntervalsEqual(ShardInterval *leftShardInterval,
|
static bool HashPartitionedShardIntervalsEqual(ShardInterval *leftShardInterval,
|
||||||
ShardInterval *rightShardInterval);
|
ShardInterval *rightShardInterval);
|
||||||
static int CompareShardPlacementsByNode(const void *leftElement,
|
static int CompareShardPlacementsByNode(const void *leftElement,
|
||||||
const void *rightElement);
|
const void *rightElement);
|
||||||
static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId);
|
|
||||||
static List * ColocationGroupTableList(Oid colocationId);
|
|
||||||
static void DeleteColocationGroup(uint32 colocationId);
|
static void DeleteColocationGroup(uint32 colocationId);
|
||||||
static uint32 CreateColocationGroupForRelation(Oid sourceRelationId);
|
static uint32 CreateColocationGroupForRelation(Oid sourceRelationId);
|
||||||
static void BreakColocation(Oid sourceRelationId);
|
static void BreakColocation(Oid sourceRelationId);
|
||||||
|
@ -161,7 +158,8 @@ BreakColocation(Oid sourceRelationId)
|
||||||
Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock);
|
Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock);
|
||||||
|
|
||||||
uint32 newColocationId = GetNextColocationId();
|
uint32 newColocationId = GetNextColocationId();
|
||||||
UpdateRelationColocationGroup(sourceRelationId, newColocationId);
|
bool localOnly = false;
|
||||||
|
UpdateRelationColocationGroup(sourceRelationId, newColocationId, localOnly);
|
||||||
|
|
||||||
/* if there is not any remaining table in the colocation group, delete it */
|
/* if there is not any remaining table in the colocation group, delete it */
|
||||||
DeleteColocationGroupIfNoTablesBelong(sourceRelationId);
|
DeleteColocationGroupIfNoTablesBelong(sourceRelationId);
|
||||||
|
@ -230,7 +228,8 @@ CreateColocationGroupForRelation(Oid sourceRelationId)
|
||||||
uint32 sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
|
uint32 sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
|
||||||
sourceDistributionColumnType,
|
sourceDistributionColumnType,
|
||||||
sourceDistributionColumnCollation);
|
sourceDistributionColumnCollation);
|
||||||
UpdateRelationColocationGroup(sourceRelationId, sourceColocationId);
|
bool localOnly = false;
|
||||||
|
UpdateRelationColocationGroup(sourceRelationId, sourceColocationId, localOnly);
|
||||||
return sourceColocationId;
|
return sourceColocationId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,7 +278,8 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
|
||||||
uint32 targetColocationId = TableColocationId(targetRelationId);
|
uint32 targetColocationId = TableColocationId(targetRelationId);
|
||||||
|
|
||||||
/* finally set colocation group for the target relation */
|
/* finally set colocation group for the target relation */
|
||||||
UpdateRelationColocationGroup(targetRelationId, sourceColocationId);
|
bool localOnly = false;
|
||||||
|
UpdateRelationColocationGroup(targetRelationId, sourceColocationId, localOnly);
|
||||||
|
|
||||||
/* if there is not any remaining table in the colocation group, delete it */
|
/* if there is not any remaining table in the colocation group, delete it */
|
||||||
DeleteColocationGroupIfNoTablesBelong(targetColocationId);
|
DeleteColocationGroupIfNoTablesBelong(targetColocationId);
|
||||||
|
@ -300,7 +300,7 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
|
||||||
*
|
*
|
||||||
* Note that, this functions assumes that both tables are hash distributed.
|
* Note that, this functions assumes that both tables are hash distributed.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId)
|
ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId)
|
||||||
{
|
{
|
||||||
ListCell *leftShardIntervalCell = NULL;
|
ListCell *leftShardIntervalCell = NULL;
|
||||||
|
@ -680,29 +680,48 @@ CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId)
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId)
|
CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId)
|
||||||
|
{
|
||||||
|
/* reference tables have NULL distribution column */
|
||||||
|
Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId);
|
||||||
|
|
||||||
|
/* reference tables have NULL distribution column */
|
||||||
|
Var *targetDistributionColumn = DistPartitionKey(targetRelationId);
|
||||||
|
|
||||||
|
EnsureColumnTypeEquality(sourceRelationId, targetRelationId,
|
||||||
|
sourceDistributionColumn, targetDistributionColumn);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetColumnTypeEquality checks if distribution column types and collations
|
||||||
|
* of the given columns are same. The function sets the boolean pointers.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
EnsureColumnTypeEquality(Oid sourceRelationId, Oid targetRelationId,
|
||||||
|
Var *sourceDistributionColumn, Var *targetDistributionColumn)
|
||||||
{
|
{
|
||||||
Oid sourceDistributionColumnType = InvalidOid;
|
Oid sourceDistributionColumnType = InvalidOid;
|
||||||
Oid targetDistributionColumnType = InvalidOid;
|
Oid targetDistributionColumnType = InvalidOid;
|
||||||
Oid sourceDistributionColumnCollation = InvalidOid;
|
Oid sourceDistributionColumnCollation = InvalidOid;
|
||||||
Oid targetDistributionColumnCollation = InvalidOid;
|
Oid targetDistributionColumnCollation = InvalidOid;
|
||||||
|
|
||||||
/* reference tables have NULL distribution column */
|
|
||||||
Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId);
|
|
||||||
if (sourceDistributionColumn != NULL)
|
if (sourceDistributionColumn != NULL)
|
||||||
{
|
{
|
||||||
sourceDistributionColumnType = sourceDistributionColumn->vartype;
|
sourceDistributionColumnType = sourceDistributionColumn->vartype;
|
||||||
sourceDistributionColumnCollation = sourceDistributionColumn->varcollid;
|
sourceDistributionColumnCollation = sourceDistributionColumn->varcollid;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* reference tables have NULL distribution column */
|
|
||||||
Var *targetDistributionColumn = DistPartitionKey(targetRelationId);
|
|
||||||
if (targetDistributionColumn != NULL)
|
if (targetDistributionColumn != NULL)
|
||||||
{
|
{
|
||||||
targetDistributionColumnType = targetDistributionColumn->vartype;
|
targetDistributionColumnType = targetDistributionColumn->vartype;
|
||||||
targetDistributionColumnCollation = targetDistributionColumn->varcollid;
|
targetDistributionColumnCollation = targetDistributionColumn->varcollid;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sourceDistributionColumnType != targetDistributionColumnType)
|
bool columnTypesSame = sourceDistributionColumnType == targetDistributionColumnType;
|
||||||
|
bool columnCollationsSame =
|
||||||
|
sourceDistributionColumnCollation == targetDistributionColumnCollation;
|
||||||
|
|
||||||
|
if (!columnTypesSame)
|
||||||
{
|
{
|
||||||
char *sourceRelationName = get_rel_name(sourceRelationId);
|
char *sourceRelationName = get_rel_name(sourceRelationId);
|
||||||
char *targetRelationName = get_rel_name(targetRelationId);
|
char *targetRelationName = get_rel_name(targetRelationId);
|
||||||
|
@ -714,16 +733,17 @@ CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId)
|
||||||
targetRelationName)));
|
targetRelationName)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sourceDistributionColumnCollation != targetDistributionColumnCollation)
|
if (!columnCollationsSame)
|
||||||
{
|
{
|
||||||
char *sourceRelationName = get_rel_name(sourceRelationId);
|
char *sourceRelationName = get_rel_name(sourceRelationId);
|
||||||
char *targetRelationName = get_rel_name(targetRelationId);
|
char *targetRelationName = get_rel_name(targetRelationId);
|
||||||
|
|
||||||
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
|
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
|
||||||
sourceRelationName, targetRelationName),
|
sourceRelationName, targetRelationName),
|
||||||
errdetail("Distribution column collations don't match for "
|
errdetail(
|
||||||
"%s and %s.", sourceRelationName,
|
"Distribution column collations don't match for "
|
||||||
targetRelationName)));
|
"%s and %s.", sourceRelationName,
|
||||||
|
targetRelationName)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -731,9 +751,13 @@ CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId)
|
||||||
/*
|
/*
|
||||||
* UpdateRelationColocationGroup updates colocation group in pg_dist_partition
|
* UpdateRelationColocationGroup updates colocation group in pg_dist_partition
|
||||||
* for the given relation.
|
* for the given relation.
|
||||||
|
*
|
||||||
|
* When localOnly is true, the function does not propagate changes to the
|
||||||
|
* metadata workers.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId)
|
UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
|
||||||
|
bool localOnly)
|
||||||
{
|
{
|
||||||
bool indexOK = true;
|
bool indexOK = true;
|
||||||
int scanKeyCount = 1;
|
int scanKeyCount = 1;
|
||||||
|
@ -782,7 +806,7 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId)
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
|
||||||
bool shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId);
|
bool shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId);
|
||||||
if (shouldSyncMetadata)
|
if (shouldSyncMetadata && !localOnly)
|
||||||
{
|
{
|
||||||
char *updateColocationIdCommand = ColocationIdUpdateCommand(distributedRelationId,
|
char *updateColocationIdCommand = ColocationIdUpdateCommand(distributedRelationId,
|
||||||
colocationId);
|
colocationId);
|
||||||
|
@ -878,7 +902,8 @@ ColocatedTableList(Oid distributedTableId)
|
||||||
return colocatedTableList;
|
return colocatedTableList;
|
||||||
}
|
}
|
||||||
|
|
||||||
colocatedTableList = ColocationGroupTableList(tableColocationId);
|
int count = 0;
|
||||||
|
colocatedTableList = ColocationGroupTableList(tableColocationId, count);
|
||||||
|
|
||||||
return colocatedTableList;
|
return colocatedTableList;
|
||||||
}
|
}
|
||||||
|
@ -887,9 +912,14 @@ ColocatedTableList(Oid distributedTableId)
|
||||||
/*
|
/*
|
||||||
* ColocationGroupTableList returns the list of tables in the given colocation
|
* ColocationGroupTableList returns the list of tables in the given colocation
|
||||||
* group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL.
|
* group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL.
|
||||||
|
*
|
||||||
|
* If count is zero then the command is executed for all rows that it applies to.
|
||||||
|
* If count is greater than zero, then no more than count rows will be retrieved;
|
||||||
|
* execution stops when the count is reached, much like adding a LIMIT clause
|
||||||
|
* to the query.
|
||||||
*/
|
*/
|
||||||
static List *
|
List *
|
||||||
ColocationGroupTableList(Oid colocationId)
|
ColocationGroupTableList(uint32 colocationId, uint32 count)
|
||||||
{
|
{
|
||||||
List *colocatedTableList = NIL;
|
List *colocatedTableList = NIL;
|
||||||
bool indexOK = true;
|
bool indexOK = true;
|
||||||
|
@ -906,7 +936,7 @@ ColocationGroupTableList(Oid colocationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
|
||||||
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));
|
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId));
|
||||||
|
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
@ -924,6 +954,17 @@ ColocationGroupTableList(Oid colocationId)
|
||||||
|
|
||||||
colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId);
|
colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId);
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
|
||||||
|
if (count == 0)
|
||||||
|
{
|
||||||
|
/* fetch all rows */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else if (list_length(colocatedTableList) >= count)
|
||||||
|
{
|
||||||
|
/* we are done */
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
|
@ -1158,7 +1199,8 @@ DeleteColocationGroupIfNoTablesBelong(uint32 colocationId)
|
||||||
{
|
{
|
||||||
if (colocationId != INVALID_COLOCATION_ID)
|
if (colocationId != INVALID_COLOCATION_ID)
|
||||||
{
|
{
|
||||||
List *colocatedTableList = ColocationGroupTableList(colocationId);
|
int count = 1;
|
||||||
|
List *colocatedTableList = ColocationGroupTableList(colocationId, count);
|
||||||
int colocatedTableCount = list_length(colocatedTableList);
|
int colocatedTableCount = list_length(colocatedTableList);
|
||||||
|
|
||||||
if (colocatedTableCount == 0)
|
if (colocatedTableCount == 0)
|
||||||
|
|
|
@ -33,9 +33,15 @@ extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
|
||||||
Oid distributionColumnCollation);
|
Oid distributionColumnCollation);
|
||||||
extern bool IsColocateWithNone(char *colocateWithTableName);
|
extern bool IsColocateWithNone(char *colocateWithTableName);
|
||||||
extern uint32 GetNextColocationId(void);
|
extern uint32 GetNextColocationId(void);
|
||||||
|
extern void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId);
|
||||||
extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId);
|
extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId);
|
||||||
extern void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId);
|
extern void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId);
|
||||||
|
extern void EnsureColumnTypeEquality(Oid sourceRelationId, Oid targetRelationId,
|
||||||
|
Var *sourceDistributionColumn,
|
||||||
|
Var *targetDistributionColumn);
|
||||||
|
extern void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
|
||||||
|
bool localOnly);
|
||||||
extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId);
|
extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId);
|
||||||
|
extern List * ColocationGroupTableList(uint32 colocationId, uint32 count);
|
||||||
|
|
||||||
#endif /* COLOCATION_UTILS_H_ */
|
#endif /* COLOCATION_UTILS_H_ */
|
||||||
|
|
|
@ -14,6 +14,8 @@ CREATE TABLE test(col_1 int);
|
||||||
-- not in a distributed transaction
|
-- not in a distributed transaction
|
||||||
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
|
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
|
||||||
ERROR: This is an internal Citus function can only be used in a distributed transaction
|
ERROR: This is an internal Citus function can only be used in a distributed transaction
|
||||||
|
SELECT citus_internal_update_relation_colocation ('test'::regclass, 1);
|
||||||
|
ERROR: This is an internal Citus function can only be used in a distributed transaction
|
||||||
-- in a distributed transaction, but the application name is not Citus
|
-- in a distributed transaction, but the application name is not Citus
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
@ -75,8 +77,21 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
|
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
|
||||||
ERROR: must be owner of table test
|
ERROR: must be owner of table test
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
-- we do not own the relation
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
SELECT citus_internal_update_relation_colocation ('test'::regclass, 10);
|
||||||
|
ERROR: must be owner of table test
|
||||||
|
ROLLBACK;
|
||||||
-- finally, a user can only add its own tables to the metadata
|
-- finally, a user can only add its own tables to the metadata
|
||||||
CREATE TABLE test_2(col_1 int, col_2 int);
|
CREATE TABLE test_2(col_1 int, col_2 int);
|
||||||
|
CREATE TABLE test_3(col_1 int, col_2 int);
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
assign_distributed_transaction_id
|
assign_distributed_transaction_id
|
||||||
|
@ -415,7 +430,13 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET application_name to 'citus';
|
SET application_name to 'citus';
|
||||||
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's');
|
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 250, 's');
|
||||||
|
citus_internal_add_partition_metadata
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_3'::regclass, 'h', 'col_1', 251, 's');
|
||||||
citus_internal_add_partition_metadata
|
citus_internal_add_partition_metadata
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
@ -428,6 +449,22 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
-- we can update to a non-existing colocation group (e.g., colocate_with:=none)
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232);
|
||||||
|
citus_internal_update_relation_colocation
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
-- invalid shard ids are not allowed
|
-- invalid shard ids are not allowed
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
@ -585,7 +622,8 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
('test_2'::regclass, 1420002::bigint, 't'::"char", '31'::text, '40'::text),
|
('test_2'::regclass, 1420002::bigint, 't'::"char", '31'::text, '40'::text),
|
||||||
('test_2'::regclass, 1420003::bigint, 't'::"char", '41'::text, '50'::text),
|
('test_2'::regclass, 1420003::bigint, 't'::"char", '41'::text, '50'::text),
|
||||||
('test_2'::regclass, 1420004::bigint, 't'::"char", '51'::text, '60'::text),
|
('test_2'::regclass, 1420004::bigint, 't'::"char", '51'::text, '60'::text),
|
||||||
('test_2'::regclass, 1420005::bigint, 't'::"char", '61'::text, '70'::text))
|
('test_2'::regclass, 1420005::bigint, 't'::"char", '61'::text, '70'::text),
|
||||||
|
('test_3'::regclass, 1420008::bigint, 't'::"char", '11'::text, '20'::text))
|
||||||
SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
|
SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
|
||||||
citus_internal_add_shard_metadata
|
citus_internal_add_shard_metadata
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -595,7 +633,47 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
(6 rows)
|
|
||||||
|
(7 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- we cannot mark these two tables colocated because they are not colocated
|
||||||
|
BEGIN;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ERROR: cannot colocate tables test_2 and test_3
|
||||||
|
ROLLBACK;
|
||||||
|
-- now, add few more shards for test_3 to make it colocated with test_2
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
\set VERBOSITY terse
|
||||||
|
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
|
||||||
|
AS (VALUES ('test_3'::regclass, 1420009::bigint, 't'::"char", '21'::text, '30'::text),
|
||||||
|
('test_3'::regclass, 1420010::bigint, 't'::"char", '31'::text, '40'::text),
|
||||||
|
('test_3'::regclass, 1420011::bigint, 't'::"char", '41'::text, '50'::text),
|
||||||
|
('test_3'::regclass, 1420012::bigint, 't'::"char", '51'::text, '60'::text),
|
||||||
|
('test_3'::regclass, 1420013::bigint, 't'::"char", '61'::text, '70'::text))
|
||||||
|
SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
|
||||||
|
citus_internal_add_shard_metadata
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- shardMin/MaxValues should be NULL for reference tables
|
-- shardMin/MaxValues should be NULL for reference tables
|
||||||
|
@ -807,7 +885,13 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
(1420002, 1, 0::bigint, get_node_id(), 1500002),
|
(1420002, 1, 0::bigint, get_node_id(), 1500002),
|
||||||
(1420003, 1, 0::bigint, get_node_id(), 1500003),
|
(1420003, 1, 0::bigint, get_node_id(), 1500003),
|
||||||
(1420004, 1, 0::bigint, get_node_id(), 1500004),
|
(1420004, 1, 0::bigint, get_node_id(), 1500004),
|
||||||
(1420005, 1, 0::bigint, get_node_id(), 1500005))
|
(1420005, 1, 0::bigint, get_node_id(), 1500005),
|
||||||
|
(1420008, 1, 0::bigint, get_node_id(), 1500006),
|
||||||
|
(1420009, 1, 0::bigint, get_node_id(), 1500007),
|
||||||
|
(1420010, 1, 0::bigint, get_node_id(), 1500008),
|
||||||
|
(1420011, 1, 0::bigint, get_node_id(), 1500009),
|
||||||
|
(1420012, 1, 0::bigint, get_node_id(), 1500010),
|
||||||
|
(1420013, 1, 0::bigint, get_node_id(), 1500011))
|
||||||
SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
|
SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
|
||||||
citus_internal_add_placement_metadata
|
citus_internal_add_placement_metadata
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -817,9 +901,31 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
(6 rows)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
-- we should be able to colocate both tables now
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
citus_internal_update_relation_colocation
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
-- try to update placements
|
-- try to update placements
|
||||||
-- fails because we are trying to update it to non-existing node
|
-- fails because we are trying to update it to non-existing node
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
@ -967,6 +1073,114 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- we'll do some metadata changes to trigger some error cases
|
||||||
|
-- so connect as superuser
|
||||||
|
\c - postgres - :worker_1_port
|
||||||
|
SET search_path TO metadata_sync_helpers;
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
-- with an ugly trick, update the repmodel
|
||||||
|
-- so that making two tables colocated fails
|
||||||
|
UPDATE pg_dist_partition SET repmodel = 't'
|
||||||
|
WHERE logicalrelid = 'test_2'::regclass;
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ERROR: cannot colocate tables test_2 and test_3
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
-- with an ugly trick, update the vartype of table from int to bigint
|
||||||
|
-- so that making two tables colocated fails
|
||||||
|
UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}'
|
||||||
|
WHERE logicalrelid = 'test_2'::regclass;
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ERROR: cannot colocate tables test_2 and test_3
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
-- with an ugly trick, update the partmethod of the table to not-valid
|
||||||
|
-- so that making two tables colocated fails
|
||||||
|
UPDATE pg_dist_partition SET partmethod = ''
|
||||||
|
WHERE logicalrelid = 'test_2'::regclass;
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ERROR: The relation "test_2" does not have a valid entry in pg_dist_partition.
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
-- with an ugly trick, update the partmethod of the table to not-valid
|
||||||
|
-- so that making two tables colocated fails
|
||||||
|
UPDATE pg_dist_partition SET partmethod = 'a'
|
||||||
|
WHERE logicalrelid = 'test_2'::regclass;
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ERROR: Updating colocation ids are only allowed for hash distributed tables: a
|
||||||
|
ROLLBACK;
|
||||||
|
-- colocated hash distributed table should have the same dist key columns
|
||||||
|
CREATE TABLE test_5(int_col int, text_col text);
|
||||||
|
CREATE TABLE test_6(int_col int, text_col text);
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
\set VERBOSITY terse
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_5'::regclass, 'h', 'int_col', 500, 's');
|
||||||
|
citus_internal_add_partition_metadata
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_6'::regclass, 'h', 'text_col', 500, 's');
|
||||||
|
ERROR: cannot colocate tables test_6 and test_5
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
CREATE COLLATION collation_t1 (provider = icu, locale = 'de-u-co-phonebk');
|
||||||
|
CREATE COLLATION caseinsensitive (provider = icu, locale = 'und-u-ks-level2');
|
||||||
|
-- colocated hash distributed table should have the same dist key collations
|
||||||
|
CREATE TABLE test_7(int_col int, text_col text COLLATE "collation_t1");
|
||||||
|
CREATE TABLE test_8(int_col int, text_col text COLLATE "caseinsensitive");
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET application_name to 'citus';
|
||||||
|
\set VERBOSITY terse
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_7'::regclass, 'h', 'text_col', 500, 's');
|
||||||
|
citus_internal_add_partition_metadata
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_8'::regclass, 'h', 'text_col', 500, 's');
|
||||||
|
ERROR: cannot colocate tables test_8 and test_7
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- we don't need the table/schema anymore
|
-- we don't need the table/schema anymore
|
||||||
-- connect back as super user to drop everything
|
-- connect back as super user to drop everything
|
||||||
|
|
|
@ -651,8 +651,9 @@ SELECT * FROM multi_extension.print_extension_changes();
|
||||||
| function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) void
|
| function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) void
|
||||||
| function citus_internal_delete_shard_metadata(bigint) void
|
| function citus_internal_delete_shard_metadata(bigint) void
|
||||||
| function citus_internal_update_placement_metadata(bigint,integer,integer) void
|
| function citus_internal_update_placement_metadata(bigint,integer,integer) void
|
||||||
|
| function citus_internal_update_relation_colocation(oid,integer) void
|
||||||
| function stop_metadata_sync_to_node(text,integer,boolean) void
|
| function stop_metadata_sync_to_node(text,integer,boolean) void
|
||||||
(9 rows)
|
(10 rows)
|
||||||
|
|
||||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||||
-- show running version
|
-- show running version
|
||||||
|
|
|
@ -833,12 +833,21 @@ SELECT create_distributed_table('mx_colocation_test_1', 'a');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
CREATE TABLE mx_colocation_test_2 (a int);
|
CREATE TABLE mx_colocation_test_2 (a int);
|
||||||
SELECT create_distributed_table('mx_colocation_test_2', 'a');
|
SELECT create_distributed_table('mx_colocation_test_2', 'a', colocate_with:='none');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Reset the colocation IDs of the test tables
|
||||||
|
DELETE FROM
|
||||||
|
pg_dist_colocation
|
||||||
|
WHERE EXISTS (
|
||||||
|
SELECT 1
|
||||||
|
FROM pg_dist_partition
|
||||||
|
WHERE
|
||||||
|
colocationid = pg_dist_partition.colocationid
|
||||||
|
AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass);
|
||||||
-- Check the colocation IDs of the created tables
|
-- Check the colocation IDs of the created tables
|
||||||
SELECT
|
SELECT
|
||||||
logicalrelid, colocationid
|
logicalrelid, colocationid
|
||||||
|
@ -851,25 +860,9 @@ ORDER BY logicalrelid;
|
||||||
logicalrelid | colocationid
|
logicalrelid | colocationid
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
mx_colocation_test_1 | 10000
|
mx_colocation_test_1 | 10000
|
||||||
mx_colocation_test_2 | 10000
|
mx_colocation_test_2 | 10001
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- Reset the colocation IDs of the test tables
|
|
||||||
DELETE FROM
|
|
||||||
pg_dist_colocation
|
|
||||||
WHERE EXISTS (
|
|
||||||
SELECT 1
|
|
||||||
FROM pg_dist_partition
|
|
||||||
WHERE
|
|
||||||
colocationid = pg_dist_partition.colocationid
|
|
||||||
AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass);
|
|
||||||
UPDATE
|
|
||||||
pg_dist_partition
|
|
||||||
SET
|
|
||||||
colocationid = 0
|
|
||||||
WHERE
|
|
||||||
logicalrelid = 'mx_colocation_test_1'::regclass
|
|
||||||
OR logicalrelid = 'mx_colocation_test_2'::regclass;
|
|
||||||
-- Update colocation and see the changes on the master and the worker
|
-- Update colocation and see the changes on the master and the worker
|
||||||
SELECT update_distributed_table_colocation('mx_colocation_test_1', colocate_with => 'mx_colocation_test_2');
|
SELECT update_distributed_table_colocation('mx_colocation_test_1', colocate_with => 'mx_colocation_test_2');
|
||||||
update_distributed_table_colocation
|
update_distributed_table_colocation
|
||||||
|
@ -1680,9 +1673,9 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
|
||||||
SELECT citus_internal_add_partition_metadata ('mx_test_schema_1.mx_table_1'::regclass, 'h', 'col1', 3, 's')
|
SELECT citus_internal_add_partition_metadata ('mx_test_schema_1.mx_table_1'::regclass, 'h', 'col1', 3, 's')
|
||||||
SELECT citus_internal_add_partition_metadata ('mx_test_schema_2.mx_table_2'::regclass, 'h', 'col1', 3, 's')
|
SELECT citus_internal_add_partition_metadata ('mx_test_schema_2.mx_table_2'::regclass, 'h', 'col1', 3, 's')
|
||||||
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
|
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
|
||||||
SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10004, 's')
|
SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10005, 's')
|
||||||
SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10002, 't')
|
SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10003, 't')
|
||||||
SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10004, 's')
|
SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10005, 's')
|
||||||
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
|
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
|
||||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
|
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
|
||||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_sequence_0 AS integer INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 CACHE 1 NO CYCLE','integer')
|
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_sequence_0 AS integer INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 CACHE 1 NO CYCLE','integer')
|
||||||
|
|
|
@ -0,0 +1,100 @@
|
||||||
|
-- in order to make the enterprise and community
|
||||||
|
-- tests outputs the same, disable enable_ddl_propagation
|
||||||
|
-- and create the roles/schema manually
|
||||||
|
SET citus.enable_ddl_propagation TO OFF;
|
||||||
|
CREATE SCHEMA "Update Colocation";
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
CREATE ROLE mx_update_colocation WITH LOGIN;
|
||||||
|
GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation;
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET citus.enable_ddl_propagation TO OFF;
|
||||||
|
CREATE SCHEMA "Update Colocation";
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
CREATE ROLE mx_update_colocation WITH LOGIN;
|
||||||
|
GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation;
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET citus.enable_ddl_propagation TO OFF;
|
||||||
|
CREATE SCHEMA "Update Colocation";
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
CREATE ROLE mx_update_colocation WITH LOGIN;
|
||||||
|
GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation;
|
||||||
|
\c - mx_update_colocation - :master_port
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
CREATE TABLE t1(a int);
|
||||||
|
CREATE TABLE t2(a int);
|
||||||
|
SELECT create_distributed_table('t1', 'a', colocate_with:='none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('t2', 'a', colocate_with:='none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT update_distributed_table_colocation('t1', 't2');
|
||||||
|
update_distributed_table_colocation
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- show that we successfuly updated the colocationids to the same value
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT update_distributed_table_colocation('t1', 'none');
|
||||||
|
update_distributed_table_colocation
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- show that we successfuly updated the colocationids different values
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - postgres - :master_port
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA "Update Colocation" cascade;
|
|
@ -71,6 +71,7 @@ ORDER BY 1;
|
||||||
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
|
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
|
||||||
function citus_internal_delete_shard_metadata(bigint)
|
function citus_internal_delete_shard_metadata(bigint)
|
||||||
function citus_internal_update_placement_metadata(bigint,integer,integer)
|
function citus_internal_update_placement_metadata(bigint,integer,integer)
|
||||||
|
function citus_internal_update_relation_colocation(oid,integer)
|
||||||
function citus_isolation_test_session_is_blocked(integer,integer[])
|
function citus_isolation_test_session_is_blocked(integer,integer[])
|
||||||
function citus_json_concatenate(json,json)
|
function citus_json_concatenate(json,json)
|
||||||
function citus_json_concatenate_final(json)
|
function citus_json_concatenate_final(json)
|
||||||
|
@ -253,5 +254,5 @@ ORDER BY 1;
|
||||||
view citus_worker_stat_activity
|
view citus_worker_stat_activity
|
||||||
view pg_dist_shard_placement
|
view pg_dist_shard_placement
|
||||||
view time_partitions
|
view time_partitions
|
||||||
(237 rows)
|
(238 rows)
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,7 @@ test: multi_mx_reference_table
|
||||||
test: multi_mx_insert_select_repartition
|
test: multi_mx_insert_select_repartition
|
||||||
test: locally_execute_intermediate_results
|
test: locally_execute_intermediate_results
|
||||||
test: multi_mx_alter_distributed_table
|
test: multi_mx_alter_distributed_table
|
||||||
|
test: update_colocation_mx
|
||||||
|
|
||||||
# should be executed sequentially because it modifies metadata
|
# should be executed sequentially because it modifies metadata
|
||||||
test: local_shard_execution_dropped_column
|
test: local_shard_execution_dropped_column
|
||||||
|
|
|
@ -16,6 +16,7 @@ CREATE TABLE test(col_1 int);
|
||||||
|
|
||||||
-- not in a distributed transaction
|
-- not in a distributed transaction
|
||||||
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
|
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
|
||||||
|
SELECT citus_internal_update_relation_colocation ('test'::regclass, 1);
|
||||||
|
|
||||||
-- in a distributed transaction, but the application name is not Citus
|
-- in a distributed transaction, but the application name is not Citus
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
@ -70,8 +71,16 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
|
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- we do not own the relation
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
SELECT citus_internal_update_relation_colocation ('test'::regclass, 10);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
-- finally, a user can only add its own tables to the metadata
|
-- finally, a user can only add its own tables to the metadata
|
||||||
CREATE TABLE test_2(col_1 int, col_2 int);
|
CREATE TABLE test_2(col_1 int, col_2 int);
|
||||||
|
CREATE TABLE test_3(col_1 int, col_2 int);
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
SET application_name to 'citus';
|
SET application_name to 'citus';
|
||||||
|
@ -264,10 +273,18 @@ ROLLBACK;
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
SET application_name to 'citus';
|
SET application_name to 'citus';
|
||||||
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's');
|
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 250, 's');
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_3'::regclass, 'h', 'col_1', 251, 's');
|
||||||
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 't');
|
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 't');
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
-- we can update to a non-existing colocation group (e.g., colocate_with:=none)
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
-- invalid shard ids are not allowed
|
-- invalid shard ids are not allowed
|
||||||
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
@ -377,7 +394,29 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
('test_2'::regclass, 1420002::bigint, 't'::"char", '31'::text, '40'::text),
|
('test_2'::regclass, 1420002::bigint, 't'::"char", '31'::text, '40'::text),
|
||||||
('test_2'::regclass, 1420003::bigint, 't'::"char", '41'::text, '50'::text),
|
('test_2'::regclass, 1420003::bigint, 't'::"char", '41'::text, '50'::text),
|
||||||
('test_2'::regclass, 1420004::bigint, 't'::"char", '51'::text, '60'::text),
|
('test_2'::regclass, 1420004::bigint, 't'::"char", '51'::text, '60'::text),
|
||||||
('test_2'::regclass, 1420005::bigint, 't'::"char", '61'::text, '70'::text))
|
('test_2'::regclass, 1420005::bigint, 't'::"char", '61'::text, '70'::text),
|
||||||
|
('test_3'::regclass, 1420008::bigint, 't'::"char", '11'::text, '20'::text))
|
||||||
|
SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- we cannot mark these two tables colocated because they are not colocated
|
||||||
|
BEGIN;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- now, add few more shards for test_3 to make it colocated with test_2
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
\set VERBOSITY terse
|
||||||
|
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue)
|
||||||
|
AS (VALUES ('test_3'::regclass, 1420009::bigint, 't'::"char", '21'::text, '30'::text),
|
||||||
|
('test_3'::regclass, 1420010::bigint, 't'::"char", '31'::text, '40'::text),
|
||||||
|
('test_3'::regclass, 1420011::bigint, 't'::"char", '41'::text, '50'::text),
|
||||||
|
('test_3'::regclass, 1420012::bigint, 't'::"char", '51'::text, '60'::text),
|
||||||
|
('test_3'::regclass, 1420013::bigint, 't'::"char", '61'::text, '70'::text))
|
||||||
SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
|
SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
@ -526,10 +565,23 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
(1420002, 1, 0::bigint, get_node_id(), 1500002),
|
(1420002, 1, 0::bigint, get_node_id(), 1500002),
|
||||||
(1420003, 1, 0::bigint, get_node_id(), 1500003),
|
(1420003, 1, 0::bigint, get_node_id(), 1500003),
|
||||||
(1420004, 1, 0::bigint, get_node_id(), 1500004),
|
(1420004, 1, 0::bigint, get_node_id(), 1500004),
|
||||||
(1420005, 1, 0::bigint, get_node_id(), 1500005))
|
(1420005, 1, 0::bigint, get_node_id(), 1500005),
|
||||||
|
(1420008, 1, 0::bigint, get_node_id(), 1500006),
|
||||||
|
(1420009, 1, 0::bigint, get_node_id(), 1500007),
|
||||||
|
(1420010, 1, 0::bigint, get_node_id(), 1500008),
|
||||||
|
(1420011, 1, 0::bigint, get_node_id(), 1500009),
|
||||||
|
(1420012, 1, 0::bigint, get_node_id(), 1500010),
|
||||||
|
(1420013, 1, 0::bigint, get_node_id(), 1500011))
|
||||||
SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
|
SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
-- we should be able to colocate both tables now
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
-- try to update placements
|
-- try to update placements
|
||||||
|
|
||||||
-- fails because we are trying to update it to non-existing node
|
-- fails because we are trying to update it to non-existing node
|
||||||
|
@ -619,6 +671,80 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000;
|
SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000;
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
||||||
|
-- we'll do some metadata changes to trigger some error cases
|
||||||
|
-- so connect as superuser
|
||||||
|
\c - postgres - :worker_1_port
|
||||||
|
SET search_path TO metadata_sync_helpers;
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
-- with an ugly trick, update the repmodel
|
||||||
|
-- so that making two tables colocated fails
|
||||||
|
UPDATE pg_dist_partition SET repmodel = 't'
|
||||||
|
WHERE logicalrelid = 'test_2'::regclass;
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
-- with an ugly trick, update the vartype of table from int to bigint
|
||||||
|
-- so that making two tables colocated fails
|
||||||
|
UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}'
|
||||||
|
WHERE logicalrelid = 'test_2'::regclass;
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
-- with an ugly trick, update the partmethod of the table to not-valid
|
||||||
|
-- so that making two tables colocated fails
|
||||||
|
UPDATE pg_dist_partition SET partmethod = ''
|
||||||
|
WHERE logicalrelid = 'test_2'::regclass;
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
-- with an ugly trick, update the partmethod of the table to not-valid
|
||||||
|
-- so that making two tables colocated fails
|
||||||
|
UPDATE pg_dist_partition SET partmethod = 'a'
|
||||||
|
WHERE logicalrelid = 'test_2'::regclass;
|
||||||
|
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- colocated hash distributed table should have the same dist key columns
|
||||||
|
CREATE TABLE test_5(int_col int, text_col text);
|
||||||
|
CREATE TABLE test_6(int_col int, text_col text);
|
||||||
|
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
\set VERBOSITY terse
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_5'::regclass, 'h', 'int_col', 500, 's');
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_6'::regclass, 'h', 'text_col', 500, 's');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
||||||
|
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
|
||||||
|
CREATE COLLATION collation_t1 (provider = icu, locale = 'de-u-co-phonebk');
|
||||||
|
CREATE COLLATION caseinsensitive (provider = icu, locale = 'und-u-ks-level2');
|
||||||
|
|
||||||
|
-- colocated hash distributed table should have the same dist key collations
|
||||||
|
CREATE TABLE test_7(int_col int, text_col text COLLATE "collation_t1");
|
||||||
|
CREATE TABLE test_8(int_col int, text_col text COLLATE "caseinsensitive");
|
||||||
|
|
||||||
|
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
|
||||||
|
SET application_name to 'citus';
|
||||||
|
\set VERBOSITY terse
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_7'::regclass, 'h', 'text_col', 500, 's');
|
||||||
|
SELECT citus_internal_add_partition_metadata ('test_8'::regclass, 'h', 'text_col', 500, 's');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
-- we don't need the table/schema anymore
|
-- we don't need the table/schema anymore
|
||||||
-- connect back as super user to drop everything
|
-- connect back as super user to drop everything
|
||||||
\c - postgres - :worker_1_port
|
\c - postgres - :worker_1_port
|
||||||
|
|
|
@ -363,17 +363,7 @@ CREATE TABLE mx_colocation_test_1 (a int);
|
||||||
SELECT create_distributed_table('mx_colocation_test_1', 'a');
|
SELECT create_distributed_table('mx_colocation_test_1', 'a');
|
||||||
|
|
||||||
CREATE TABLE mx_colocation_test_2 (a int);
|
CREATE TABLE mx_colocation_test_2 (a int);
|
||||||
SELECT create_distributed_table('mx_colocation_test_2', 'a');
|
SELECT create_distributed_table('mx_colocation_test_2', 'a', colocate_with:='none');
|
||||||
|
|
||||||
-- Check the colocation IDs of the created tables
|
|
||||||
SELECT
|
|
||||||
logicalrelid, colocationid
|
|
||||||
FROM
|
|
||||||
pg_dist_partition
|
|
||||||
WHERE
|
|
||||||
logicalrelid = 'mx_colocation_test_1'::regclass
|
|
||||||
OR logicalrelid = 'mx_colocation_test_2'::regclass
|
|
||||||
ORDER BY logicalrelid;
|
|
||||||
|
|
||||||
-- Reset the colocation IDs of the test tables
|
-- Reset the colocation IDs of the test tables
|
||||||
DELETE FROM
|
DELETE FROM
|
||||||
|
@ -384,13 +374,16 @@ WHERE EXISTS (
|
||||||
WHERE
|
WHERE
|
||||||
colocationid = pg_dist_partition.colocationid
|
colocationid = pg_dist_partition.colocationid
|
||||||
AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass);
|
AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass);
|
||||||
UPDATE
|
|
||||||
|
-- Check the colocation IDs of the created tables
|
||||||
|
SELECT
|
||||||
|
logicalrelid, colocationid
|
||||||
|
FROM
|
||||||
pg_dist_partition
|
pg_dist_partition
|
||||||
SET
|
|
||||||
colocationid = 0
|
|
||||||
WHERE
|
WHERE
|
||||||
logicalrelid = 'mx_colocation_test_1'::regclass
|
logicalrelid = 'mx_colocation_test_1'::regclass
|
||||||
OR logicalrelid = 'mx_colocation_test_2'::regclass;
|
OR logicalrelid = 'mx_colocation_test_2'::regclass
|
||||||
|
ORDER BY logicalrelid;
|
||||||
|
|
||||||
-- Update colocation and see the changes on the master and the worker
|
-- Update colocation and see the changes on the master and the worker
|
||||||
SELECT update_distributed_table_colocation('mx_colocation_test_1', colocate_with => 'mx_colocation_test_2');
|
SELECT update_distributed_table_colocation('mx_colocation_test_1', colocate_with => 'mx_colocation_test_2');
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
|
||||||
|
-- in order to make the enterprise and community
|
||||||
|
-- tests outputs the same, disable enable_ddl_propagation
|
||||||
|
-- and create the roles/schema manually
|
||||||
|
SET citus.enable_ddl_propagation TO OFF;
|
||||||
|
CREATE SCHEMA "Update Colocation";
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
CREATE ROLE mx_update_colocation WITH LOGIN;
|
||||||
|
GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation;
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET citus.enable_ddl_propagation TO OFF;
|
||||||
|
CREATE SCHEMA "Update Colocation";
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
CREATE ROLE mx_update_colocation WITH LOGIN;
|
||||||
|
GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation;
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET citus.enable_ddl_propagation TO OFF;
|
||||||
|
CREATE SCHEMA "Update Colocation";
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
CREATE ROLE mx_update_colocation WITH LOGIN;
|
||||||
|
GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation;
|
||||||
|
\c - mx_update_colocation - :master_port
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
|
||||||
|
CREATE TABLE t1(a int);
|
||||||
|
CREATE TABLE t2(a int);
|
||||||
|
SELECT create_distributed_table('t1', 'a', colocate_with:='none');
|
||||||
|
SELECT create_distributed_table('t2', 'a', colocate_with:='none');
|
||||||
|
SELECT update_distributed_table_colocation('t1', 't2');
|
||||||
|
|
||||||
|
-- show that we successfuly updated the colocationids to the same value
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT update_distributed_table_colocation('t1', 'none');
|
||||||
|
|
||||||
|
-- show that we successfuly updated the colocationids different values
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SET search_path TO "Update Colocation";
|
||||||
|
SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
|
||||||
|
|
||||||
|
\c - postgres - :master_port
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA "Update Colocation" cascade;
|
Loading…
Reference in New Issue