diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 2e4d9ff3a..0aa96b5cd 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,7 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -88,6 +88,8 @@ $(EXTENSION)--6.0-14.sql: $(EXTENSION)--6.0-13.sql $(EXTENSION)--6.0-13--6.0-14. cat $^ > $@ $(EXTENSION)--6.0-15.sql: $(EXTENSION)--6.0-14.sql $(EXTENSION)--6.0-14--6.0-15.sql cat $^ > $@ +$(EXTENSION)--6.0-16.sql: $(EXTENSION)--6.0-15.sql $(EXTENSION)--6.0-15--6.0-16.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-15--6.0-16.sql b/src/backend/distributed/citus--6.0-15--6.0-16.sql new file mode 100644 index 000000000..2bb12b659 --- /dev/null +++ b/src/backend/distributed/citus--6.0-15--6.0-16.sql @@ -0,0 +1,12 @@ +/* citus--6.0-15--6.0-16.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION mark_tables_colocated(source_table_name regclass, target_table_names regclass[]) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$mark_tables_colocated$$; +COMMENT ON FUNCTION mark_tables_colocated(source_table_name regclass, target_table_names regclass[]) + IS 'mark target distributed tables as colocated with the source table'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 557f71bc8..cf6b39116 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-15' +default_version = '6.0-16' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 5563a308d..7e6627bbe 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -79,8 +79,6 @@ static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, static void CreateTruncateTrigger(Oid relationId); static uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType); -static uint32 CreateColocationGroup(int shardCount, int replicationFactor, - Oid distributionColumnType); static uint32 GetNextColocationId(void); static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, int shardCount, int replicationFactor); @@ -610,7 +608,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, CharGetDatum(distributionMethod); newValues[Anum_pg_dist_partition_partkey - 1] = CStringGetTextDatum(distributionColumnString); - newValues[Anum_pg_dist_partition_colocationid - 1] = Int32GetDatum(colocationId); + newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls); @@ -883,7 +881,7 @@ ColocationId(int shardCount, int 
replicationFactor, Oid distributionColumnType) * pg_dist_colocation with the given configuration. It also returns the created * colocation id. */ -static uint32 +uint32 CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) { uint32 colocationId = GetNextColocationId(); @@ -967,7 +965,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, Relation distributedRelation = NULL; Relation pgDistColocation = NULL; Var *distributionColumn = NULL; - int distributionColumnType = 0; + Oid distributionColumnType = 0; uint32 colocationId = INVALID_COLOCATION_ID; /* get distribution column type */ diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index a787f1a49..c135b3ef5 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -50,7 +50,6 @@ /* local function forward declarations */ -static void CheckHashPartitionedTable(Oid distributedTableId); static text * IntegerToText(int32 value); @@ -326,7 +325,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) * tableId and checks if the table is hash partitioned. If not, the function * throws an error. */ -static void +void CheckHashPartitionedTable(Oid distributedTableId) { char partitionType = PartitionMethod(distributedTableId); diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 86449369a..51f87554a 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -52,6 +52,61 @@ PG_FUNCTION_INFO_V1(master_stage_shard_row); PG_FUNCTION_INFO_V1(master_stage_shard_placement_row); +/* + * TableShardReplicationFactor returns the current replication factor of the + * given relation by looking into shard placements. It errors out if + * different shards have different numbers of placements. It also errors out + * if the table does not have any shards. + */ +uint32 +TableShardReplicationFactor(Oid relationId) +{ + uint32 replicationCount = 0; + ListCell *shardCell = NULL; + + List *shardIntervalList = LoadShardIntervalList(relationId); + foreach(shardCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); + uint64 shardId = shardInterval->shardId; + + List *shardPlacementList = ShardPlacementList(shardId); + uint32 shardPlacementCount = list_length(shardPlacementList); + + /* + * Get the replication count of the first shard in the list, and error + * out if any shard has a different replication count.
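+ * For example, if each of a table's shards has exactly two placements, the + * replication factor is 2; a shard with any other placement count would make + * the replication factor ill-defined, so we error out below.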
+ */ + if (replicationCount == 0) + { + replicationCount = shardPlacementCount; + } + else if (replicationCount != shardPlacementCount) + { + char *relationName = get_rel_name(relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot find the replication factor of the " + "table %s", relationName), + errdetail("Shard %ld has a different replication " + "count than other shards.", shardId))); + } + } + + /* error out if the table does not have any shards */ + if (replicationCount == 0) + { + char *relationName = get_rel_name(relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot find the replication factor of the " + "table %s", relationName), + errdetail("The table %s does not have any shards.", + relationName))); + } + + return replicationCount; +} + + /* * LoadShardIntervalList returns a list of shard intervals related for a given * distributed table. The function returns an empty list if no shards can be diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 9fc4ad85b..11060526f 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -14,14 +14,377 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" +#include "access/xact.h" +#include "catalog/indexing.h" #include "distributed/colocation_utils.h" +#include "distributed/listutils.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" +#include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" +#include "distributed/worker_protocol.h" #include "utils/fmgroids.h" +#include "utils/lsyscache.h" #include "utils/rel.h" +/* local function forward declarations */ +static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId); +static void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId); +static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval, + ShardInterval *rightShardInterval); +static int CompareShardPlacementsByNode(const void *leftElement, + const void *rightElement); +static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId); + + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(mark_tables_colocated); + + +/* + * mark_tables_colocated puts the target tables into the same colocation group + * as the source table. If the source table is in the INVALID_COLOCATION_ID + * group, it creates a new colocation group and assigns all tables to this new + * colocation group. + */ +Datum +mark_tables_colocated(PG_FUNCTION_ARGS) +{ + Oid sourceRelationId = PG_GETARG_OID(0); + ArrayType *relationIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); + Datum *relationIdDatumArray = NULL; + int relationIndex = 0; + + int relationCount = ArrayObjectCount(relationIdArrayObject); + if (relationCount < 1) + { + ereport(ERROR, (errmsg("at least one target table is required for this " + "operation"))); + } + + relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject); + + for (relationIndex = 0; relationIndex < relationCount; relationIndex++) + { + Oid nextRelationOid = DatumGetObjectId(relationIdDatumArray[relationIndex]); + MarkTablesColocated(sourceRelationId, nextRelationOid); + } + + PG_RETURN_VOID(); +} + + +/* + * MarkTablesColocated puts both tables into the same colocation group.
If the + * source table is in the INVALID_COLOCATION_ID group, it creates a new + * colocation group and assigns both tables to it. Otherwise, it adds the + * target table to the colocation group of the source table. + */ +static void +MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) +{ + uint32 sourceColocationId = INVALID_COLOCATION_ID; + Relation pgDistColocation = NULL; + Var *sourceDistributionColumn = NULL; + Var *targetDistributionColumn = NULL; + Oid sourceDistributionColumnType = InvalidOid; + Oid targetDistributionColumnType = InvalidOid; + + CheckHashPartitionedTable(sourceRelationId); + CheckHashPartitionedTable(targetRelationId); + + sourceDistributionColumn = PartitionKey(sourceRelationId); + sourceDistributionColumnType = sourceDistributionColumn->vartype; + + targetDistributionColumn = PartitionKey(targetRelationId); + targetDistributionColumnType = targetDistributionColumn->vartype; + + if (sourceDistributionColumnType != targetDistributionColumnType) + { + char *sourceRelationName = get_rel_name(sourceRelationId); + char *targetRelationName = get_rel_name(targetRelationId); + + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + sourceRelationName, targetRelationName), + errdetail("Distribution column types don't match for " + "%s and %s.", sourceRelationName, + targetRelationName))); + } + + /* + * Get an exclusive lock on the colocation system catalog so that we can + * be sure there will be no modifications to the colocation table until + * this transaction is committed. + */ + pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); + + /* check if shard placements are colocated */ + ErrorIfShardPlacementsNotColocated(sourceRelationId, targetRelationId); + + /* + * Get the colocation group of the source table. If the source table does + * not have one, create a new colocation group and assign it to the source + * table. + */ + sourceColocationId = TableColocationId(sourceRelationId); + if (sourceColocationId == INVALID_COLOCATION_ID) + { + uint32 shardCount = ShardIntervalCount(sourceRelationId); + uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId); + + sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor, + sourceDistributionColumnType); + UpdateRelationColocationGroup(sourceRelationId, sourceColocationId); + } + + /* finally set colocation group for the target relation */ + UpdateRelationColocationGroup(targetRelationId, sourceColocationId); + + heap_close(pgDistColocation, NoLock); +} + + +/* + * ErrorIfShardPlacementsNotColocated checks if the shard placements of the + * given two relations are physically colocated. It errors out in any of the + * following cases: + * 1. Shard counts are different, + * 2. Shard intervals don't match, + * 3. Shard placements are not colocated (not on the same node), + * 4. Shard placements have different health states. + * + * Note that this function assumes both tables are hash distributed.
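+ * + * Also note that the shard interval lists are sorted, so matching shards of + * the two relations are compared pairwise, in shard interval order.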
+ */ +static void +ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId) +{ + List *leftShardIntervalList = NIL; + List *rightShardIntervalList = NIL; + ListCell *leftShardIntervalCell = NULL; + ListCell *rightShardIntervalCell = NULL; + char *leftRelationName = NULL; + char *rightRelationName = NULL; + uint32 leftShardCount = 0; + uint32 rightShardCount = 0; + + /* get sorted shard interval lists for both tables */ + leftShardIntervalList = LoadShardIntervalList(leftRelationId); + rightShardIntervalList = LoadShardIntervalList(rightRelationId); + + /* prevent concurrent placement changes */ + LockShardListMetadata(leftShardIntervalList, ShareLock); + LockShardListMetadata(rightShardIntervalList, ShareLock); + + leftRelationName = get_rel_name(leftRelationId); + rightRelationName = get_rel_name(rightRelationId); + + leftShardCount = list_length(leftShardIntervalList); + rightShardCount = list_length(rightShardIntervalList); + + if (leftShardCount != rightShardCount) + { + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + leftRelationName, rightRelationName), + errdetail("Shard counts don't match for %s and %s.", + leftRelationName, rightRelationName))); + } + + /* compare shard intervals one by one */ + forboth(leftShardIntervalCell, leftShardIntervalList, + rightShardIntervalCell, rightShardIntervalList) + { + ShardInterval *leftInterval = (ShardInterval *) lfirst(leftShardIntervalCell); + ShardInterval *rightInterval = (ShardInterval *) lfirst(rightShardIntervalCell); + + List *leftPlacementList = NIL; + List *rightPlacementList = NIL; + List *sortedLeftPlacementList = NIL; + List *sortedRightPlacementList = NIL; + ListCell *leftPlacementCell = NULL; + ListCell *rightPlacementCell = NULL; + + uint64 leftShardId = leftInterval->shardId; + uint64 rightShardId = rightInterval->shardId; + + bool shardsIntervalsEqual = ShardsIntervalsEqual(leftInterval, rightInterval); + if (!shardsIntervalsEqual) + { + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + leftRelationName, rightRelationName), + errdetail("Shard intervals don't match for %s and %s.", + leftRelationName, rightRelationName))); + } + + leftPlacementList = ShardPlacementList(leftShardId); + rightPlacementList = ShardPlacementList(rightShardId); + + /* sort shard placements by node */ + sortedLeftPlacementList = SortList(leftPlacementList, + CompareShardPlacementsByNode); + sortedRightPlacementList = SortList(rightPlacementList, + CompareShardPlacementsByNode); + + /* compare shard placements one by one */ + forboth(leftPlacementCell, sortedLeftPlacementList, + rightPlacementCell, sortedRightPlacementList) + { + ShardPlacement *leftPlacement = + (ShardPlacement *) lfirst(leftPlacementCell); + ShardPlacement *rightPlacement = + (ShardPlacement *) lfirst(rightPlacementCell); + int nodeCompare = 0; + + /* + * If shard placements are on different nodes, these shard + * placements are not colocated.
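+ * + * CompareShardPlacementsByNode is a qsort-style comparator and expects + * pointers to ShardPlacement pointers, hence the extra level of + * indirection in the call below.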
+ */ + nodeCompare = CompareShardPlacementsByNode((void *) &leftPlacement, + (void *) &rightPlacement); + if (nodeCompare != 0) + { + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + leftRelationName, rightRelationName), + errdetail("Shard %ld of %s and shard %ld of %s " + "are not colocated.", + leftShardId, leftRelationName, + rightShardId, rightRelationName))); + } + + /* we also don't allow colocated shards to be in different shard states */ + if (leftPlacement->shardState != rightPlacement->shardState) + { + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + leftRelationName, rightRelationName), + errdetail("%s and %s have shard placements in " + "different shard states.", + leftRelationName, rightRelationName))); + } + } + } +} + + +/* + * ShardsIntervalsEqual checks if two shard intervals of hash distributed + * tables are equal. Note that this function does not work with shards of + * non-hash-partitioned tables. + * + * We compare shard min/max values here to decide whether two shards are + * colocated. Alternatively, we could call FindShardIntervalIndex on both + * shards and compare the resulting indexes, but we avoid that because this + * approach is cheaper. + */ +static bool +ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval) +{ + int32 leftShardMinValue = DatumGetInt32(leftShardInterval->minValue); + int32 leftShardMaxValue = DatumGetInt32(leftShardInterval->maxValue); + int32 rightShardMinValue = DatumGetInt32(rightShardInterval->minValue); + int32 rightShardMaxValue = DatumGetInt32(rightShardInterval->maxValue); + + bool minValuesEqual = leftShardMinValue == rightShardMinValue; + bool maxValuesEqual = leftShardMaxValue == rightShardMaxValue; + + return minValuesEqual && maxValuesEqual; +} + + +/* + * CompareShardPlacementsByNode compares two shard placements by their nodename + * and nodeport. + */ +static int +CompareShardPlacementsByNode(const void *leftElement, const void *rightElement) +{ + const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement); + const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement); + + char *leftNodeName = leftPlacement->nodeName; + char *rightNodeName = rightPlacement->nodeName; + + uint32 leftNodePort = leftPlacement->nodePort; + uint32 rightNodePort = rightPlacement->nodePort; + + /* first compare node names */ + int nodeNameCompare = strncmp(leftNodeName, rightNodeName, WORKER_LENGTH); + if (nodeNameCompare != 0) + { + return nodeNameCompare; + } + + /* if node names are the same, compare node ports */ + if (leftNodePort < rightNodePort) + { + return -1; + } + else if (leftNodePort > rightNodePort) + { + return 1; + } + else + { + return 0; + } +} + + +/* + * UpdateRelationColocationGroup updates the colocation group in + * pg_dist_partition for the given relation.
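+ * + * It scans pg_dist_partition for the relation's tuple, replaces only the + * colocationid value, and invalidates the relation's relcache entry so that + * subsequent commands see the new colocation group.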
+ */ +static void +UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) +{ + Relation pgDistPartition = NULL; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; + SysScanDesc scanDescriptor = NULL; + bool indexOK = true; + int scanKeyCount = 1; + ScanKeyData scanKey[scanKeyCount]; + Datum values[Natts_pg_dist_partition]; + bool isNull[Natts_pg_dist_partition]; + bool replace[Natts_pg_dist_partition]; + + pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistPartition); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId)); + + scanDescriptor = systable_beginscan(pgDistPartition, + DistPartitionLogicalRelidIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + char *distributedRelationName = get_rel_name(distributedRelationId); + ereport(ERROR, (errmsg("could not find valid entry for relation %s", + distributedRelationName))); + } + + memset(values, 0, sizeof(values)); + memset(isNull, false, sizeof(isNull)); + memset(replace, false, sizeof(replace)); + + values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); + isNull[Anum_pg_dist_partition_colocationid - 1] = false; + replace[Anum_pg_dist_partition_colocationid - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isNull, replace); + simple_heap_update(pgDistPartition, &heapTuple->t_self, heapTuple); + + CatalogUpdateIndexes(pgDistPartition, heapTuple); + CitusInvalidateRelcacheByRelid(distributedRelationId); + + CommandCounterIncrement(); + + systable_endscan(scanDescriptor); + heap_close(pgDistPartition, NoLock); } + + /* * TableColocationId function returns co-location id of given table. This function * errors out if given table is not distributed. @@ -81,23 +444,9 @@ ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInter if (tablesColocated) { - /* - * We do min/max value check here to decide whether two shards are co-located, - * instead we can simply use FindShardIntervalIndex function on both shards then - * but do index check, but we avoid it because this way it is more cheaper. - * - * Having co-located tables implies that tables are partitioned by hash partition - * therefore it is safe to use DatumGetInt32 here.
- */ - int32 leftShardMinValue = DatumGetInt32(leftShardInterval->minValue); - int32 leftShardMaxValue = DatumGetInt32(leftShardInterval->maxValue); - int32 rightShardMinValue = DatumGetInt32(rightShardInterval->minValue); - int32 rightShardMaxValue = DatumGetInt32(rightShardInterval->maxValue); - - bool minValuesEqual = leftShardMinValue == rightShardMinValue; - bool maxValuesEqual = leftShardMaxValue == rightShardMaxValue; - - return minValuesEqual && maxValuesEqual; + bool shardIntervalEqual = ShardsIntervalsEqual(leftShardInterval, + rightShardInterval); + return shardIntervalEqual; } return false; diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index fec53e6fe..09de91ee3 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -25,5 +25,8 @@ extern List * ColocatedTableList(Oid distributedTableId); extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); extern Oid ColocatedTableId(Oid colocationId); extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex); +extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, + Oid distributionColumnType); + #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index a267d7ff3..3881e8d61 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -57,6 +57,7 @@ typedef struct ShardPlacement /* Function declarations to read shard and shard placement data */ +extern uint32 TableShardReplicationFactor(Oid relationId); extern List * LoadShardIntervalList(Oid relationId); extern int ShardIntervalCount(Oid relationId); extern List * LoadShardList(Oid relationId); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 04a978eb5..5c3491384 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -110,6 +110,7 @@ extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, int shardIndex, uint64 shardId, char *newShardOwner, List *ddlCommandList, List *foreignConstraintCommadList); extern Oid ForeignConstraintGetReferencedTableId(char *queryString); +extern void CheckHashPartitionedTable(Oid distributedTableId); /* Function declarations for generating metadata for shard and placement creation */ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 2b269ea82..b87eba729 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -442,7 +442,7 @@ SELECT * FROM pg_dist_colocation SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY colocationid; + ORDER BY logicalrelid; logicalrelid | colocationid ---------------+-------------- table1_groupa | 1 @@ -659,3 +659,124 @@ ORDER BY table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647 (56 rows) +-- reset colocation ids to test mark_tables_colocated +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; +DELETE FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000; +UPDATE pg_dist_partition SET colocationid = 0 + WHERE colocationid >= 1 AND colocationid < 1000; +-- check metadata +SELECT * FROM 
pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ +(0 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY logicalrelid; + logicalrelid | colocationid +--------------+-------------- +(0 rows) + +-- first check failing cases +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']); +ERROR: cannot colocate tables table1_groupb and table1_groupc +DETAIL: Distribution column types don't match for table1_groupb and table1_groupc. +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupD']); +ERROR: cannot colocate tables table1_groupb and table1_groupd +DETAIL: Shard counts don't match for table1_groupb and table1_groupd. +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupE']); +ERROR: cannot colocate tables table1_groupb and table1_groupe +DETAIL: Shard 1300027 of table1_groupb and shard 1300047 of table1_groupe are not colocated. +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupF']); +ERROR: cannot colocate tables table1_groupb and table1_groupf +DETAIL: Shard counts don't match for table1_groupb and table1_groupf. +SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_groupD']); +ERROR: cannot colocate tables table1_groupb and table1_groupd +DETAIL: Shard counts don't match for table1_groupb and table1_groupd. +-- check metadata to see failing calls didn't have any side effects +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ +(0 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY logicalrelid; + logicalrelid | colocationid +--------------+-------------- +(0 rows) + +-- check successfully colocated tables +SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']); + mark_tables_colocated +----------------------- + +(1 row) + +SELECT mark_tables_colocated('table1_groupC', ARRAY['table2_groupC']); + mark_tables_colocated +----------------------- + +(1 row) + +SELECT mark_tables_colocated('table1_groupD', ARRAY['table2_groupD']); + mark_tables_colocated +----------------------- + +(1 row) + +SELECT mark_tables_colocated('table1_groupE', ARRAY['table2_groupE', 'table3_groupE']); + mark_tables_colocated +----------------------- + +(1 row) + +SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']); + mark_tables_colocated +----------------------- + +(1 row) + +-- check colocating a table with itself +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); + mark_tables_colocated +----------------------- + +(1 row) + +-- check metadata to see colocation groups are created successfully +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 2 | 2 | 1 | 23 + 3 | 2 | 2 | 25 + 4 | 4 | 2 | 23 + 5 | 2 | 2 | 23 + 6 | 1 | 2 | 23 +(5 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY
logicalrelid; + logicalrelid | colocationid +---------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + table1_groupf | 6 + table2_groupf | 6 +(11 rows) + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 119930e1d..a11dced98 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -55,6 +55,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-12'; ALTER EXTENSION citus UPDATE TO '6.0-13'; ALTER EXTENSION citus UPDATE TO '6.0-14'; ALTER EXTENSION citus UPDATE TO '6.0-15'; +ALTER EXTENSION citus UPDATE TO '6.0-16'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 8f714c4e8..184d21173 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -212,7 +212,7 @@ SELECT * FROM pg_dist_colocation SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY colocationid; + ORDER BY logicalrelid; -- check effects of dropping tables DROP TABLE table1_groupA; @@ -295,3 +295,54 @@ ORDER BY shardmaxvalue::integer, shardid, placementid; + +-- reset colocation ids to test mark_tables_colocated +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; +DELETE FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000; +UPDATE pg_dist_partition SET colocationid = 0 + WHERE colocationid >= 1 AND colocationid < 1000; + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY logicalrelid; + +-- first check failing cases +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']); +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupD']); +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupE']); +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupF']); +SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_groupD']); + +-- check metadata to see failing calls didn't have any side effects +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY logicalrelid; + +-- check successfully colocated tables +SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']); +SELECT mark_tables_colocated('table1_groupC', ARRAY['table2_groupC']); +SELECT mark_tables_colocated('table1_groupD', ARRAY['table2_groupD']); +SELECT mark_tables_colocated('table1_groupE', ARRAY['table2_groupE', 'table3_groupE']); +SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']); + +-- check colocating a table with itself +SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); + +-- check metadata to see colocation groups are created successfully +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >=
1 AND colocationid < 1000 + ORDER BY logicalrelid; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 63369f692..38b2b1aa0 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -55,6 +55,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-12'; ALTER EXTENSION citus UPDATE TO '6.0-13'; ALTER EXTENSION citus UPDATE TO '6.0-14'; ALTER EXTENSION citus UPDATE TO '6.0-15'; +ALTER EXTENSION citus UPDATE TO '6.0-16'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*)