diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index 752552343..4e94930e3 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -361,7 +361,8 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray, }; bool nullsArray[Natts_pg_dist_shard] = { [Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex], - [Anum_pg_dist_shard_shardmaxvalue - 1] = maxValueNulls[partitionIndex] + [Anum_pg_dist_shard_shardmaxvalue - 1] = maxValueNulls[partitionIndex], + [Anum_pg_dist_shard_shardgroupid - 1] = true }; shardIntervalArray[partitionIndex] = diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 98ffe1b7d..59c39a030 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1705,6 +1705,8 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType, isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true; } + isNulls[Anum_pg_dist_shard_shardgroupid - 1] = true; + /* open shard relation and insert new tuple */ Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock); diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 86b40bfba..34a1fe25e 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -507,7 +507,7 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList) static void AcquireLogicalReplicationLock(void) { - LOCKTAG tag; + LOCKTAG tag = { 0 }; SET_LOCKTAG_LOGICAL_REPLICATION(tag); LockAcquire(&tag, ExclusiveLock, false, false); diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 981c5f375..f30b57db6 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -2,3 +2,48 @@ -- bump version to 11.3-1 +CREATE TABLE citus.pg_dist_shardgroup ( + shardgroupid bigint PRIMARY KEY, + colocationid integer NOT NULL, + shardminvalue text, + shardmaxvalue text +); +ALTER TABLE citus.pg_dist_shardgroup SET SCHEMA pg_catalog; + +INSERT INTO pg_catalog.pg_dist_shardgroup + SELECT min(shardid) as shardgroupid, + colocationid, + shardminvalue, + shardmaxvalue + FROM pg_dist_shard + JOIN pg_dist_partition USING (logicalrelid) + GROUP BY colocationid, shardminvalue, shardmaxvalue; + +ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardgroupid bigint; + +-- backfill shardgroupid field by finding the generated shardgroup above by joining the colocationid, shardminvalue and +-- shardmaxvalue (for the shardvalues we want to treat NULL values as equal, hence the complex conditions for those). +-- After this operation _all_ shards should have a shardgroupid associated which satisfies the colocation invariant of +-- the shards in the same colocationid. +UPDATE pg_catalog.pg_dist_shard AS shard + SET shardgroupid = shardgroup.shardgroupid + FROM ( + SELECT shardgroupid, + colocationid, + shardminvalue, + shardmaxvalue, + logicalrelid + FROM pg_catalog.pg_dist_shardgroup + JOIN pg_dist_partition USING (colocationid) + ) AS shardgroup +WHERE shard.logicalrelid = shardgroup.logicalrelid + AND ( + shard.shardminvalue = shardgroup.shardminvalue + OR ( shard.shardminvalue IS NULL + AND shardgroup.shardminvalue IS NULL) + ) + AND ( + shard.shardmaxvalue = shardgroup.shardmaxvalue + OR ( shard.shardmaxvalue IS NULL + AND shardgroup.shardmaxvalue IS NULL) + ); \ No newline at end of file diff --git a/src/include/distributed/pg_dist_shard.h b/src/include/distributed/pg_dist_shard.h index 5c98b755f..95d2f3e7e 100644 --- a/src/include/distributed/pg_dist_shard.h +++ b/src/include/distributed/pg_dist_shard.h @@ -30,6 +30,7 @@ typedef struct FormData_pg_dist_shard text shardminvalue; /* partition key's minimum value in shard */ text shardmaxvalue; /* partition key's maximum value in shard */ #endif + int64 shardgroupid; } FormData_pg_dist_shard; /* ---------------- @@ -43,13 +44,14 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard; * compiler constants for pg_dist_shards * ---------------- */ -#define Natts_pg_dist_shard 6 +#define Natts_pg_dist_shard 7 #define Anum_pg_dist_shard_logicalrelid 1 #define Anum_pg_dist_shard_shardid 2 #define Anum_pg_dist_shard_shardstorage 3 #define Anum_pg_dist_shard_shardalias_DROPPED 4 #define Anum_pg_dist_shard_shardminvalue 5 #define Anum_pg_dist_shard_shardmaxvalue 6 +#define Anum_pg_dist_shard_shardgroupid 7 /* * Valid values for shard storage types include foreign table, (standard) table