diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 82f4fdf61..b91c0f4e0 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -103,8 +103,9 @@ static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS); static bool ShouldSkipMetadataChecks(void); -static void EnsurePartitionMetadataIsSane(char distributionMethod, int colocationId, - char replicationModel); +static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, + int colocationId, char replicationModel, + Var *distributionKey); static void EnsureCoordinatorInitiatedOperation(void); static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType, text *shardMinValue, @@ -128,6 +129,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata); +PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation); static bool got_SIGTERM = false; @@ -1020,10 +1022,9 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId) { StringInfo command = makeStringInfo(); char *qualifiedRelationName = generate_qualified_relation_name(relationId); - appendStringInfo(command, "UPDATE pg_dist_partition " - "SET colocationid = %d " - "WHERE logicalrelid = %s::regclass", - colocationId, quote_literal_cstr(qualifiedRelationName)); + appendStringInfo(command, + "SELECT citus_internal_update_relation_colocation(%s::regclass, %d)", + quote_literal_cstr(qualifiedRelationName), colocationId); return command->data; } @@ -2122,8 +2123,8 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS) * metadata is not sane, the user can only affect its own tables. * Given that the user is owner of the table, we should allow. */ - EnsurePartitionMetadataIsSane(distributionMethod, colocationId, - replicationModel); + EnsurePartitionMetadataIsSane(relationId, distributionMethod, colocationId, + replicationModel, distributionColumnVar); } InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnVar, @@ -2138,8 +2139,8 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS) * for inserting into pg_dist_partition metadata. */ static void -EnsurePartitionMetadataIsSane(char distributionMethod, int colocationId, - char replicationModel) +EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, int colocationId, + char replicationModel, Var *distributionColumnVar) { if (!(distributionMethod == DISTRIBUTE_BY_HASH || distributionMethod == DISTRIBUTE_BY_NONE)) @@ -2155,6 +2156,26 @@ EnsurePartitionMetadataIsSane(char distributionMethod, int colocationId, errmsg("Metadata syncing is only allowed for valid " "colocation id values."))); } + else if (colocationId != INVALID_COLOCATION_ID && + distributionMethod == DISTRIBUTE_BY_HASH) + { + int count = 1; + List *targetColocatedTableList = + ColocationGroupTableList(colocationId, count); + + /* + * If we have any colocated hash tables, ensure if they share the + * same distribution key properties. + */ + if (list_length(targetColocatedTableList) >= 1) + { + Oid targetRelationId = linitial_oid(targetColocatedTableList); + + EnsureColumnTypeEquality(relationId, targetRelationId, distributionColumnVar, + DistPartitionKeyOrError(targetRelationId)); + } + } + if (!(replicationModel == REPLICATION_MODEL_2PC || replicationModel == REPLICATION_MODEL_STREAMING || @@ -2612,3 +2633,64 @@ citus_internal_delete_shard_metadata(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + + +/* + * citus_internal_update_relation_colocation is an internal UDF to + * delete a row in pg_dist_shard and corresponding placement rows + * from pg_dist_shard_placement. + */ +Datum +citus_internal_update_relation_colocation(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + uint32 tagetColocationId = PG_GETARG_UINT32(1); + + EnsureTableOwner(relationId); + + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed allowed for executing as a separate command */ + EnsureCoordinatorInitiatedOperation(); + + /* ensure that the table is in pg_dist_partition */ + char partitionMethod = PartitionMethodViaCatalog(relationId); + if (partitionMethod == DISTRIBUTE_BY_INVALID) + { + /* connection from the coordinator operating on a shard */ + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("The relation \"%s\" does not have a valid " + "entry in pg_dist_partition.", + get_rel_name(relationId)))); + } + else if (partitionMethod != DISTRIBUTE_BY_HASH) + { + /* connection from the coordinator operating on a shard */ + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Updating colocation ids are only allowed for hash " + "distributed tables: %c", partitionMethod))); + } + + int count = 1; + List *targetColocatedTableList = + ColocationGroupTableList(tagetColocationId, count); + + if (list_length(targetColocatedTableList) == 0) + { + /* the table is colocated with none, so nothing to check */ + } + else + { + Oid targetRelationId = linitial_oid(targetColocatedTableList); + + ErrorIfShardPlacementsNotColocated(relationId, targetRelationId); + CheckReplicationModel(relationId, targetRelationId); + CheckDistributionColumnType(relationId, targetRelationId); + } + } + + bool localOnly = true; + UpdateRelationColocationGroup(relationId, tagetColocationId, localOnly); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql index b56a3296f..c2b779d9f 100644 --- a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql +++ b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql @@ -15,3 +15,4 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi #include "udfs/citus_internal_add_placement_metadata/10.2-1.sql"; #include "udfs/citus_internal_update_placement_metadata/10.2-1.sql"; #include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql"; +#include "udfs/citus_internal_update_relation_colocation/10.2-1.sql"; diff --git a/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql b/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql index afb7bd195..7809b80ce 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql @@ -16,6 +16,7 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "ch DROP FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, integer, bigint, integer, bigint); DROP FUNCTION pg_catalog.citus_internal_update_placement_metadata(bigint, integer, integer); DROP FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint); +DROP FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, integer); REVOKE ALL ON FUNCTION pg_catalog.worker_record_sequence_dependency(regclass,regclass,name) FROM PUBLIC; ALTER TABLE pg_catalog.pg_dist_placement DROP CONSTRAINT placement_shardid_groupid_unique_index; diff --git a/src/backend/distributed/sql/udfs/citus_internal_update_relation_colocation/10.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_update_relation_colocation/10.2-1.sql new file mode 100644 index 000000000..a7f2ec1c6 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_update_relation_colocation/10.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_relation_colocation(relation_id Oid, target_colocation_id int) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, int) IS + 'Updates colocationId field of pg_dist_partition for the relation_id'; + diff --git a/src/backend/distributed/sql/udfs/citus_internal_update_relation_colocation/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_update_relation_colocation/latest.sql new file mode 100644 index 000000000..a7f2ec1c6 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_update_relation_colocation/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_relation_colocation(relation_id Oid, target_colocation_id int) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, int) IS + 'Updates colocationId field of pg_dist_partition for the relation_id'; + diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 979f24b8f..fa698f88a 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -42,15 +42,12 @@ /* local function forward declarations */ static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId); -static void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId); static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); static bool HashPartitionedShardIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); static int CompareShardPlacementsByNode(const void *leftElement, const void *rightElement); -static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId); -static List * ColocationGroupTableList(Oid colocationId); static void DeleteColocationGroup(uint32 colocationId); static uint32 CreateColocationGroupForRelation(Oid sourceRelationId); static void BreakColocation(Oid sourceRelationId); @@ -161,7 +158,8 @@ BreakColocation(Oid sourceRelationId) Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock); uint32 newColocationId = GetNextColocationId(); - UpdateRelationColocationGroup(sourceRelationId, newColocationId); + bool localOnly = false; + UpdateRelationColocationGroup(sourceRelationId, newColocationId, localOnly); /* if there is not any remaining table in the colocation group, delete it */ DeleteColocationGroupIfNoTablesBelong(sourceRelationId); @@ -230,7 +228,8 @@ CreateColocationGroupForRelation(Oid sourceRelationId) uint32 sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor, sourceDistributionColumnType, sourceDistributionColumnCollation); - UpdateRelationColocationGroup(sourceRelationId, sourceColocationId); + bool localOnly = false; + UpdateRelationColocationGroup(sourceRelationId, sourceColocationId, localOnly); return sourceColocationId; } @@ -279,7 +278,8 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) uint32 targetColocationId = TableColocationId(targetRelationId); /* finally set colocation group for the target relation */ - UpdateRelationColocationGroup(targetRelationId, sourceColocationId); + bool localOnly = false; + UpdateRelationColocationGroup(targetRelationId, sourceColocationId, localOnly); /* if there is not any remaining table in the colocation group, delete it */ DeleteColocationGroupIfNoTablesBelong(targetColocationId); @@ -300,7 +300,7 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) * * Note that, this functions assumes that both tables are hash distributed. */ -static void +void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId) { ListCell *leftShardIntervalCell = NULL; @@ -680,29 +680,48 @@ CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId) */ void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId) +{ + /* reference tables have NULL distribution column */ + Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId); + + /* reference tables have NULL distribution column */ + Var *targetDistributionColumn = DistPartitionKey(targetRelationId); + + EnsureColumnTypeEquality(sourceRelationId, targetRelationId, + sourceDistributionColumn, targetDistributionColumn); +} + + +/* + * GetColumnTypeEquality checks if distribution column types and collations + * of the given columns are same. The function sets the boolean pointers. + */ +void +EnsureColumnTypeEquality(Oid sourceRelationId, Oid targetRelationId, + Var *sourceDistributionColumn, Var *targetDistributionColumn) { Oid sourceDistributionColumnType = InvalidOid; Oid targetDistributionColumnType = InvalidOid; Oid sourceDistributionColumnCollation = InvalidOid; Oid targetDistributionColumnCollation = InvalidOid; - /* reference tables have NULL distribution column */ - Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId); if (sourceDistributionColumn != NULL) { sourceDistributionColumnType = sourceDistributionColumn->vartype; sourceDistributionColumnCollation = sourceDistributionColumn->varcollid; } - /* reference tables have NULL distribution column */ - Var *targetDistributionColumn = DistPartitionKey(targetRelationId); if (targetDistributionColumn != NULL) { targetDistributionColumnType = targetDistributionColumn->vartype; targetDistributionColumnCollation = targetDistributionColumn->varcollid; } - if (sourceDistributionColumnType != targetDistributionColumnType) + bool columnTypesSame = sourceDistributionColumnType == targetDistributionColumnType; + bool columnCollationsSame = + sourceDistributionColumnCollation == targetDistributionColumnCollation; + + if (!columnTypesSame) { char *sourceRelationName = get_rel_name(sourceRelationId); char *targetRelationName = get_rel_name(targetRelationId); @@ -714,16 +733,17 @@ CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId) targetRelationName))); } - if (sourceDistributionColumnCollation != targetDistributionColumnCollation) + if (!columnCollationsSame) { char *sourceRelationName = get_rel_name(sourceRelationId); char *targetRelationName = get_rel_name(targetRelationId); ereport(ERROR, (errmsg("cannot colocate tables %s and %s", sourceRelationName, targetRelationName), - errdetail("Distribution column collations don't match for " - "%s and %s.", sourceRelationName, - targetRelationName))); + errdetail( + "Distribution column collations don't match for " + "%s and %s.", sourceRelationName, + targetRelationName))); } } @@ -731,9 +751,13 @@ CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId) /* * UpdateRelationColocationGroup updates colocation group in pg_dist_partition * for the given relation. + * + * When localOnly is true, the function does not propagate changes to the + * metadata workers. */ -static void -UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) +void +UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, + bool localOnly) { bool indexOK = true; int scanKeyCount = 1; @@ -782,7 +806,7 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) table_close(pgDistPartition, NoLock); bool shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId); - if (shouldSyncMetadata) + if (shouldSyncMetadata && !localOnly) { char *updateColocationIdCommand = ColocationIdUpdateCommand(distributedRelationId, colocationId); @@ -878,7 +902,8 @@ ColocatedTableList(Oid distributedTableId) return colocatedTableList; } - colocatedTableList = ColocationGroupTableList(tableColocationId); + int count = 0; + colocatedTableList = ColocationGroupTableList(tableColocationId, count); return colocatedTableList; } @@ -887,9 +912,14 @@ ColocatedTableList(Oid distributedTableId) /* * ColocationGroupTableList returns the list of tables in the given colocation * group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL. + * + * If count is zero then the command is executed for all rows that it applies to. + * If count is greater than zero, then no more than count rows will be retrieved; + * execution stops when the count is reached, much like adding a LIMIT clause + * to the query. */ -static List * -ColocationGroupTableList(Oid colocationId) +List * +ColocationGroupTableList(uint32 colocationId, uint32 count) { List *colocatedTableList = NIL; bool indexOK = true; @@ -906,7 +936,7 @@ ColocationGroupTableList(Oid colocationId) } ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, - BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId)); Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); @@ -924,6 +954,17 @@ ColocationGroupTableList(Oid colocationId) colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId); heapTuple = systable_getnext(scanDescriptor); + + if (count == 0) + { + /* fetch all rows */ + continue; + } + else if (list_length(colocatedTableList) >= count) + { + /* we are done */ + break; + } } systable_endscan(scanDescriptor); @@ -1158,7 +1199,8 @@ DeleteColocationGroupIfNoTablesBelong(uint32 colocationId) { if (colocationId != INVALID_COLOCATION_ID) { - List *colocatedTableList = ColocationGroupTableList(colocationId); + int count = 1; + List *colocatedTableList = ColocationGroupTableList(colocationId, count); int colocatedTableCount = list_length(colocatedTableList); if (colocatedTableCount == 0) diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index d2ee3d0be..0ce4d8fb1 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -33,9 +33,15 @@ extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnCollation); extern bool IsColocateWithNone(char *colocateWithTableName); extern uint32 GetNextColocationId(void); +extern void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId); extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId); extern void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId); - +extern void EnsureColumnTypeEquality(Oid sourceRelationId, Oid targetRelationId, + Var *sourceDistributionColumn, + Var *targetDistributionColumn); +extern void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, + bool localOnly); extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId); +extern List * ColocationGroupTableList(uint32 colocationId, uint32 count); #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/test/regress/expected/metadata_sync_helpers.out b/src/test/regress/expected/metadata_sync_helpers.out index ff3594c77..58678d8f6 100644 --- a/src/test/regress/expected/metadata_sync_helpers.out +++ b/src/test/regress/expected/metadata_sync_helpers.out @@ -14,6 +14,8 @@ CREATE TABLE test(col_1 int); -- not in a distributed transaction SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); ERROR: This is an internal Citus function can only be used in a distributed transaction +SELECT citus_internal_update_relation_colocation ('test'::regclass, 1); +ERROR: This is an internal Citus function can only be used in a distributed transaction -- in a distributed transaction, but the application name is not Citus BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); @@ -75,8 +77,21 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); ERROR: must be owner of table test ROLLBACK; +-- we do not own the relation +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + SELECT citus_internal_update_relation_colocation ('test'::regclass, 10); +ERROR: must be owner of table test +ROLLBACK; -- finally, a user can only add its own tables to the metadata CREATE TABLE test_2(col_1 int, col_2 int); +CREATE TABLE test_3(col_1 int, col_2 int); BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); assign_distributed_transaction_id @@ -415,7 +430,13 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) SET application_name to 'citus'; - SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 250, 's'); + citus_internal_add_partition_metadata +--------------------------------------------------------------------- + +(1 row) + + SELECT citus_internal_add_partition_metadata ('test_3'::regclass, 'h', 'col_1', 251, 's'); citus_internal_add_partition_metadata --------------------------------------------------------------------- @@ -428,6 +449,22 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1 row) COMMIT; +-- we can update to a non-existing colocation group (e.g., colocate_with:=none) +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232); + citus_internal_update_relation_colocation +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; -- invalid shard ids are not allowed BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); @@ -585,7 +622,8 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; ('test_2'::regclass, 1420002::bigint, 't'::"char", '31'::text, '40'::text), ('test_2'::regclass, 1420003::bigint, 't'::"char", '41'::text, '50'::text), ('test_2'::regclass, 1420004::bigint, 't'::"char", '51'::text, '60'::text), - ('test_2'::regclass, 1420005::bigint, 't'::"char", '61'::text, '70'::text)) + ('test_2'::regclass, 1420005::bigint, 't'::"char", '61'::text, '70'::text), + ('test_3'::regclass, 1420008::bigint, 't'::"char", '11'::text, '20'::text)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; citus_internal_add_shard_metadata --------------------------------------------------------------------- @@ -595,7 +633,47 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; -(6 rows) + +(7 rows) + +COMMIT; +-- we cannot mark these two tables colocated because they are not colocated +BEGIN; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ERROR: cannot colocate tables test_2 and test_3 +ROLLBACK; +-- now, add few more shards for test_3 to make it colocated with test_2 +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) + AS (VALUES ('test_3'::regclass, 1420009::bigint, 't'::"char", '21'::text, '30'::text), + ('test_3'::regclass, 1420010::bigint, 't'::"char", '31'::text, '40'::text), + ('test_3'::regclass, 1420011::bigint, 't'::"char", '41'::text, '50'::text), + ('test_3'::regclass, 1420012::bigint, 't'::"char", '51'::text, '60'::text), + ('test_3'::regclass, 1420013::bigint, 't'::"char", '61'::text, '70'::text)) + SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; + citus_internal_add_shard_metadata +--------------------------------------------------------------------- + + + + + +(5 rows) COMMIT; -- shardMin/MaxValues should be NULL for reference tables @@ -807,7 +885,13 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1420002, 1, 0::bigint, get_node_id(), 1500002), (1420003, 1, 0::bigint, get_node_id(), 1500003), (1420004, 1, 0::bigint, get_node_id(), 1500004), - (1420005, 1, 0::bigint, get_node_id(), 1500005)) + (1420005, 1, 0::bigint, get_node_id(), 1500005), + (1420008, 1, 0::bigint, get_node_id(), 1500006), + (1420009, 1, 0::bigint, get_node_id(), 1500007), + (1420010, 1, 0::bigint, get_node_id(), 1500008), + (1420011, 1, 0::bigint, get_node_id(), 1500009), + (1420012, 1, 0::bigint, get_node_id(), 1500010), + (1420013, 1, 0::bigint, get_node_id(), 1500011)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; citus_internal_add_placement_metadata --------------------------------------------------------------------- @@ -817,9 +901,31 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; -(6 rows) + + + + + + +(12 rows) COMMIT; +-- we should be able to colocate both tables now +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); + citus_internal_update_relation_colocation +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; -- try to update placements -- fails because we are trying to update it to non-existing node BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; @@ -967,6 +1073,114 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; 0 (1 row) +ROLLBACK; +-- we'll do some metadata changes to trigger some error cases +-- so connect as superuser +\c - postgres - :worker_1_port +SET search_path TO metadata_sync_helpers; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + -- with an ugly trick, update the repmodel + -- so that making two tables colocated fails + UPDATE pg_dist_partition SET repmodel = 't' + WHERE logicalrelid = 'test_2'::regclass; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ERROR: cannot colocate tables test_2 and test_3 +ROLLBACK; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + -- with an ugly trick, update the vartype of table from int to bigint + -- so that making two tables colocated fails + UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}' + WHERE logicalrelid = 'test_2'::regclass; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ERROR: cannot colocate tables test_2 and test_3 +ROLLBACK; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + -- with an ugly trick, update the partmethod of the table to not-valid + -- so that making two tables colocated fails + UPDATE pg_dist_partition SET partmethod = '' + WHERE logicalrelid = 'test_2'::regclass; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ERROR: The relation "test_2" does not have a valid entry in pg_dist_partition. +ROLLBACK; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + -- with an ugly trick, update the partmethod of the table to not-valid + -- so that making two tables colocated fails + UPDATE pg_dist_partition SET partmethod = 'a' + WHERE logicalrelid = 'test_2'::regclass; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ERROR: Updating colocation ids are only allowed for hash distributed tables: a +ROLLBACK; +-- colocated hash distributed table should have the same dist key columns +CREATE TABLE test_5(int_col int, text_col text); +CREATE TABLE test_6(int_col int, text_col text); +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + \set VERBOSITY terse + SELECT citus_internal_add_partition_metadata ('test_5'::regclass, 'h', 'int_col', 500, 's'); + citus_internal_add_partition_metadata +--------------------------------------------------------------------- + +(1 row) + + SELECT citus_internal_add_partition_metadata ('test_6'::regclass, 'h', 'text_col', 500, 's'); +ERROR: cannot colocate tables test_6 and test_5 +ROLLBACK; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + CREATE COLLATION collation_t1 (provider = icu, locale = 'de-u-co-phonebk'); + CREATE COLLATION caseinsensitive (provider = icu, locale = 'und-u-ks-level2'); + -- colocated hash distributed table should have the same dist key collations + CREATE TABLE test_7(int_col int, text_col text COLLATE "collation_t1"); + CREATE TABLE test_8(int_col int, text_col text COLLATE "caseinsensitive"); + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + \set VERBOSITY terse + SELECT citus_internal_add_partition_metadata ('test_7'::regclass, 'h', 'text_col', 500, 's'); + citus_internal_add_partition_metadata +--------------------------------------------------------------------- + +(1 row) + + SELECT citus_internal_add_partition_metadata ('test_8'::regclass, 'h', 'text_col', 500, 's'); +ERROR: cannot colocate tables test_8 and test_7 ROLLBACK; -- we don't need the table/schema anymore -- connect back as super user to drop everything diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 1486401ec..07e3c0f92 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -651,8 +651,9 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) void | function citus_internal_delete_shard_metadata(bigint) void | function citus_internal_update_placement_metadata(bigint,integer,integer) void + | function citus_internal_update_relation_colocation(oid,integer) void | function stop_metadata_sync_to_node(text,integer,boolean) void -(9 rows) +(10 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 872127f9f..1abc2d45b 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -833,12 +833,21 @@ SELECT create_distributed_table('mx_colocation_test_1', 'a'); (1 row) CREATE TABLE mx_colocation_test_2 (a int); -SELECT create_distributed_table('mx_colocation_test_2', 'a'); +SELECT create_distributed_table('mx_colocation_test_2', 'a', colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) +-- Reset the colocation IDs of the test tables +DELETE FROM + pg_dist_colocation +WHERE EXISTS ( + SELECT 1 + FROM pg_dist_partition + WHERE + colocationid = pg_dist_partition.colocationid + AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass); -- Check the colocation IDs of the created tables SELECT logicalrelid, colocationid @@ -851,25 +860,9 @@ ORDER BY logicalrelid; logicalrelid | colocationid --------------------------------------------------------------------- mx_colocation_test_1 | 10000 - mx_colocation_test_2 | 10000 + mx_colocation_test_2 | 10001 (2 rows) --- Reset the colocation IDs of the test tables -DELETE FROM - pg_dist_colocation -WHERE EXISTS ( - SELECT 1 - FROM pg_dist_partition - WHERE - colocationid = pg_dist_partition.colocationid - AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass); -UPDATE - pg_dist_partition -SET - colocationid = 0 -WHERE - logicalrelid = 'mx_colocation_test_1'::regclass - OR logicalrelid = 'mx_colocation_test_2'::regclass; -- Update colocation and see the changes on the master and the worker SELECT update_distributed_table_colocation('mx_colocation_test_1', colocate_with => 'mx_colocation_test_2'); update_distributed_table_colocation @@ -1680,9 +1673,9 @@ SELECT unnest(master_metadata_snapshot()) order by 1; SELECT citus_internal_add_partition_metadata ('mx_test_schema_1.mx_table_1'::regclass, 'h', 'col1', 3, 's') SELECT citus_internal_add_partition_metadata ('mx_test_schema_2.mx_table_2'::regclass, 'h', 'col1', 3, 's') SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's') - SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10004, 's') - SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10002, 't') - SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10004, 's') + SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10005, 's') + SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10003, 't') + SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10005, 's') SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_sequence_0 AS integer INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 CACHE 1 NO CYCLE','integer') diff --git a/src/test/regress/expected/update_colocation_mx.out b/src/test/regress/expected/update_colocation_mx.out new file mode 100644 index 000000000..12b0a4a46 --- /dev/null +++ b/src/test/regress/expected/update_colocation_mx.out @@ -0,0 +1,100 @@ +-- in order to make the enterprise and community +-- tests outputs the same, disable enable_ddl_propagation +-- and create the roles/schema manually +SET citus.enable_ddl_propagation TO OFF; +CREATE SCHEMA "Update Colocation"; +SET client_min_messages TO ERROR; +CREATE ROLE mx_update_colocation WITH LOGIN; +GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation; +\c - - - :worker_1_port +SET citus.enable_ddl_propagation TO OFF; +CREATE SCHEMA "Update Colocation"; +SET client_min_messages TO ERROR; +CREATE ROLE mx_update_colocation WITH LOGIN; +GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation; +\c - - - :worker_2_port +SET citus.enable_ddl_propagation TO OFF; +CREATE SCHEMA "Update Colocation"; +SET client_min_messages TO ERROR; +CREATE ROLE mx_update_colocation WITH LOGIN; +GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation; +\c - mx_update_colocation - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path TO "Update Colocation"; +CREATE TABLE t1(a int); +CREATE TABLE t2(a int); +SELECT create_distributed_table('t1', 'a', colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('t2', 'a', colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT update_distributed_table_colocation('t1', 't2'); + update_distributed_table_colocation +--------------------------------------------------------------------- + +(1 row) + +-- show that we successfuly updated the colocationids to the same value +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); + count +--------------------------------------------------------------------- + 1 +(1 row) + +\c - - - :worker_1_port +SET search_path TO "Update Colocation"; +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); + count +--------------------------------------------------------------------- + 1 +(1 row) + +\c - - - :worker_2_port +SET search_path TO "Update Colocation"; +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); + count +--------------------------------------------------------------------- + 1 +(1 row) + +\c - - - :master_port +SET search_path TO "Update Colocation"; +SELECT update_distributed_table_colocation('t1', 'none'); + update_distributed_table_colocation +--------------------------------------------------------------------- + +(1 row) + +-- show that we successfuly updated the colocationids different values +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); + count +--------------------------------------------------------------------- + 2 +(1 row) + +\c - - - :worker_1_port +SET search_path TO "Update Colocation"; +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); + count +--------------------------------------------------------------------- + 2 +(1 row) + +\c - - - :worker_2_port +SET search_path TO "Update Colocation"; +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); + count +--------------------------------------------------------------------- + 2 +(1 row) + +\c - postgres - :master_port +SET client_min_messages TO ERROR; +DROP SCHEMA "Update Colocation" cascade; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index f040a0a4a..6ee6a3084 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -71,6 +71,7 @@ ORDER BY 1; function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) function citus_internal_delete_shard_metadata(bigint) function citus_internal_update_placement_metadata(bigint,integer,integer) + function citus_internal_update_relation_colocation(oid,integer) function citus_isolation_test_session_is_blocked(integer,integer[]) function citus_json_concatenate(json,json) function citus_json_concatenate_final(json) @@ -253,5 +254,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(237 rows) +(238 rows) diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index e0f590443..64dfddba5 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -54,6 +54,7 @@ test: multi_mx_reference_table test: multi_mx_insert_select_repartition test: locally_execute_intermediate_results test: multi_mx_alter_distributed_table +test: update_colocation_mx # should be executed sequentially because it modifies metadata test: local_shard_execution_dropped_column diff --git a/src/test/regress/sql/metadata_sync_helpers.sql b/src/test/regress/sql/metadata_sync_helpers.sql index 58339612f..e4f174b16 100644 --- a/src/test/regress/sql/metadata_sync_helpers.sql +++ b/src/test/regress/sql/metadata_sync_helpers.sql @@ -16,6 +16,7 @@ CREATE TABLE test(col_1 int); -- not in a distributed transaction SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); +SELECT citus_internal_update_relation_colocation ('test'::regclass, 1); -- in a distributed transaction, but the application name is not Citus BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; @@ -70,8 +71,16 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); ROLLBACK; +-- we do not own the relation +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + SELECT citus_internal_update_relation_colocation ('test'::regclass, 10); +ROLLBACK; + -- finally, a user can only add its own tables to the metadata CREATE TABLE test_2(col_1 int, col_2 int); +CREATE TABLE test_3(col_1 int, col_2 int); BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); SET application_name to 'citus'; @@ -264,10 +273,18 @@ ROLLBACK; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); SET application_name to 'citus'; - SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 's'); + SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 250, 's'); + SELECT citus_internal_add_partition_metadata ('test_3'::regclass, 'h', 'col_1', 251, 's'); SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 't'); COMMIT; +-- we can update to a non-existing colocation group (e.g., colocate_with:=none) +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232); +ROLLBACK; + -- invalid shard ids are not allowed BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); @@ -377,7 +394,29 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; ('test_2'::regclass, 1420002::bigint, 't'::"char", '31'::text, '40'::text), ('test_2'::regclass, 1420003::bigint, 't'::"char", '41'::text, '50'::text), ('test_2'::regclass, 1420004::bigint, 't'::"char", '51'::text, '60'::text), - ('test_2'::regclass, 1420005::bigint, 't'::"char", '61'::text, '70'::text)) + ('test_2'::regclass, 1420005::bigint, 't'::"char", '61'::text, '70'::text), + ('test_3'::regclass, 1420008::bigint, 't'::"char", '11'::text, '20'::text)) + SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; +COMMIT; + +-- we cannot mark these two tables colocated because they are not colocated +BEGIN; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ROLLBACK; + +-- now, add few more shards for test_3 to make it colocated with test_2 +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) + AS (VALUES ('test_3'::regclass, 1420009::bigint, 't'::"char", '21'::text, '30'::text), + ('test_3'::regclass, 1420010::bigint, 't'::"char", '31'::text, '40'::text), + ('test_3'::regclass, 1420011::bigint, 't'::"char", '41'::text, '50'::text), + ('test_3'::regclass, 1420012::bigint, 't'::"char", '51'::text, '60'::text), + ('test_3'::regclass, 1420013::bigint, 't'::"char", '61'::text, '70'::text)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; COMMIT; @@ -526,10 +565,23 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1420002, 1, 0::bigint, get_node_id(), 1500002), (1420003, 1, 0::bigint, get_node_id(), 1500003), (1420004, 1, 0::bigint, get_node_id(), 1500004), - (1420005, 1, 0::bigint, get_node_id(), 1500005)) + (1420005, 1, 0::bigint, get_node_id(), 1500005), + (1420008, 1, 0::bigint, get_node_id(), 1500006), + (1420009, 1, 0::bigint, get_node_id(), 1500007), + (1420010, 1, 0::bigint, get_node_id(), 1500008), + (1420011, 1, 0::bigint, get_node_id(), 1500009), + (1420012, 1, 0::bigint, get_node_id(), 1500010), + (1420013, 1, 0::bigint, get_node_id(), 1500011)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; COMMIT; +-- we should be able to colocate both tables now +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ROLLBACK; + -- try to update placements -- fails because we are trying to update it to non-existing node @@ -619,6 +671,80 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000; ROLLBACK; + +-- we'll do some metadata changes to trigger some error cases +-- so connect as superuser +\c - postgres - :worker_1_port +SET search_path TO metadata_sync_helpers; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + -- with an ugly trick, update the repmodel + -- so that making two tables colocated fails + UPDATE pg_dist_partition SET repmodel = 't' + WHERE logicalrelid = 'test_2'::regclass; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ROLLBACK; + + +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + -- with an ugly trick, update the vartype of table from int to bigint + -- so that making two tables colocated fails + UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}' + WHERE logicalrelid = 'test_2'::regclass; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ROLLBACK; + +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + -- with an ugly trick, update the partmethod of the table to not-valid + -- so that making two tables colocated fails + UPDATE pg_dist_partition SET partmethod = '' + WHERE logicalrelid = 'test_2'::regclass; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ROLLBACK; + +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + -- with an ugly trick, update the partmethod of the table to not-valid + -- so that making two tables colocated fails + UPDATE pg_dist_partition SET partmethod = 'a' + WHERE logicalrelid = 'test_2'::regclass; + SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251); +ROLLBACK; + +-- colocated hash distributed table should have the same dist key columns +CREATE TABLE test_5(int_col int, text_col text); +CREATE TABLE test_6(int_col int, text_col text); + +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + \set VERBOSITY terse + SELECT citus_internal_add_partition_metadata ('test_5'::regclass, 'h', 'int_col', 500, 's'); + SELECT citus_internal_add_partition_metadata ('test_6'::regclass, 'h', 'text_col', 500, 's'); +ROLLBACK; + + +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + CREATE COLLATION collation_t1 (provider = icu, locale = 'de-u-co-phonebk'); + CREATE COLLATION caseinsensitive (provider = icu, locale = 'und-u-ks-level2'); + + -- colocated hash distributed table should have the same dist key collations + CREATE TABLE test_7(int_col int, text_col text COLLATE "collation_t1"); + CREATE TABLE test_8(int_col int, text_col text COLLATE "caseinsensitive"); + + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + \set VERBOSITY terse + SELECT citus_internal_add_partition_metadata ('test_7'::regclass, 'h', 'text_col', 500, 's'); + SELECT citus_internal_add_partition_metadata ('test_8'::regclass, 'h', 'text_col', 500, 's'); +ROLLBACK; + -- we don't need the table/schema anymore -- connect back as super user to drop everything \c - postgres - :worker_1_port diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 45e0fe8e4..975625ceb 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -363,17 +363,7 @@ CREATE TABLE mx_colocation_test_1 (a int); SELECT create_distributed_table('mx_colocation_test_1', 'a'); CREATE TABLE mx_colocation_test_2 (a int); -SELECT create_distributed_table('mx_colocation_test_2', 'a'); - --- Check the colocation IDs of the created tables -SELECT - logicalrelid, colocationid -FROM - pg_dist_partition -WHERE - logicalrelid = 'mx_colocation_test_1'::regclass - OR logicalrelid = 'mx_colocation_test_2'::regclass -ORDER BY logicalrelid; +SELECT create_distributed_table('mx_colocation_test_2', 'a', colocate_with:='none'); -- Reset the colocation IDs of the test tables DELETE FROM @@ -384,13 +374,16 @@ WHERE EXISTS ( WHERE colocationid = pg_dist_partition.colocationid AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass); -UPDATE + +-- Check the colocation IDs of the created tables +SELECT + logicalrelid, colocationid +FROM pg_dist_partition -SET - colocationid = 0 WHERE logicalrelid = 'mx_colocation_test_1'::regclass - OR logicalrelid = 'mx_colocation_test_2'::regclass; + OR logicalrelid = 'mx_colocation_test_2'::regclass +ORDER BY logicalrelid; -- Update colocation and see the changes on the master and the worker SELECT update_distributed_table_colocation('mx_colocation_test_1', colocate_with => 'mx_colocation_test_2'); diff --git a/src/test/regress/sql/update_colocation_mx.sql b/src/test/regress/sql/update_colocation_mx.sql new file mode 100644 index 000000000..c62dfa7b3 --- /dev/null +++ b/src/test/regress/sql/update_colocation_mx.sql @@ -0,0 +1,58 @@ + +-- in order to make the enterprise and community +-- tests outputs the same, disable enable_ddl_propagation +-- and create the roles/schema manually +SET citus.enable_ddl_propagation TO OFF; +CREATE SCHEMA "Update Colocation"; +SET client_min_messages TO ERROR; +CREATE ROLE mx_update_colocation WITH LOGIN; +GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation; + +\c - - - :worker_1_port +SET citus.enable_ddl_propagation TO OFF; +CREATE SCHEMA "Update Colocation"; +SET client_min_messages TO ERROR; +CREATE ROLE mx_update_colocation WITH LOGIN; +GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation; + +\c - - - :worker_2_port +SET citus.enable_ddl_propagation TO OFF; +CREATE SCHEMA "Update Colocation"; +SET client_min_messages TO ERROR; +CREATE ROLE mx_update_colocation WITH LOGIN; +GRANT ALL ON SCHEMA "Update Colocation" TO mx_update_colocation; +\c - mx_update_colocation - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path TO "Update Colocation"; + +CREATE TABLE t1(a int); +CREATE TABLE t2(a int); +SELECT create_distributed_table('t1', 'a', colocate_with:='none'); +SELECT create_distributed_table('t2', 'a', colocate_with:='none'); +SELECT update_distributed_table_colocation('t1', 't2'); + +-- show that we successfuly updated the colocationids to the same value +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); +\c - - - :worker_1_port +SET search_path TO "Update Colocation"; +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); +\c - - - :worker_2_port +SET search_path TO "Update Colocation"; +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); + +\c - - - :master_port +SET search_path TO "Update Colocation"; +SELECT update_distributed_table_colocation('t1', 'none'); + +-- show that we successfuly updated the colocationids different values +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); +\c - - - :worker_1_port +SET search_path TO "Update Colocation"; +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); +\c - - - :worker_2_port +SET search_path TO "Update Colocation"; +SELECT count(DISTINCT colocationid) FROM pg_dist_partition WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass); + +\c - postgres - :master_port +SET client_min_messages TO ERROR; +DROP SCHEMA "Update Colocation" cascade;