mirror of https://github.com/citusdata/citus.git
add shardgroup catalog and backfill data
parent
da7db53c87
commit
303411b9fa
|
@ -361,7 +361,8 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray,
|
||||||
};
|
};
|
||||||
bool nullsArray[Natts_pg_dist_shard] = {
|
bool nullsArray[Natts_pg_dist_shard] = {
|
||||||
[Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex],
|
[Anum_pg_dist_shard_shardminvalue - 1] = minValueNulls[partitionIndex],
|
||||||
[Anum_pg_dist_shard_shardmaxvalue - 1] = maxValueNulls[partitionIndex]
|
[Anum_pg_dist_shard_shardmaxvalue - 1] = maxValueNulls[partitionIndex],
|
||||||
|
[Anum_pg_dist_shard_shardgroupid - 1] = true
|
||||||
};
|
};
|
||||||
|
|
||||||
shardIntervalArray[partitionIndex] =
|
shardIntervalArray[partitionIndex] =
|
||||||
|
|
|
@ -1705,6 +1705,8 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
|
||||||
isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
|
isNulls[Anum_pg_dist_shard_shardmaxvalue - 1] = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
isNulls[Anum_pg_dist_shard_shardgroupid - 1] = true;
|
||||||
|
|
||||||
/* open shard relation and insert new tuple */
|
/* open shard relation and insert new tuple */
|
||||||
Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
|
Relation pgDistShard = table_open(DistShardRelationId(), RowExclusiveLock);
|
||||||
|
|
||||||
|
|
|
@ -507,7 +507,7 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList)
|
||||||
static void
|
static void
|
||||||
AcquireLogicalReplicationLock(void)
|
AcquireLogicalReplicationLock(void)
|
||||||
{
|
{
|
||||||
LOCKTAG tag;
|
LOCKTAG tag = { 0 };
|
||||||
SET_LOCKTAG_LOGICAL_REPLICATION(tag);
|
SET_LOCKTAG_LOGICAL_REPLICATION(tag);
|
||||||
|
|
||||||
LockAcquire(&tag, ExclusiveLock, false, false);
|
LockAcquire(&tag, ExclusiveLock, false, false);
|
||||||
|
|
|
@ -2,3 +2,48 @@
|
||||||
|
|
||||||
-- bump version to 11.3-1
|
-- bump version to 11.3-1
|
||||||
|
|
||||||
|
CREATE TABLE citus.pg_dist_shardgroup (
|
||||||
|
shardgroupid bigint PRIMARY KEY,
|
||||||
|
colocationid integer NOT NULL,
|
||||||
|
shardminvalue text,
|
||||||
|
shardmaxvalue text
|
||||||
|
);
|
||||||
|
ALTER TABLE citus.pg_dist_shardgroup SET SCHEMA pg_catalog;
|
||||||
|
|
||||||
|
INSERT INTO pg_catalog.pg_dist_shardgroup
|
||||||
|
SELECT min(shardid) as shardgroupid,
|
||||||
|
colocationid,
|
||||||
|
shardminvalue,
|
||||||
|
shardmaxvalue
|
||||||
|
FROM pg_dist_shard
|
||||||
|
JOIN pg_dist_partition USING (logicalrelid)
|
||||||
|
GROUP BY colocationid, shardminvalue, shardmaxvalue;
|
||||||
|
|
||||||
|
ALTER TABLE pg_catalog.pg_dist_shard ADD COLUMN shardgroupid bigint;
|
||||||
|
|
||||||
|
-- backfill shardgroupid field by finding the generated shardgroup above by joining the colocationid, shardminvalue and
|
||||||
|
-- shardmaxvalue (for the shardvalues we want to treat NULL values as equal, hence the complex conditions for those).
|
||||||
|
-- After this operation _all_ shards should have a shardgroupid associated which satisfies the colocation invariant of
|
||||||
|
-- the shards in the same colocationid.
|
||||||
|
UPDATE pg_catalog.pg_dist_shard AS shard
|
||||||
|
SET shardgroupid = shardgroup.shardgroupid
|
||||||
|
FROM (
|
||||||
|
SELECT shardgroupid,
|
||||||
|
colocationid,
|
||||||
|
shardminvalue,
|
||||||
|
shardmaxvalue,
|
||||||
|
logicalrelid
|
||||||
|
FROM pg_catalog.pg_dist_shardgroup
|
||||||
|
JOIN pg_dist_partition USING (colocationid)
|
||||||
|
) AS shardgroup
|
||||||
|
WHERE shard.logicalrelid = shardgroup.logicalrelid
|
||||||
|
AND (
|
||||||
|
shard.shardminvalue = shardgroup.shardminvalue
|
||||||
|
OR ( shard.shardminvalue IS NULL
|
||||||
|
AND shardgroup.shardminvalue IS NULL)
|
||||||
|
)
|
||||||
|
AND (
|
||||||
|
shard.shardmaxvalue = shardgroup.shardmaxvalue
|
||||||
|
OR ( shard.shardmaxvalue IS NULL
|
||||||
|
AND shardgroup.shardmaxvalue IS NULL)
|
||||||
|
);
|
|
@ -30,6 +30,7 @@ typedef struct FormData_pg_dist_shard
|
||||||
text shardminvalue; /* partition key's minimum value in shard */
|
text shardminvalue; /* partition key's minimum value in shard */
|
||||||
text shardmaxvalue; /* partition key's maximum value in shard */
|
text shardmaxvalue; /* partition key's maximum value in shard */
|
||||||
#endif
|
#endif
|
||||||
|
int64 shardgroupid;
|
||||||
} FormData_pg_dist_shard;
|
} FormData_pg_dist_shard;
|
||||||
|
|
||||||
/* ----------------
|
/* ----------------
|
||||||
|
@ -43,13 +44,14 @@ typedef FormData_pg_dist_shard *Form_pg_dist_shard;
|
||||||
* compiler constants for pg_dist_shards
|
* compiler constants for pg_dist_shards
|
||||||
* ----------------
|
* ----------------
|
||||||
*/
|
*/
|
||||||
#define Natts_pg_dist_shard 6
|
#define Natts_pg_dist_shard 7
|
||||||
#define Anum_pg_dist_shard_logicalrelid 1
|
#define Anum_pg_dist_shard_logicalrelid 1
|
||||||
#define Anum_pg_dist_shard_shardid 2
|
#define Anum_pg_dist_shard_shardid 2
|
||||||
#define Anum_pg_dist_shard_shardstorage 3
|
#define Anum_pg_dist_shard_shardstorage 3
|
||||||
#define Anum_pg_dist_shard_shardalias_DROPPED 4
|
#define Anum_pg_dist_shard_shardalias_DROPPED 4
|
||||||
#define Anum_pg_dist_shard_shardminvalue 5
|
#define Anum_pg_dist_shard_shardminvalue 5
|
||||||
#define Anum_pg_dist_shard_shardmaxvalue 6
|
#define Anum_pg_dist_shard_shardmaxvalue 6
|
||||||
|
#define Anum_pg_dist_shard_shardgroupid 7
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Valid values for shard storage types include foreign table, (standard) table
|
* Valid values for shard storage types include foreign table, (standard) table
|
||||||
|
|
Loading…
Reference in New Issue