Merge pull request #1453 from citusdata/shard-placement-uses-groups

Shard placements don't hardcode nodename and nodeport
pull/1491/head
Marco Slot 2017-07-12 14:37:16 +02:00 committed by GitHub
commit d164b4cd10
62 changed files with 986 additions and 748 deletions

.gitignore
View File

@ -36,3 +36,6 @@ lib*.pc
/autom4te.cache
/Makefile.global
/src/Makefile.custom
# temporary files vim creates
*.swp

View File

@ -1,3 +1,7 @@
### citus v7.0.0 (unreleased) ###
* Replaces pg_dist_shard_placement metadata table with pg_dist_placement
### citus v6.2.2 (May 31, 2017) ###
* Fixes a common cause of deadlocks when repairing tables with foreign keys
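For orientation before the diff: a placement row now records a groupid instead of a nodename/nodeport pair, and the old catalog name survives as a backwards-compatible view. A minimal sketch of how the two relate, using only the table, view, and column names introduced by the citus--7.0-2--7.0-3.sql script later in this pull request:

-- resolve a placement to a host by joining through pg_dist_node
SELECT placement.shardid, placement.shardstate, node.nodename, node.nodeport
FROM pg_dist_placement placement
JOIN pg_dist_node node ON (placement.groupid = node.groupid);

-- the same information is still available under the old name, now a view
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement;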

View File

@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
6.2-1 6.2-2 6.2-3 6.2-4 \
7.0-1 7.0-2
7.0-1 7.0-2 7.0-3
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -143,6 +143,8 @@ $(EXTENSION)--7.0-1.sql: $(EXTENSION)--6.2-4.sql $(EXTENSION)--6.2-4--7.0-1.sql
cat $^ > $@
$(EXTENSION)--7.0-2.sql: $(EXTENSION)--7.0-1.sql $(EXTENSION)--7.0-1--7.0-2.sql
cat $^ > $@
$(EXTENSION)--7.0-3.sql: $(EXTENSION)--7.0-2.sql $(EXTENSION)--7.0-2--7.0-3.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,97 @@
/* citus--7.0-2--7.0-3.sql */
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq
RENAME TO pg_dist_placement_placementid_seq;
ALTER TABLE pg_catalog.pg_dist_shard_placement
ALTER COLUMN placementid SET DEFAULT nextval('pg_catalog.pg_dist_placement_placementid_seq');
CREATE TABLE citus.pg_dist_placement (
placementid BIGINT NOT NULL default nextval('pg_dist_placement_placementid_seq'::regclass),
shardid BIGINT NOT NULL,
shardstate INT NOT NULL,
shardlength BIGINT NOT NULL,
groupid INT NOT NULL
);
ALTER TABLE citus.pg_dist_placement SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.pg_dist_placement TO public;
CREATE INDEX pg_dist_placement_groupid_index
ON pg_dist_placement USING btree(groupid);
CREATE INDEX pg_dist_placement_shardid_index
ON pg_dist_placement USING btree(shardid);
CREATE UNIQUE INDEX pg_dist_placement_placementid_index
ON pg_dist_placement USING btree(placementid);
CREATE OR REPLACE FUNCTION citus.find_groupid_for_node(text, int)
RETURNS int AS $$
DECLARE
groupid int := (SELECT groupid FROM pg_dist_node WHERE nodename = $1 AND nodeport = $2);
BEGIN
IF groupid IS NULL THEN
RAISE EXCEPTION 'There is no node at "%:%"', $1, $2;
ELSE
RETURN groupid;
END IF;
END;
$$ LANGUAGE plpgsql;
INSERT INTO pg_catalog.pg_dist_placement
SELECT placementid, shardid, shardstate, shardlength,
citus.find_groupid_for_node(placement.nodename, placement.nodeport::int) AS groupid
FROM pg_dist_shard_placement placement;
DROP TRIGGER dist_placement_cache_invalidate ON pg_catalog.pg_dist_shard_placement;
CREATE TRIGGER dist_placement_cache_invalidate
AFTER INSERT OR UPDATE OR DELETE
ON pg_catalog.pg_dist_placement
FOR EACH ROW EXECUTE PROCEDURE master_dist_placement_cache_invalidate();
-- this should be removed when noderole is added but for now it ensures the below view
-- returns the correct results and that placements unambiguously belong to a node
ALTER TABLE pg_catalog.pg_dist_node ADD CONSTRAINT pg_dist_node_groupid_unique
UNIQUE (groupid);
DROP TABLE pg_dist_shard_placement;
CREATE VIEW citus.pg_dist_shard_placement AS
SELECT shardid, shardstate, shardlength, nodename, nodeport, placementid
-- assumes there's only one node per group
FROM pg_dist_placement placement INNER JOIN pg_dist_node node ON (
placement.groupid = node.groupid
);
ALTER VIEW citus.pg_dist_shard_placement SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.pg_dist_shard_placement TO public;
-- add some triggers which make it look like pg_dist_shard_placement is still a table
ALTER VIEW pg_catalog.pg_dist_shard_placement
ALTER placementid SET DEFAULT nextval('pg_dist_placement_placementid_seq');
CREATE OR REPLACE FUNCTION citus.pg_dist_shard_placement_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
IF (TG_OP = 'DELETE') THEN
DELETE FROM pg_dist_placement WHERE placementid = OLD.placementid;
RETURN OLD;
ELSIF (TG_OP = 'UPDATE') THEN
UPDATE pg_dist_placement
SET shardid = NEW.shardid, shardstate = NEW.shardstate,
shardlength = NEW.shardlength, placementid = NEW.placementid,
groupid = citus.find_groupid_for_node(NEW.nodename, NEW.nodeport)
WHERE placementid = OLD.placementid;
RETURN NEW;
ELSIF (TG_OP = 'INSERT') THEN
INSERT INTO pg_dist_placement
(placementid, shardid, shardstate, shardlength, groupid)
VALUES (NEW.placementid, NEW.shardid, NEW.shardstate, NEW.shardlength,
citus.find_groupid_for_node(NEW.nodename, NEW.nodeport));
RETURN NEW;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER pg_dist_shard_placement_trigger
INSTEAD OF INSERT OR UPDATE OR DELETE ON pg_dist_shard_placement
FOR EACH ROW EXECUTE PROCEDURE citus.pg_dist_shard_placement_trigger_func();
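To illustrate the compatibility path set up above (shard id, length, host, and port are made-up example values): a write against the old name goes through the INSTEAD OF trigger, which resolves the node to a group via citus.find_groupid_for_node and lands in pg_dist_placement.

-- routed through the view's INSTEAD OF trigger
INSERT INTO pg_dist_shard_placement
    (shardid, shardstate, shardlength, nodename, nodeport)
VALUES (102008, 1, 0, 'localhost', 5432);

-- equivalent to inserting into the new catalog directly
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid)
VALUES (102008, 1, 0, citus.find_groupid_for_node('localhost', 5432));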

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.0-2'
default_version = '7.0-3'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -940,7 +940,8 @@ CheckShardPlacements(ConnectionShardHashEntry *shardEntry)
{
uint64 shardId = shardEntry->key.shardId;
uint64 placementId = placementEntry->key.placementId;
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId);
GroupShardPlacement *shardPlacement =
LoadGroupShardPlacement(shardId, placementId);
/*
* We only set shard state if its current state is FILE_FINALIZED, which

View File

@ -620,8 +620,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
if (list_length(relationShardList) > 0)
{
placementAccessList = BuildPlacementSelectList(taskPlacement->nodeName,
taskPlacement->nodePort,
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
relationShardList);
}
else
@ -666,7 +665,7 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task)
* GetPlacementListConnection.
*/
List *
BuildPlacementSelectList(char *nodeName, int nodePort, List *relationShardList)
BuildPlacementSelectList(uint32 groupId, List *relationShardList)
{
ListCell *relationShardCell = NULL;
List *placementAccessList = NIL;
@ -677,12 +676,11 @@ BuildPlacementSelectList(char *nodeName, int nodePort, List *relationShardList)
ShardPlacement *placement = NULL;
ShardPlacementAccess *placementAccess = NULL;
placement = FindShardPlacementOnNode(nodeName, nodePort, relationShard->shardId);
placement = FindShardPlacementOnGroup(groupId, relationShard->shardId);
if (placement == NULL)
{
ereport(ERROR, (errmsg("no active placement of shard %ld found on node "
"%s:%d",
relationShard->shardId, nodeName, nodePort)));
ereport(ERROR, (errmsg("no active placement of shard %ld found on group %d",
relationShard->shardId, groupId)));
}
placementAccess = CreatePlacementAccess(placement, PLACEMENT_ACCESS_SELECT);
@ -886,8 +884,7 @@ GetModifyConnections(Task *task, bool markCritical, bool noNewTransactions)
ShardPlacementAccess *placementModification = NULL;
/* create placement accesses for placements that appear in a subselect */
placementAccessList = BuildPlacementSelectList(taskPlacement->nodeName,
taskPlacement->nodePort,
placementAccessList = BuildPlacementSelectList(taskPlacement->groupId,
relationShardList);
/* create placement access for the placement that we're modifying */

View File

@ -322,7 +322,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
const uint64 shardSize = 0;
InsertShardPlacementRow(newShardId, INVALID_PLACEMENT_ID, shardState,
shardSize, sourceNodeName, sourceNodePort);
shardSize, sourcePlacement->groupId);
}
else
{

View File

@ -421,7 +421,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
ExecuteCriticalRemoteCommand(connection, workerDropQuery->data);
DeleteShardPlacementRow(shardId, workerName, workerPort);
DeleteShardPlacementRow(shardPlacement->placementId);
}
DeleteShardRow(shardId);

View File

@ -35,7 +35,7 @@
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_shard_placement.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/remote_commands.h"
@ -59,12 +59,12 @@
static uint64 * AllocateUint64(uint64 value);
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
Node *distributionKey);
static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
static uint64 DistributedTableSize(Oid relationId, char *sizeQuery);
static uint64 DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
char *sizeQuery);
static List * ShardIntervalsOnWorkerNode(WorkerNode *workerNode, Oid relationId);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static StringInfo GenerateSizeQueryOnMultiplePlacements(Oid distributedRelationId,
List *shardIntervalList,
char *sizeQuery);
@ -197,7 +197,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
int queryResult = 0;
List *sizeList = NIL;
List *shardIntervalsOnNode = ShardIntervalsOnWorkerNode(workerNode, relationId);
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);
tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(relationId,
shardIntervalsOnNode,
@ -222,26 +222,22 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
/*
* ShardIntervalsOnNode takes a WorkerNode then compares it with each placement
* of table. It returns shard intervals of table on that node as a list of shard
* intervals. Note that, shard intervals returned as elements of the list are
* not the copies but the pointers.
*
* DO NOT modify the shard intervals returned by this function.
* GroupShardPlacementsForTableOnGroup accepts a relationId and a group and returns a list
* of GroupShardPlacements representing all of the placements for the table that reside
* on the group.
*/
static List *
ShardIntervalsOnWorkerNode(WorkerNode *workerNode, Oid relationId)
List *
GroupShardPlacementsForTableOnGroup(Oid relationId, uint32 groupId)
{
DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId);
char *workerNodeName = workerNode->workerName;
uint32 workerNodePort = workerNode->workerPort;
List *shardIntervalList = NIL;
List *resultList = NIL;
int shardIndex = 0;
int shardIntervalArrayLength = distTableCacheEntry->shardIntervalArrayLength;
for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
{
ShardPlacement *placementArray =
GroupShardPlacement *placementArray =
distTableCacheEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements =
distTableCacheEntry->arrayOfPlacementArrayLengths[shardIndex];
@ -249,9 +245,45 @@ ShardIntervalsOnWorkerNode(WorkerNode *workerNode, Oid relationId)
for (placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
{
ShardPlacement *placement = &placementArray[placementIndex];
char *shardNodeName = placement->nodeName;
uint32 shardNodePort = placement->nodePort;
GroupShardPlacement *placement = &placementArray[placementIndex];
if (placement->groupId == groupId)
{
resultList = lappend(resultList, placement);
}
}
}
return resultList;
}
/*
* ShardIntervalsOnWorkerGroup accepts a WorkerNode and returns a list of the shard
* intervals of the given table which are placed on the group the node is a part of.
*
* DO NOT modify the shard intervals returned by this function, they are not copies but
* pointers.
*/
static List *
ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
{
DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId);
List *shardIntervalList = NIL;
int shardIndex = 0;
int shardIntervalArrayLength = distTableCacheEntry->shardIntervalArrayLength;
for (shardIndex = 0; shardIndex < shardIntervalArrayLength; shardIndex++)
{
GroupShardPlacement *placementArray =
distTableCacheEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements =
distTableCacheEntry->arrayOfPlacementArrayLengths[shardIndex];
int placementIndex = 0;
for (placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
{
GroupShardPlacement *placement = &placementArray[placementIndex];
uint64 shardId = placement->shardId;
bool metadataLock = false;
@ -266,8 +298,7 @@ ShardIntervalsOnWorkerNode(WorkerNode *workerNode, Oid relationId)
continue;
}
if (strcmp(shardNodeName, workerNodeName) == 0 &&
shardNodePort == workerNodePort)
if (placement->groupId == workerNode->groupId)
{
ShardInterval *shardInterval =
distTableCacheEntry->sortedShardIntervalArray[shardIndex];
@ -575,12 +606,13 @@ ShardLength(uint64 shardId)
/*
* NodeHasActiveShardPlacements returns whether any active shards are placed on this node
* NodeHasShardPlacements returns whether any shards are placed on the group this node
* is a part of, optionally considering only active (finalized) placements.
*/
bool
NodeHasActiveShardPlacements(char *nodeName, int32 nodePort)
NodeHasShardPlacements(char *nodeName, int32 nodePort, bool onlyConsiderActivePlacements)
{
const int scanKeyCount = 3;
const int scanKeyCount = (onlyConsiderActivePlacements ? 2 : 1);
const bool indexOK = false;
bool hasFinalizedPlacements = false;
@ -589,25 +621,28 @@ NodeHasActiveShardPlacements(char *nodeName, int32 nodePort)
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[scanKeyCount];
Relation pgShardPlacement = heap_open(DistShardPlacementRelationId(),
AccessShareLock);
uint32 groupId = GroupForNode(nodeName, nodePort);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_nodename,
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
ScanKeyInit(&scanKey[1], Anum_pg_dist_shard_placement_nodeport,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(nodePort));
ScanKeyInit(&scanKey[2], Anum_pg_dist_shard_placement_shardstate,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(FILE_FINALIZED));
Relation pgPlacement = heap_open(DistPlacementRelationId(),
AccessShareLock);
scanDescriptor = systable_beginscan(pgShardPlacement,
DistShardPlacementNodeidIndexId(), indexOK,
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(groupId));
if (onlyConsiderActivePlacements)
{
ScanKeyInit(&scanKey[1], Anum_pg_dist_placement_shardstate,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(FILE_FINALIZED));
}
scanDescriptor = systable_beginscan(pgPlacement,
DistPlacementGroupidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
hasFinalizedPlacements = HeapTupleIsValid(heapTuple);
systable_endscan(scanDescriptor);
heap_close(pgShardPlacement, AccessShareLock);
heap_close(pgPlacement, NoLock);
return hasFinalizedPlacements;
}
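In catalog terms, the scan above amounts to the following query; this is only a sketch of the lookup, with an illustrative host and port, and assuming FILE_FINALIZED maps to shard state 1 as used elsewhere in this diff:

SELECT EXISTS (
    SELECT 1
    FROM pg_dist_placement
    WHERE groupid = (SELECT groupid FROM pg_dist_node
                     WHERE nodename = 'localhost' AND nodeport = 5432)
      AND shardstate = 1  -- omitted when onlyConsiderActivePlacements is false
);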
@ -681,76 +716,75 @@ BuildShardPlacementList(ShardInterval *shardInterval)
{
int64 shardId = shardInterval->shardId;
List *shardPlacementList = NIL;
Relation pgShardPlacement = NULL;
Relation pgPlacement = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
HeapTuple heapTuple = NULL;
pgShardPlacement = heap_open(DistShardPlacementRelationId(), AccessShareLock);
pgPlacement = heap_open(DistPlacementRelationId(), AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_shardid,
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_shardid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
scanDescriptor = systable_beginscan(pgShardPlacement,
DistShardPlacementShardidIndexId(), indexOK,
scanDescriptor = systable_beginscan(pgPlacement,
DistPlacementShardidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
TupleDesc tupleDescriptor = RelationGetDescr(pgShardPlacement);
TupleDesc tupleDescriptor = RelationGetDescr(pgPlacement);
GroupShardPlacement *placement =
TupleToGroupShardPlacement(tupleDescriptor, heapTuple);
ShardPlacement *placement = TupleToShardPlacement(tupleDescriptor, heapTuple);
shardPlacementList = lappend(shardPlacementList, placement);
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
heap_close(pgShardPlacement, AccessShareLock);
heap_close(pgPlacement, NoLock);
return shardPlacementList;
}
/*
* TupleToShardPlacement takes in a heap tuple from pg_dist_shard_placement,
* TupleToGroupShardPlacement takes in a heap tuple from pg_dist_placement,
* and converts this tuple to in-memory struct. The function assumes the
* caller already has locks on the tuple, and doesn't perform any locking.
*/
static ShardPlacement *
TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
static GroupShardPlacement *
TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
{
ShardPlacement *shardPlacement = NULL;
GroupShardPlacement *shardPlacement = NULL;
bool isNull = false;
Datum placementId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid,
Datum placementId = heap_getattr(heapTuple, Anum_pg_dist_placement_placementid,
tupleDescriptor, &isNull);
Datum shardId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardid,
Datum shardId = heap_getattr(heapTuple, Anum_pg_dist_placement_shardid,
tupleDescriptor, &isNull);
Datum shardLength = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardlength,
Datum shardLength = heap_getattr(heapTuple, Anum_pg_dist_placement_shardlength,
tupleDescriptor, &isNull);
Datum shardState = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardstate,
Datum shardState = heap_getattr(heapTuple, Anum_pg_dist_placement_shardstate,
tupleDescriptor, &isNull);
Datum nodeName = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_nodename,
tupleDescriptor, &isNull);
Datum nodePort = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_nodeport,
tupleDescriptor, &isNull);
if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement ||
Datum groupId = heap_getattr(heapTuple, Anum_pg_dist_placement_groupid,
tupleDescriptor, &isNull);
if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_placement ||
HeapTupleHasNulls(heapTuple))
{
ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement_tuple")));
ereport(ERROR, (errmsg("unexpected null in pg_dist_placement tuple")));
}
shardPlacement = CitusMakeNode(ShardPlacement);
shardPlacement = CitusMakeNode(GroupShardPlacement);
shardPlacement->placementId = DatumGetInt64(placementId);
shardPlacement->shardId = DatumGetInt64(shardId);
shardPlacement->shardLength = DatumGetInt64(shardLength);
shardPlacement->shardState = DatumGetUInt32(shardState);
shardPlacement->nodeName = TextDatumGetCString(nodeName);
shardPlacement->nodePort = DatumGetInt64(nodePort);
shardPlacement->groupId = DatumGetUInt32(groupId);
return shardPlacement;
}
@ -806,7 +840,7 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
CitusInvalidateRelcacheByRelid(relationId);
CommandCounterIncrement();
heap_close(pgDistShard, RowExclusiveLock);
heap_close(pgDistShard, NoLock);
}
@ -818,13 +852,13 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
void
InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength,
char *nodeName, uint32 nodePort)
uint32 groupId)
{
Relation pgDistShardPlacement = NULL;
Relation pgDistPlacement = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_shard_placement];
bool isNulls[Natts_pg_dist_shard_placement];
Datum values[Natts_pg_dist_placement];
bool isNulls[Natts_pg_dist_placement];
/* form new shard placement tuple */
memset(values, 0, sizeof(values));
@ -834,25 +868,24 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
{
placementId = master_get_new_placementid(NULL);
}
values[Anum_pg_dist_shard_placement_shardid - 1] = Int64GetDatum(shardId);
values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState);
values[Anum_pg_dist_shard_placement_shardlength - 1] = Int64GetDatum(shardLength);
values[Anum_pg_dist_shard_placement_nodename - 1] = CStringGetTextDatum(nodeName);
values[Anum_pg_dist_shard_placement_nodeport - 1] = Int64GetDatum(nodePort);
values[Anum_pg_dist_shard_placement_placementid - 1] = Int64GetDatum(placementId);
values[Anum_pg_dist_placement_placementid - 1] = Int64GetDatum(placementId);
values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardId);
values[Anum_pg_dist_placement_shardstate - 1] = CharGetDatum(shardState);
values[Anum_pg_dist_placement_shardlength - 1] = Int64GetDatum(shardLength);
values[Anum_pg_dist_placement_groupid - 1] = Int64GetDatum(groupId);
/* open shard placement relation and insert new tuple */
pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
pgDistPlacement = heap_open(DistPlacementRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
tupleDescriptor = RelationGetDescr(pgDistPlacement);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CatalogTupleInsert(pgDistShardPlacement, heapTuple);
CatalogTupleInsert(pgDistPlacement, heapTuple);
CitusInvalidateRelcacheByShardId(shardId);
CommandCounterIncrement();
heap_close(pgDistShardPlacement, RowExclusiveLock);
heap_close(pgDistPlacement, NoLock);
}
@ -993,7 +1026,7 @@ DeletePartitionRow(Oid distributedRelationId)
/* increment the counter so that next command can see the row */
CommandCounterIncrement();
heap_close(pgDistPartition, RowExclusiveLock);
heap_close(pgDistPartition, NoLock);
}
@ -1040,78 +1073,59 @@ DeleteShardRow(uint64 shardId)
CitusInvalidateRelcacheByRelid(distributedRelationId);
CommandCounterIncrement();
heap_close(pgDistShard, RowExclusiveLock);
heap_close(pgDistShard, NoLock);
}
/*
* DeleteShardPlacementRow opens the shard placement system catalog, finds the
* first (unique) row that corresponds to the given shardId and worker node, and
* deletes this row.
* DeleteShardPlacementRow opens the shard placement system catalog, finds the placement
* with the given placementId, and deletes it.
*/
uint64
DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
void
DeleteShardPlacementRow(uint64 placementId)
{
Relation pgDistShardPlacement = NULL;
Relation pgDistPlacement = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
const int scanKeyCount = 1;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = true;
HeapTuple heapTuple = NULL;
bool heapTupleFound = false;
TupleDesc tupleDescriptor = NULL;
int64 placementId = INVALID_PLACEMENT_ID;
bool isNull = false;
uint64 shardId = 0;
pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
pgDistPlacement = heap_open(DistPlacementRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistPlacement);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_shardid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId));
scanDescriptor = systable_beginscan(pgDistShardPlacement,
DistShardPlacementShardidIndexId(), indexOK,
scanDescriptor = systable_beginscan(pgDistPlacement,
DistPlacementPlacementidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
ShardPlacement *placement = TupleToShardPlacement(tupleDescriptor, heapTuple);
if (strncmp(placement->nodeName, workerName, WORKER_LENGTH) == 0 &&
placement->nodePort == workerPort)
{
heapTupleFound = true;
break;
}
heapTuple = systable_getnext(scanDescriptor);
}
/* if we couldn't find the shard placement to delete, error out */
if (!heapTupleFound)
if (heapTuple == NULL)
{
ereport(ERROR, (errmsg("could not find valid entry for shard placement "
UINT64_FORMAT " on node \"%s:%u\"",
shardId, workerName, workerPort)));
INT64_FORMAT, placementId)));
}
placementId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid,
tupleDescriptor, &isNull);
if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement ||
shardId = heap_getattr(heapTuple, Anum_pg_dist_placement_shardid,
tupleDescriptor, &isNull);
if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_placement ||
HeapTupleHasNulls(heapTuple))
{
ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement_tuple")));
ereport(ERROR, (errmsg("unexpected null in pg_dist_placement tuple")));
}
simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self);
simple_heap_delete(pgDistPlacement, &heapTuple->t_self);
systable_endscan(scanDescriptor);
CitusInvalidateRelcacheByShardId(shardId);
CommandCounterIncrement();
heap_close(pgDistShardPlacement, RowExclusiveLock);
return placementId;
heap_close(pgDistPlacement, NoLock);
}
@ -1122,26 +1136,26 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
void
UpdateShardPlacementState(uint64 placementId, char shardState)
{
Relation pgDistShardPlacement = NULL;
Relation pgDistPlacement = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL;
Datum values[Natts_pg_dist_shard_placement];
bool isnull[Natts_pg_dist_shard_placement];
bool replace[Natts_pg_dist_shard_placement];
Datum values[Natts_pg_dist_placement];
bool isnull[Natts_pg_dist_placement];
bool replace[Natts_pg_dist_placement];
uint64 shardId = INVALID_SHARD_ID;
bool colIsNull = false;
pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_placementid,
pgDistPlacement = heap_open(DistPlacementRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistPlacement);
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId));
scanDescriptor = systable_beginscan(pgDistShardPlacement,
DistShardPlacementPlacementidIndexId(), indexOK,
scanDescriptor = systable_beginscan(pgDistPlacement,
DistPlacementPlacementidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
@ -1154,16 +1168,16 @@ UpdateShardPlacementState(uint64 placementId, char shardState)
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState);
isnull[Anum_pg_dist_shard_placement_shardstate - 1] = false;
replace[Anum_pg_dist_shard_placement_shardstate - 1] = true;
values[Anum_pg_dist_placement_shardstate - 1] = CharGetDatum(shardState);
isnull[Anum_pg_dist_placement_shardstate - 1] = false;
replace[Anum_pg_dist_placement_shardstate - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistShardPlacement, &heapTuple->t_self, heapTuple);
CatalogTupleUpdate(pgDistPlacement, &heapTuple->t_self, heapTuple);
shardId = DatumGetInt64(heap_getattr(heapTuple,
Anum_pg_dist_shard_placement_shardid,
Anum_pg_dist_placement_shardid,
tupleDescriptor, &colIsNull));
Assert(!colIsNull);
CitusInvalidateRelcacheByShardId(shardId);
@ -1171,7 +1185,7 @@ UpdateShardPlacementState(uint64 placementId, char shardState)
CommandCounterIncrement();
systable_endscan(scanDescriptor);
heap_close(pgDistShardPlacement, NoLock);
heap_close(pgDistPlacement, NoLock);
}

View File

@ -406,7 +406,7 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
const uint64 shardSize = 0;
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
nodeName, nodePort);
workerNode->groupId);
placementsCreated++;
}
else
@ -601,12 +601,11 @@ UpdateShardStatistics(int64 shardId)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 placementId = placement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
uint32 groupId = placement->groupId;
DeleteShardPlacementRow(shardId, workerName, workerPort);
DeleteShardPlacementRow(placementId);
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize,
workerName, workerPort);
groupId);
}
/* only update shard min/max values for append-partitioned tables */

View File

@ -209,7 +209,7 @@ ShouldSyncTableMetadata(Oid relationId)
* (ii) Queries that create the clustered tables
* (iii) Queries that populate pg_dist_partition table referenced by (ii)
* (iv) Queries that populate pg_dist_shard table referenced by (iii)
* (v) Queries that populate pg_dist_shard_placement table referenced by (iv)
* (v) Queries that populate pg_dist_placement table referenced by (iv)
*/
List *
MetadataCreateCommands(void)
@ -346,7 +346,7 @@ GetDistributedTableDDLEvents(Oid relationId)
truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
/* commands to insert pg_dist_shard & pg_dist_shard_placement entries */
/* commands to insert pg_dist_shard & pg_dist_placement entries */
shardIntervalList = LoadShardIntervalList(relationId);
shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
commandList = list_concat(commandList, shardMetadataInsertCommandList);
@ -370,23 +370,18 @@ GetDistributedTableDDLEvents(Oid relationId)
* from the worker itself to prevent dropping any non-distributed tables
* with the same name.
* (iii) Queries that delete all the rows from pg_dist_shard table referenced by (ii)
* (iv) Queries that delete all the rows from pg_dist_shard_placement table
* (iv) Queries that delete all the rows from pg_dist_placement table
* referenced by (iii)
*/
List *
MetadataDropCommands(void)
{
List *dropSnapshotCommandList = NIL;
char *removeTablesCommand = NULL;
char *removeNodesCommand = NULL;
removeNodesCommand = DELETE_ALL_NODES;
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
removeNodesCommand);
REMOVE_ALL_CLUSTERED_TABLES_COMMAND);
removeTablesCommand = REMOVE_ALL_CLUSTERED_TABLES_COMMAND;
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
removeTablesCommand);
dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES);
return dropSnapshotCommandList;
}
@ -566,9 +561,9 @@ ShardListInsertCommand(List *shardIntervalList)
{
/* generate the shard placement query without any values yet */
appendStringInfo(insertPlacementCommand,
"INSERT INTO pg_dist_shard_placement "
"INSERT INTO pg_dist_placement "
"(shardid, shardstate, shardlength,"
" nodename, nodeport, placementid) "
" groupid, placementid) "
"VALUES ");
}
else
@ -577,11 +572,10 @@ ShardListInsertCommand(List *shardIntervalList)
}
appendStringInfo(insertPlacementCommand,
"(%lu, 1, %lu, %s, %d, %lu)",
"(%lu, 1, %lu, %d, %lu)",
shardId,
placement->shardLength,
quote_literal_cstr(placement->nodeName),
placement->nodePort,
placement->groupId,
placement->placementId);
}
}
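Put together, the placement part of the metadata sync command generated above now takes this shape (shard, length, group, and placement ids are made-up example values):

INSERT INTO pg_dist_placement
    (shardid, shardstate, shardlength, groupid, placementid)
VALUES (102008, 1, 0, 1, 1), (102009, 1, 0, 2, 2);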
@ -664,7 +658,7 @@ ShardDeleteCommandList(ShardInterval *shardInterval)
/* create command to delete shard placements */
deletePlacementCommand = makeStringInfo();
appendStringInfo(deletePlacementCommand,
"DELETE FROM pg_dist_shard_placement WHERE shardid = %lu",
"DELETE FROM pg_dist_placement WHERE shardid = %lu",
shardId);
commandList = lappend(commandList, deletePlacementCommand->data);
@ -735,18 +729,18 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
/*
* PlacementUpsertCommand creates a SQL command for upserting a pg_dist_shard_placment
* PlacementUpsertCommand creates a SQL command for upserting a pg_dist_placement
* entry with the given properties. In the case of a conflict on placementId, the command
* updates all properties (excluding the placementId) with the given ones.
*/
char *
PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, char *nodeName, uint32 nodePort)
uint64 shardLength, uint32 groupId)
{
StringInfo command = makeStringInfo();
appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardState, shardLength,
quote_literal_cstr(nodeName), nodePort, placementId);
groupId, placementId);
return command->data;
}
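UPSERT_PLACEMENT itself is defined in a header that is not part of this diff, so only its call site is visible here; judging from the comment and argument order above, the generated statement presumably has roughly this shape (placeholder values, illustrative only):

INSERT INTO pg_dist_placement
    (shardid, shardstate, shardlength, groupid, placementid)
VALUES (102008, 1, 0, 2, 43)
ON CONFLICT (placementid) DO UPDATE SET
    shardid = EXCLUDED.shardid,
    shardstate = EXCLUDED.shardstate,
    shardlength = EXCLUDED.shardlength,
    groupid = EXCLUDED.groupid;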

View File

@ -5100,7 +5100,7 @@ ActiveShardPlacementLists(List *taskList)
/*
* CompareShardPlacements compares two shard placements by their tuple oid; this
* oid reflects the tuple's insertion order into pg_dist_shard_placement.
* oid reflects the tuple's insertion order into pg_dist_placement.
*/
int
CompareShardPlacements(const void *leftElement, const void *rightElement)

View File

@ -1543,6 +1543,7 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
(ShardPlacement *) CitusMakeNode(ShardPlacement);
dummyPlacement->nodeName = workerNode->workerName;
dummyPlacement->nodePort = workerNode->workerPort;
dummyPlacement->groupId = workerNode->groupId;
workerList = lappend(workerList, dummyPlacement);
}

View File

@ -48,9 +48,6 @@ PG_FUNCTION_INFO_V1(partition_column_id);
PG_FUNCTION_INFO_V1(partition_type);
PG_FUNCTION_INFO_V1(is_distributed_table);
PG_FUNCTION_INFO_V1(create_monolithic_shard_row);
PG_FUNCTION_INFO_V1(create_healthy_local_shard_placement_row);
PG_FUNCTION_INFO_V1(delete_shard_placement_row);
PG_FUNCTION_INFO_V1(update_shard_placement_row_state);
PG_FUNCTION_INFO_V1(acquire_shared_shard_lock);
@ -241,66 +238,6 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
}
/*
* create_healthy_local_shard_placement_row inserts a row representing a
* finalized placement for localhost (on the default port) into the backing
* store.
*/
Datum
create_healthy_local_shard_placement_row(PG_FUNCTION_ARGS)
{
int64 shardId = PG_GETARG_INT64(0);
int64 shardLength = 0;
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, shardLength,
"localhost", 5432);
PG_RETURN_VOID();
}
/*
* delete_shard_placement_row removes a shard placement with the specified ID.
*/
Datum
delete_shard_placement_row(PG_FUNCTION_ARGS)
{
int64 shardId = PG_GETARG_INT64(0);
text *hostName = PG_GETARG_TEXT_P(1);
int64 hostPort = PG_GETARG_INT64(2);
bool successful = true;
char *hostNameString = text_to_cstring(hostName);
DeleteShardPlacementRow(shardId, hostNameString, hostPort);
PG_RETURN_BOOL(successful);
}
/*
* update_shard_placement_row_state sets the state of the placement with the
* specified ID.
*/
Datum
update_shard_placement_row_state(PG_FUNCTION_ARGS)
{
int64 shardId = PG_GETARG_INT64(0);
text *hostName = PG_GETARG_TEXT_P(1);
int64 hostPort = PG_GETARG_INT64(2);
RelayFileState shardState = (RelayFileState) PG_GETARG_INT32(3);
bool successful = true;
char *hostNameString = text_to_cstring(hostName);
uint64 shardLength = 0;
uint64 placementId = INVALID_PLACEMENT_ID;
placementId = DeleteShardPlacementRow(shardId, hostNameString, hostPort);
InsertShardPlacementRow(shardId, placementId, shardState, shardLength,
hostNameString, hostPort);
PG_RETURN_BOOL(successful);
}
/*
* acquire_shared_shard_lock grabs a shared lock for the specified shard.
*/

View File

@ -108,8 +108,7 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags)
placementAccessList = lappend(placementAccessList, &placementModification);
/* add additional placement accesses for subselects (e.g. INSERT .. SELECT) */
placementSelectList = BuildPlacementSelectList(shardPlacement->nodeName,
shardPlacement->nodePort,
placementSelectList = BuildPlacementSelectList(shardPlacement->groupId,
task->relationShardList);
placementAccessList = list_concat(placementAccessList, placementSelectList);

View File

@ -35,7 +35,8 @@ static const char *CitusNodeTagNamesD[] = {
"ShardInterval",
"ShardPlacement",
"RelationShard",
"DeferredErrorMessage"
"DeferredErrorMessage",
"GroupShardPlacement"
};
const char **CitusNodeTagNames = CitusNodeTagNamesD;
@ -388,6 +389,7 @@ const ExtensibleNodeMethods nodeMethods[] =
DEFINE_NODE_METHODS(RelationShard),
DEFINE_NODE_METHODS(Task),
DEFINE_NODE_METHODS(DeferredErrorMessage),
DEFINE_NODE_METHODS(GroupShardPlacement),
/* nodes with only output support */
DEFINE_NODE_METHODS_NO_READ(MultiNode),

View File

@ -357,6 +357,7 @@ OutShardPlacement(OUTFUNC_ARGS)
WRITE_UINT64_FIELD(shardId);
WRITE_UINT64_FIELD(shardLength);
WRITE_ENUM_FIELD(shardState, RelayFileState);
WRITE_UINT_FIELD(groupId);
WRITE_STRING_FIELD(nodeName);
WRITE_UINT_FIELD(nodePort);
/* so we can deal with 0 */
@ -366,6 +367,20 @@ OutShardPlacement(OUTFUNC_ARGS)
}
void
OutGroupShardPlacement(OUTFUNC_ARGS)
{
WRITE_LOCALS(GroupShardPlacement);
WRITE_NODE_TYPE("GROUPSHARDPLACEMENT");
WRITE_UINT64_FIELD(placementId);
WRITE_UINT64_FIELD(shardId);
WRITE_UINT64_FIELD(shardLength);
WRITE_ENUM_FIELD(shardState, RelayFileState);
WRITE_UINT_FIELD(groupId);
}
void
OutRelationShard(OUTFUNC_ARGS)
{

View File

@ -265,6 +265,7 @@ ReadShardPlacement(READFUNC_ARGS)
READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState);
READ_UINT_FIELD(groupId);
READ_STRING_FIELD(nodeName);
READ_UINT_FIELD(nodePort);
/* so we can deal with 0 */
@ -276,6 +277,21 @@ ReadShardPlacement(READFUNC_ARGS)
}
READFUNC_RET
ReadGroupShardPlacement(READFUNC_ARGS)
{
READ_LOCALS(GroupShardPlacement);
READ_UINT64_FIELD(placementId);
READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState);
READ_UINT_FIELD(groupId);
READ_DONE();
}
READFUNC_RET
ReadRelationShard(READFUNC_ARGS)
{

View File

@ -34,7 +34,7 @@
#include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_shard_placement.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/shared_library_init.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h"
@ -91,7 +91,7 @@ typedef struct MetadataCacheData
{
bool extensionLoaded;
Oid distShardRelationId;
Oid distShardPlacementRelationId;
Oid distPlacementRelationId;
Oid distNodeRelationId;
Oid distLocalGroupRelationId;
Oid distColocationRelationId;
@ -102,9 +102,9 @@ typedef struct MetadataCacheData
Oid distPartitionColocationidIndexId;
Oid distShardLogicalRelidIndexId;
Oid distShardShardidIndexId;
Oid distShardPlacementShardidIndexId;
Oid distShardPlacementPlacementidIndexId;
Oid distShardPlacementNodeidIndexId;
Oid distPlacementShardidIndexId;
Oid distPlacementPlacementidIndexId;
Oid distPlacementGroupidIndexId;
Oid distTransactionRelationId;
Oid distTransactionGroupIndexId;
Oid extraDataContainerFuncId;
@ -115,7 +115,6 @@ typedef struct MetadataCacheData
static MetadataCacheData MetadataCache;
/* Citus extension version variables */
bool EnableVersionChecks = true; /* version checks are enabled */
@ -179,6 +178,8 @@ static ShardInterval * TupleToShardInterval(HeapTuple heapTuple,
TupleDesc tupleDescriptor, Oid intervalTypeId,
int32 intervalTypeMod);
static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
static ShardPlacement * ResolveGroupShardPlacement(
GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry);
/* exports for SQL callable functions */
@ -315,18 +316,18 @@ LoadShardInterval(uint64 shardId)
/*
* LoadShardPlacement returns the, cached, metadata about a shard placement.
* LoadGroupShardPlacement returns the cached shard placement metadata
*
* The return value is a copy of the cached ShardPlacement struct and may
* The return value is a copy of the cached GroupShardPlacement struct and may
* therefore be modified and/or freed.
*/
ShardPlacement *
LoadShardPlacement(uint64 shardId, uint64 placementId)
GroupShardPlacement *
LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
{
ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL;
ShardPlacement *placementArray = NULL;
GroupShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
int i = 0;
@ -344,8 +345,9 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
{
if (placementArray[i].placementId == placementId)
{
ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
CopyShardPlacement(&placementArray[i], shardPlacement);
GroupShardPlacement *shardPlacement = CitusMakeNode(GroupShardPlacement);
memcpy(shardPlacement, &placementArray[i], sizeof(GroupShardPlacement));
return shardPlacement;
}
@ -357,16 +359,16 @@ LoadShardPlacement(uint64 shardId, uint64 placementId)
/*
* FindShardPlacementOnNode returns the shard placement for the given shard
* on the given node, or returns NULL of no placement for the shard exists
* on the node.
* FindShardPlacementOnGroup returns the shard placement for the given shard
* on the given group, or returns NULL if no placement for the shard exists
* on the group.
*/
ShardPlacement *
FindShardPlacementOnNode(char *nodeName, int nodePort, uint64 shardId)
FindShardPlacementOnGroup(uint32 groupId, uint64 shardId)
{
ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL;
ShardPlacement *placementArray = NULL;
GroupShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
ShardPlacement *placementOnNode = NULL;
int placementIndex = 0;
@ -378,13 +380,11 @@ FindShardPlacementOnNode(char *nodeName, int nodePort, uint64 shardId)
for (placementIndex = 0; placementIndex < numberOfPlacements; placementIndex++)
{
ShardPlacement *placement = &placementArray[placementIndex];
GroupShardPlacement *placement = &placementArray[placementIndex];
if (strncmp(nodeName, placement->nodeName, WORKER_LENGTH) == 0 &&
nodePort == placement->nodePort)
if (placement->groupId == groupId)
{
placementOnNode = CitusMakeNode(ShardPlacement);
CopyShardPlacement(placement, placementOnNode);
placementOnNode = ResolveGroupShardPlacement(placement, shardEntry);
break;
}
}
@ -393,6 +393,62 @@ FindShardPlacementOnNode(char *nodeName, int nodePort, uint64 shardId)
}
/*
* ResolveGroupShardPlacement takes a GroupShardPlacement and adds additional data to it,
* such as the node we should consider it to be on.
*/
static ShardPlacement *
ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
ShardCacheEntry *shardEntry)
{
DistTableCacheEntry *tableEntry = shardEntry->tableEntry;
int shardIndex = shardEntry->shardIndex;
ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex];
ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
uint32 groupId = groupShardPlacement->groupId;
WorkerNode *workerNode = NodeForGroup(groupId);
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("the metadata is inconsistent"),
errdetail("there is a placement in group %u but "
"there are no nodes in that group", groupId)));
}
/* copy everything into shardPlacement but preserve the header */
memcpy((((CitusNode *) shardPlacement) + 1),
(((CitusNode *) groupShardPlacement) + 1),
sizeof(GroupShardPlacement) - sizeof(CitusNode));
shardPlacement->nodeName = pstrdup(workerNode->workerName);
shardPlacement->nodePort = workerNode->workerPort;
/* fill in remaining fields */
Assert(tableEntry->partitionMethod != 0);
shardPlacement->partitionMethod = tableEntry->partitionMethod;
shardPlacement->colocationGroupId = tableEntry->colocationId;
if (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH)
{
Assert(shardInterval->minValueExists);
Assert(shardInterval->valueTypeId == INT4OID);
/*
* Use the lower boundary of the interval's range to identify
* it for colocation purposes. That remains meaningful even if
* a concurrent session splits a shard.
*/
shardPlacement->representativeValue = DatumGetInt32(shardInterval->minValue);
}
else
{
shardPlacement->representativeValue = 0;
}
return shardPlacement;
}
/*
* ShardPlacementList returns the list of placements for the given shard from
* the cache.
@ -405,7 +461,7 @@ ShardPlacementList(uint64 shardId)
{
ShardCacheEntry *shardEntry = NULL;
DistTableCacheEntry *tableEntry = NULL;
ShardPlacement *placementArray = NULL;
GroupShardPlacement *placementArray = NULL;
int numberOfPlacements = 0;
List *placementList = NIL;
int i = 0;
@ -421,11 +477,11 @@ ShardPlacementList(uint64 shardId)
for (i = 0; i < numberOfPlacements; i++)
{
/* copy placement into target context */
ShardPlacement *placement = CitusMakeNode(ShardPlacement);
CopyShardPlacement(&placementArray[i], placement);
GroupShardPlacement *groupShardPlacement = &placementArray[i];
ShardPlacement *shardPlacement = ResolveGroupShardPlacement(groupShardPlacement,
shardEntry);
placementList = lappend(placementList, placement);
placementList = lappend(placementList, shardPlacement);
}
/* if no shard placements are found, warn the user */
@ -779,7 +835,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
cacheEntry->arrayOfPlacementArrays =
MemoryContextAllocZero(CacheMemoryContext,
shardIntervalArrayLength *
sizeof(ShardPlacement *));
sizeof(GroupShardPlacement *));
cacheEntry->arrayOfPlacementArrayLengths =
MemoryContextAllocZero(CacheMemoryContext,
shardIntervalArrayLength *
@ -913,7 +969,7 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
List *placementList = NIL;
MemoryContext oldContext = NULL;
ListCell *placementCell = NULL;
ShardPlacement *placementArray = NULL;
GroupShardPlacement *placementArray = NULL;
int placementOffset = 0;
int numberOfPlacements = 0;
@ -936,35 +992,14 @@ BuildCachedShardList(DistTableCacheEntry *cacheEntry)
/* and copy that list into the cache entry */
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
placementArray = palloc0(numberOfPlacements * sizeof(ShardPlacement));
placementArray = palloc0(numberOfPlacements * sizeof(GroupShardPlacement));
foreach(placementCell, placementList)
{
ShardPlacement *srcPlacement = (ShardPlacement *) lfirst(placementCell);
ShardPlacement *dstPlacement = &placementArray[placementOffset];
GroupShardPlacement *srcPlacement =
(GroupShardPlacement *) lfirst(placementCell);
GroupShardPlacement *dstPlacement = &placementArray[placementOffset];
CopyShardPlacement(srcPlacement, dstPlacement);
/* fill in remaining fields */
Assert(cacheEntry->partitionMethod != 0);
dstPlacement->partitionMethod = cacheEntry->partitionMethod;
dstPlacement->colocationGroupId = cacheEntry->colocationId;
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH)
{
Assert(shardInterval->minValueExists);
Assert(shardInterval->valueTypeId == INT4OID);
/*
* Use the lower boundary of the interval's range to identify
* it for colocation purposes. That remains meaningful even if
* a concurrent session splits a shard.
*/
dstPlacement->representativeValue =
DatumGetInt32(shardInterval->minValue);
}
else
{
dstPlacement->representativeValue = 0;
}
memcpy(dstPlacement, srcPlacement, sizeof(GroupShardPlacement));
placementOffset++;
}
MemoryContextSwitchTo(oldContext);
@ -1486,14 +1521,14 @@ DistShardRelationId(void)
}
/* return oid of pg_dist_shard_placement relation */
/* return oid of pg_dist_placement relation */
Oid
DistShardPlacementRelationId(void)
DistPlacementRelationId(void)
{
CachedRelationLookup("pg_dist_shard_placement",
&MetadataCache.distShardPlacementRelationId);
CachedRelationLookup("pg_dist_placement",
&MetadataCache.distPlacementRelationId);
return MetadataCache.distShardPlacementRelationId;
return MetadataCache.distPlacementRelationId;
}
@ -1607,25 +1642,25 @@ DistShardShardidIndexId(void)
}
/* return oid of pg_dist_shard_placement_shardid_index */
/* return oid of pg_dist_placement_shardid_index */
Oid
DistShardPlacementShardidIndexId(void)
DistPlacementShardidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_placement_shardid_index",
&MetadataCache.distShardPlacementShardidIndexId);
CachedRelationLookup("pg_dist_placement_shardid_index",
&MetadataCache.distPlacementShardidIndexId);
return MetadataCache.distShardPlacementShardidIndexId;
return MetadataCache.distPlacementShardidIndexId;
}
/* return oid of pg_dist_shard_placement_shardid_index */
/* return oid of pg_dist_placement_placementid_index */
Oid
DistShardPlacementPlacementidIndexId(void)
DistPlacementPlacementidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_placement_placementid_index",
&MetadataCache.distShardPlacementPlacementidIndexId);
CachedRelationLookup("pg_dist_placement_placementid_index",
&MetadataCache.distPlacementPlacementidIndexId);
return MetadataCache.distShardPlacementPlacementidIndexId;
return MetadataCache.distPlacementPlacementidIndexId;
}
@ -1651,14 +1686,14 @@ DistTransactionGroupIndexId(void)
}
/* return oid of pg_dist_shard_placement_nodeid_index */
/* return oid of pg_dist_placement_groupid_index */
Oid
DistShardPlacementNodeidIndexId(void)
DistPlacementGroupidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_placement_nodeid_index",
&MetadataCache.distShardPlacementNodeidIndexId);
CachedRelationLookup("pg_dist_placement_groupid_index",
&MetadataCache.distPlacementGroupidIndexId);
return MetadataCache.distShardPlacementNodeidIndexId;
return MetadataCache.distPlacementGroupidIndexId;
}
@ -1914,8 +1949,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
/*
* master_dist_placmeent_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_shard_placement are
* master_dist_placement_cache_invalidate is a trigger function that performs
* relcache invalidations when the contents of pg_dist_placement are
* changed on the SQL level.
*
* NB: We decided there is little point in checking permissions here, there
@ -1944,16 +1979,16 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS)
/* collect shardid for OLD and NEW tuple */
if (oldTuple != NULL)
{
Form_pg_dist_shard_placement distPlacement =
(Form_pg_dist_shard_placement) GETSTRUCT(oldTuple);
Form_pg_dist_placement distPlacement =
(Form_pg_dist_placement) GETSTRUCT(oldTuple);
oldShardId = distPlacement->shardid;
}
if (newTuple != NULL)
{
Form_pg_dist_shard_placement distPlacement =
(Form_pg_dist_shard_placement) GETSTRUCT(newTuple);
Form_pg_dist_placement distPlacement =
(Form_pg_dist_placement) GETSTRUCT(newTuple);
newShardId = distPlacement->shardid;
}
@ -2360,26 +2395,12 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
shardIndex++)
{
ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[shardIndex];
ShardPlacement *placementArray = cacheEntry->arrayOfPlacementArrays[shardIndex];
int numberOfPlacements = cacheEntry->arrayOfPlacementArrayLengths[shardIndex];
GroupShardPlacement *placementArray =
cacheEntry->arrayOfPlacementArrays[shardIndex];
bool valueByVal = shardInterval->valueByVal;
bool foundInCache = false;
int placementIndex = 0;
/* delete the shard's placements */
for (placementIndex = 0;
placementIndex < numberOfPlacements;
placementIndex++)
{
ShardPlacement *placement = &placementArray[placementIndex];
if (placement->nodeName)
{
pfree(placement->nodeName);
}
/* placement itself is deleted as part of the array */
}
pfree(placementArray);
/* delete per-shard cache-entry */
@ -2809,7 +2830,7 @@ CachedRelationLookup(const char *relationName, Oid *cachedOid)
if (*cachedOid == InvalidOid)
{
ereport(ERROR, (errmsg("cache lookup failed for %s, called to early?",
ereport(ERROR, (errmsg("cache lookup failed for %s, called too early?",
relationName)));
}
}

View File

@ -145,8 +145,8 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
* The call to the master_remove_node should be done by the super user and the specified
* node should not have any active placements.
* This function also deletes all reference table placements belonging to the given node from
* pg_dist_shard_placement, but it does not drop actual placement at the node. In the case
* of re-adding the node, master_add_node first drops and re-creates the reference tables.
* pg_dist_placement, but it does not drop actual placement at the node. In the case of
* re-adding the node, master_add_node first drops and re-creates the reference tables.
*/
Datum
master_remove_node(PG_FUNCTION_ARGS)
@ -164,31 +164,35 @@ master_remove_node(PG_FUNCTION_ARGS)
/*
* master_disable_node function sets isactive value of the provided node as inactive
* at master node and all nodes with metadata regardless of the node having an
* active shard placement.
* master_disable_node function sets isactive value of the provided node as inactive at
* master node and all nodes with metadata regardless of the node having an active shard
* placement.
*
* The call to the master_disable_node must be done by the super user.
* This function also deletes all reference table placements belong to the given
* node from pg_dist_shard_placement, but it does not drop actual placement at
* the node. In the case of re-activating the node, master_add_node first drops
* and re-creates the reference tables.
*
* This function also deletes all reference table placements belonging to the given node
* from pg_dist_placement, but it does not drop the actual placements at the node. In the
* case of re-activating the node, master_add_node first drops and re-creates the reference
* tables.
*/
Datum
master_disable_node(PG_FUNCTION_ARGS)
{
const bool onlyConsiderActivePlacements = true;
text *nodeNameText = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeName = text_to_cstring(nodeNameText);
bool hasShardPlacements = false;
bool hasActiveShardPlacements = false;
bool isActive = false;
CheckCitusVersion(ERROR);
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
if (hasShardPlacements)
hasActiveShardPlacements = NodeHasShardPlacements(nodeName, nodePort,
onlyConsiderActivePlacements);
if (hasActiveShardPlacements)
{
ereport(NOTICE, (errmsg("Node %s:%d has active shard placements. Some queries "
"may fail after this operation. Use "
@ -224,6 +228,51 @@ master_activate_node(PG_FUNCTION_ARGS)
}
/*
* GroupForNode returns the group which a given node belongs to
*/
uint32
GroupForNode(char *nodeName, int nodePort)
{
WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
}
return workerNode->groupId;
}
/*
* NodeForGroup returns the (unique) node which is in this group.
* In a future where we have nodeRole this will return the primary node.
*/
WorkerNode *
NodeForGroup(uint32 groupId)
{
WorkerNode *workerNode = NULL;
HASH_SEQ_STATUS status;
HTAB *workerNodeHash = GetWorkerNodeHash();
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
{
uint32 workerNodeGroupId = workerNode->groupId;
if (workerNodeGroupId == groupId)
{
hash_seq_term(&status);
return workerNode;
}
}
return NULL;
}
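In catalog terms the hash-table walk above is just the lookup below; the pg_dist_node_groupid_unique constraint added by the migration earlier in this pull request is what guarantees at most one row per group (the group id here is illustrative):

SELECT * FROM pg_dist_node WHERE groupid = 14;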
/*
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
* includes only replicating the reference tables and setting isactive column of the
@ -421,7 +470,7 @@ ReadWorkerNodes()
List *workerNodeList = NIL;
TupleDesc tupleDescriptor = NULL;
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
scanDescriptor = systable_beginscan(pgDistNode,
InvalidOid, false,
@ -439,7 +488,7 @@ ReadWorkerNodes()
}
systable_endscan(scanDescriptor);
heap_close(pgDistNode, AccessExclusiveLock);
heap_close(pgDistNode, NoLock);
return workerNodeList;
}
@ -451,15 +500,16 @@ ReadWorkerNodes()
* The call to the master_remove_node should be done by the super user. If there are
* active shard placements on the node, the function errors out.
* This function also deletes all reference table placements belonging to the given node from
* pg_dist_shard_placement, but it does not drop actual placement at the node. It also
* pg_dist_placement, but it does not drop actual placement at the node. It also
* modifies replication factor of the colocation group of reference tables, so that
* replication factor will be equal to worker count.
*/
static void
RemoveNodeFromCluster(char *nodeName, int32 nodePort)
{
const bool onlyConsiderActivePlacements = false;
char *nodeDeleteCommand = NULL;
bool hasShardPlacements = false;
bool hasAnyShardPlacements = false;
WorkerNode *workerNode = NULL;
List *referenceTableList = NIL;
uint32 deletedNodeId = INVALID_PLACEMENT_ID;
@ -474,10 +524,17 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
deletedNodeId = workerNode->nodeId;
}
DeleteNodeRow(nodeName, nodePort);
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
hasAnyShardPlacements = NodeHasShardPlacements(nodeName, nodePort,
onlyConsiderActivePlacements);
if (hasAnyShardPlacements)
{
ereport(ERROR, (errmsg("you cannot remove a node which has shard placements")));
}
DeleteNodeRow(nodeName, nodePort);
/*
* After deleting reference tables placements, we will update replication factor
* column for colocation group of reference tables so that replication factor will
@ -495,13 +552,6 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
}
hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
if (hasShardPlacements)
{
ereport(ERROR, (errmsg("you cannot remove a node which has active "
"shard placements")));
}
nodeDeleteCommand = NodeDeleteCommand(deletedNodeId);
/* make sure we don't have any lingering session lifespan connections */
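To make the new check above concrete: master_remove_node now refuses to remove a node if any placement, active or not, lives on the node's group. A hedged SQL sketch of that condition, using the test node address as a placeholder:
-- placements that would block master_remove_node for localhost:57638 (illustrative)
SELECT count(*) > 0 AS has_placements
FROM pg_dist_placement
WHERE groupid = (SELECT groupid FROM pg_dist_node
                 WHERE nodename = 'localhost' AND nodeport = 57638);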

View File

@ -210,7 +210,7 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId)
/*
* After the table has been officially marked as a reference table, we need to create
* the reference table itself and insert its pg_dist_partition, pg_dist_shard and
* existing pg_dist_shard_placement rows.
* existing pg_dist_placement rows.
*/
CreateTableMetadataOnWorkers(relationId);
}
@ -254,7 +254,7 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
* The ReplicateShardToNode function replicates the given shard to the given worker node
* in a separate transaction. While replicating, it only replicates the shard to the
* workers that do not have a healthy replica of the shard. This function also modifies
* metadata by inserting/updating related rows in pg_dist_shard_placement.
* metadata by inserting/updating related rows in pg_dist_placement.
*/
static void
ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
@ -279,11 +279,12 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
* placements always have shardState = FILE_FINALIZED, in case of an upgrade of
* a non-reference table to a reference table, unhealthy placements may exist. In
* this case, we repair the shard placement and update its state in
* pg_dist_shard_placement table.
* pg_dist_placement table.
*/
if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED)
{
uint64 placementId = 0;
uint32 groupId = 0;
ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d",
get_rel_name(shardInterval->relationId), nodeName,
@ -293,12 +294,14 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
ddlCommandList);
if (targetPlacement == NULL)
{
groupId = GroupForNode(nodeName, nodePort);
placementId = GetNextPlacementId();
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0,
nodeName, nodePort);
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0, groupId);
}
else
{
groupId = targetPlacement->groupId;
placementId = targetPlacement->placementId;
UpdateShardPlacementState(placementId, FILE_FINALIZED);
}
@ -314,7 +317,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
{
char *placementCommand = PlacementUpsertCommand(shardId, placementId,
FILE_FINALIZED, 0,
nodeName, nodePort);
groupId);
SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand);
}
@ -380,7 +383,7 @@ CreateReferenceTableColocationId()
/*
* The DeleteAllReferenceTablePlacementsFromNode function iterates over the list of reference
* tables and deletes all reference table placements from pg_dist_shard_placement table
* tables and deletes all reference table placements from pg_dist_placement table
* for the given worker node. However, it does not modify the replication factor of the colocation
* group of reference tables. It is the caller's responsibility to do that if necessary.
*/
@ -403,20 +406,24 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort)
referenceTableList = SortList(referenceTableList, CompareOids);
foreach(referenceTableCell, referenceTableList)
{
Oid referenceTableId = lfirst_oid(referenceTableCell);
uint32 workerGroup = GroupForNode(workerName, workerPort);
Oid referenceTableId = lfirst_oid(referenceTableCell);
List *placements = GroupShardPlacementsForTableOnGroup(referenceTableId,
workerGroup);
GroupShardPlacement *placement = (GroupShardPlacement *) linitial(placements);
uint64 shardId = placement->shardId;
uint64 placementId = placement->placementId;
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
uint64 placementId = INVALID_PLACEMENT_ID;
StringInfo deletePlacementCommand = makeStringInfo();
LockShardDistributionMetadata(shardId, ExclusiveLock);
placementId = DeleteShardPlacementRow(shardId, workerName, workerPort);
DeleteShardPlacementRow(placementId);
appendStringInfo(deletePlacementCommand,
"DELETE FROM pg_dist_shard_placement WHERE placementid=%lu",
"DELETE FROM pg_dist_placement WHERE placementid=%lu",
placementId);
SendCommandToWorkers(WORKERS_WITH_METADATA, deletePlacementCommand->data);
}
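As a rough SQL rendering of what GroupShardPlacementsForTableOnGroup hands back for one reference table and one group (the table name and group id here are hypothetical):
-- placements of a given table that live on a given group (illustrative)
SELECT p.placementid, p.shardid
FROM pg_dist_placement p
JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'some_reference_table'::regclass
  AND p.groupid = 2;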

View File

@ -32,7 +32,7 @@ PG_FUNCTION_INFO_V1(worker_drop_distributed_table);
/*
* worker_drop_distributed_table drops the distributed table with the given oid,
* then removes the associated rows from pg_dist_partition, pg_dist_shard and
* pg_dist_shard_placement. The function also drops the server for foreign tables.
* pg_dist_placement. The function also drops the server for foreign tables.
*
* Note that the drop fails if any dependent objects are present for any of the
* distributed tables. Also, shard placements of the distributed tables are
@ -117,11 +117,9 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
/* delete the row from pg_dist_shard_placement */
DeleteShardPlacementRow(shardId, workerName, workerPort);
/* delete the row from pg_dist_placement */
DeleteShardPlacementRow(placement->placementId);
}
/* delete the row from pg_dist_shard */

View File

@ -48,6 +48,7 @@ extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadRelationShard(READFUNC_ARGS);
extern READFUNC_RET ReadTask(READFUNC_ARGS);
extern READFUNC_RET ReadDeferredErrorMessage(READFUNC_ARGS);
extern READFUNC_RET ReadGroupShardPlacement(READFUNC_ARGS);
extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS);
@ -59,6 +60,7 @@ extern void OutShardPlacement(OUTFUNC_ARGS);
extern void OutRelationShard(OUTFUNC_ARGS);
extern void OutTask(OUTFUNC_ARGS);
extern void OutDeferredErrorMessage(OUTFUNC_ARGS);
extern void OutGroupShardPlacement(OUTFUNC_ARGS);
extern void OutMultiNode(OUTFUNC_ARGS);
extern void OutMultiTreeRoot(OUTFUNC_ARGS);

View File

@ -37,6 +37,8 @@
* Citus Node Tags
*
* These have to be distinct from the node tag values used in postgres' nodes.h
*
* NOTE: This list must match CitusNodeTagNamesD from citus_nodefuncs.c
*/
#define CITUS_NODE_TAG_START 1200
typedef enum CitusNodeTag
@ -58,7 +60,8 @@ typedef enum CitusNodeTag
T_ShardInterval,
T_ShardPlacement,
T_RelationShard,
T_DeferredErrorMessage
T_DeferredErrorMessage,
T_GroupShardPlacement
} CitusNodeTag;

View File

@ -69,14 +69,32 @@ typedef struct ShardInterval
} ShardInterval;
/* In-memory representation of a tuple in pg_dist_shard_placement. */
typedef struct ShardPlacement
/* In-memory representation of a tuple in pg_dist_placement. */
typedef struct GroupShardPlacement
{
CitusNode type;
uint64 placementId; /* sequence that implies this placement creation order */
uint64 placementId; /* sequence that implies this placement creation order */
uint64 shardId;
uint64 shardLength;
RelayFileState shardState;
uint32 groupId;
} GroupShardPlacement;
/* A GroupShardPlacement which has had some extra data resolved */
typedef struct ShardPlacement
{
/*
* careful, the rest of the code assumes this exactly matches GroupShardPlacement
*/
CitusNode type;
uint64 placementId;
uint64 shardId;
uint64 shardLength;
RelayFileState shardState;
uint32 groupId;
/* the rest of the fields aren't from pg_dist_placement */
char *nodeName;
uint32 nodePort;
char partitionMethod;
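A sketch of the "extra data" resolution that turns a GroupShardPlacement into a ShardPlacement: the stored groupid is translated back into a node name and port through pg_dist_node. The query is illustrative only and is not part of the patch:
SELECT p.placementid, p.shardid, p.shardstate, p.shardlength,
       n.nodename, n.nodeport
FROM pg_dist_placement p
JOIN pg_dist_node n ON n.groupid = p.groupid;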
@ -97,10 +115,12 @@ extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInt
extern void CopyShardPlacement(ShardPlacement *srcPlacement,
ShardPlacement *destPlacement);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort);
extern bool NodeHasShardPlacements(char *nodeName, int32 nodePort,
bool onlyLookForActivePlacements);
extern List * FinalizedShardPlacementList(uint64 shardId);
extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk);
extern List * BuildShardPlacementList(ShardInterval *shardInterval);
extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, uint32 groupId);
/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
@ -108,15 +128,14 @@ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
extern void DeleteShardRow(uint64 shardId);
extern void InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength,
char *nodeName, uint32 nodePort);
uint32 groupId);
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId,
char replicationModel);
extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32
workerPort);
extern void DeleteShardPlacementRow(uint64 placementId);
extern void UpdateColocationGroupReplicationFactor(uint32 colocationId,
int replicationFactor);
extern void CreateTruncateTrigger(Oid relationId);

View File

@ -51,7 +51,7 @@
#define CSTORE_FDW_NAME "cstore_fdw"
#define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq"
#define PLACEMENTID_SEQUENCE_NAME "pg_dist_shard_placement_placementid_seq"
#define PLACEMENTID_SEQUENCE_NAME "pg_dist_placement_placementid_seq"
/* Remote call definitions to help with data staging and deletion */
#define WORKER_APPLY_SHARD_DDL_COMMAND \

View File

@ -60,8 +60,8 @@ typedef struct
FmgrInfo *shardIntervalCompareFunction;
FmgrInfo *hashFunction; /* NULL if table is not distributed by hash */
/* pg_dist_shard_placement metadata */
ShardPlacement **arrayOfPlacementArrays;
/* pg_dist_placement metadata */
GroupShardPlacement **arrayOfPlacementArrays;
int *arrayOfPlacementArrayLengths;
} DistTableCacheEntry;
@ -69,9 +69,8 @@ typedef struct
extern bool IsDistributedTable(Oid relationId);
extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * FindShardPlacementOnNode(char *nodeName, int nodePort,
uint64 shardId);
extern ShardPlacement * FindShardPlacementOnGroup(uint32 groupId, uint64 shardId);
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void);
extern List * DistTableOidList(void);
@ -93,7 +92,7 @@ extern Oid DistColocationConfigurationIndexId(void);
extern Oid DistColocationColocationidIndexId(void);
extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void);
extern Oid DistShardPlacementRelationId(void);
extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void);
extern Oid DistLocalGroupIdRelationId(void);
@ -102,11 +101,11 @@ extern Oid DistPartitionLogicalRelidIndexId(void);
extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistShardPlacementShardidIndexId(void);
extern Oid DistShardPlacementPlacementidIndexId(void);
extern Oid DistPlacementShardidIndexId(void);
extern Oid DistPlacementPlacementidIndexId(void);
extern Oid DistTransactionRelationId(void);
extern Oid DistTransactionGroupIndexId(void);
extern Oid DistShardPlacementNodeidIndexId(void);
extern Oid DistPlacementGroupidIndexId(void);
/* function oids */
extern Oid CitusExtraDataContainerFuncId(void);

View File

@ -34,7 +34,7 @@ extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive);
extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
extern char * CreateSchemaDDLCommand(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, char *nodeName, uint32 nodePort);
uint64 shardLength, uint32 groupId);
extern void CreateTableMetadataOnWorkers(Oid relationId);
@ -43,16 +43,15 @@ extern void CreateTableMetadataOnWorkers(Oid relationId);
"SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s)"
#define UPSERT_PLACEMENT "INSERT INTO pg_dist_shard_placement " \
#define UPSERT_PLACEMENT "INSERT INTO pg_dist_placement " \
"(shardid, shardstate, shardlength, " \
"nodename, nodeport, placementid) " \
"VALUES (%lu, %d, %lu, %s, %d, %lu) " \
"groupid, placementid) " \
"VALUES (%lu, %d, %lu, %d, %lu) " \
"ON CONFLICT (placementid) DO UPDATE SET " \
"shardid = EXCLUDED.shardid, " \
"shardstate = EXCLUDED.shardstate, " \
"shardlength = EXCLUDED.shardlength, " \
"nodename = EXCLUDED.nodename, " \
"nodeport = EXCLUDED.nodeport"
"groupid = EXCLUDED.groupid"
#endif /* METADATA_SYNC_H */
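For readability, a filled-in instance of the new UPSERT_PLACEMENT template; the literal values are taken from the metadata snapshot output later in this diff and only serve to make the parameter order concrete:
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid)
VALUES (1310000, 1, 0, 1, 100000)
ON CONFLICT (placementid) DO UPDATE SET
  shardid = EXCLUDED.shardid,
  shardstate = EXCLUDED.shardstate,
  shardlength = EXCLUDED.shardlength,
  groupid = EXCLUDED.groupid;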

View File

@ -43,7 +43,6 @@ extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern void ExecuteTasksSequentiallyWithoutResults(List *taskList);
extern List * BuildPlacementSelectList(char *nodeName, int nodePort,
List *relationShardList);
extern List * BuildPlacementSelectList(uint32 groupId, List *relationShardList);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -0,0 +1,52 @@
/*-------------------------------------------------------------------------
*
* pg_dist_placement.h
* definition of the pg_dist_placement relation, which records shard placements by group.
*
* This table keeps information on remote shards and their whereabouts on the
* master node. The table's contents are updated and used as follows: (i) the
* worker nodes send periodic reports about the shards they contain, and (ii)
* the master reconciles these shard reports, and determines outdated, under-
* and over-replicated shards.
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_PLACEMENT_H
#define PG_DIST_PLACEMENT_H
/* ----------------
* pg_dist_placement definition.
* ----------------
*/
typedef struct FormData_pg_dist_placement
{
int64 placementid; /* global placementId on remote node */
int64 shardid; /* global shardId on remote node */
int32 shardstate; /* shard state on remote node; see RelayFileState */
int64 shardlength; /* shard length on remote node; stored as bigint */
int32 groupid; /* the group the shard is placed on */
} FormData_pg_dist_placement;
/* ----------------
* Form_pg_dist_placement corresponds to a pointer to a tuple with
* the format of pg_dist_placement relation.
* ----------------
*/
typedef FormData_pg_dist_placement *Form_pg_dist_placement;
/* ----------------
* compiler constants for pg_dist_placement
* ----------------
*/
#define Natts_pg_dist_placement 5
#define Anum_pg_dist_placement_placementid 1
#define Anum_pg_dist_placement_shardid 2
#define Anum_pg_dist_placement_shardstate 3
#define Anum_pg_dist_placement_shardlength 4
#define Anum_pg_dist_placement_groupid 5
#endif /* PG_DIST_PLACEMENT_H */
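A quick way to cross-check the Natts/Anum constants above against an installed catalog is a plain pg_attribute query (standard PostgreSQL; nothing Citus-specific is assumed):
SELECT attnum, attname
FROM pg_attribute
WHERE attrelid = 'pg_catalog.pg_dist_placement'::regclass AND attnum > 0
ORDER BY attnum;
-- expected: 1 placementid, 2 shardid, 3 shardstate, 4 shardlength, 5 groupid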

View File

@ -1,56 +0,0 @@
/*-------------------------------------------------------------------------
*
* pg_dist_shard_placement.h
* definition of the "server" relation (pg_dist_shard_placement).
*
* This table keeps information on remote shards and their whereabouts on the
* master node. The table's contents are updated and used as follows: (i) the
* worker nodes send periodic reports about the shards they contain, and (ii)
* the master reconciles these shard reports, and determines outdated, under-
* and over-replicated shards.
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_SHARD_PLACEMENT_H
#define PG_DIST_SHARD_PLACEMENT_H
/* ----------------
* pg_dist_shard_placement definition.
* ----------------
*/
typedef struct FormData_pg_dist_shard_placement
{
int64 shardid; /* global shardId on remote node */
int32 shardstate; /* shard state on remote node; see RelayFileState */
int64 shardlength; /* shard length on remote node; stored as bigint */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text nodename; /* remote node's host name */
int32 nodeport; /* remote node's port number */
int64 placementid; /* global placementId on remote node (added later) */
#endif
} FormData_pg_dist_shard_placement;
/* ----------------
* Form_pg_dist_shard_placement corresponds to a pointer to a tuple with
* the format of pg_dist_shard_placement relation.
* ----------------
*/
typedef FormData_pg_dist_shard_placement *Form_pg_dist_shard_placement;
/* ----------------
* compiler constants for pg_dist_shard_placement
* ----------------
*/
#define Natts_pg_dist_shard_placement 6
#define Anum_pg_dist_shard_placement_shardid 1
#define Anum_pg_dist_shard_placement_shardstate 2
#define Anum_pg_dist_shard_placement_shardlength 3
#define Anum_pg_dist_shard_placement_nodename 4
#define Anum_pg_dist_shard_placement_nodeport 5
#define Anum_pg_dist_shard_placement_placementid 6
#endif /* PG_DIST_SHARD_PLACEMENT_H */

View File

@ -53,9 +53,6 @@ extern Datum distributed_tables_exist(PG_FUNCTION_ARGS);
extern Datum column_name_to_column(PG_FUNCTION_ARGS);
extern Datum column_name_to_column_id(PG_FUNCTION_ARGS);
extern Datum create_monolithic_shard_row(PG_FUNCTION_ARGS);
extern Datum create_healthy_local_shard_placement_row(PG_FUNCTION_ARGS);
extern Datum delete_shard_placement_row(PG_FUNCTION_ARGS);
extern Datum update_shard_placement_row_state(PG_FUNCTION_ARGS);
extern Datum next_shard_id(PG_FUNCTION_ARGS);
extern Datum acquire_shared_shard_lock(PG_FUNCTION_ARGS);

View File

@ -63,6 +63,8 @@ extern List * ActiveWorkerNodeList(void);
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
extern List * ReadWorkerNodes(void);
extern void EnsureCoordinator(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePort);
extern WorkerNode * NodeForGroup(uint32 groupId);
/* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);

View File

@ -16,4 +16,5 @@
/worker_copy.out
/multi_complex_count_distinct.out
/multi_mx_copy_data.out
/multi_behavioral_analytics_create_table.out
/multi_insert_select_behavioral_analytics_create_table.out

View File

@ -295,7 +295,9 @@ step s2-commit:
<waiting ...>
step s2-commit: <... completed>
step s1-add-second-worker: <... completed>
error in steps s2-commit s1-add-second-worker: ERROR: deadlock detected
nodename nodeport isactive
localhost 57638 t
step s2-print-content:
SELECT
nodeport, success, result
@ -307,9 +309,11 @@ step s2-print-content:
nodeport success result
57637 t 5
57638 t 5
master_remove_node
starting permutation: s1-begin s1-add-second-worker s2-insert-to-reference-table s1-commit s2-print-content
create_reference_table

View File

@ -294,7 +294,9 @@ step s2-commit:
COMMIT;
step s1-add-second-worker: <... completed>
error in steps s2-commit s1-add-second-worker: ERROR: deadlock detected
nodename nodeport isactive
localhost 57638 t
step s2-print-content:
SELECT
nodeport, success, result
@ -306,9 +308,11 @@ step s2-print-content:
nodeport success result
57637 t 5
57638 t 5
master_remove_node
starting permutation: s1-begin s1-add-second-worker s2-insert-to-reference-table s1-commit s2-print-content
create_reference_table

View File

@ -0,0 +1,29 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 103000;
-- tests that the upgrade from 7.0-2 to 7.0-3 properly migrates shard placements
DROP EXTENSION citus;
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '7.0-2';
INSERT INTO pg_dist_shard_placement
(placementid, shardid, shardstate, shardlength, nodename, nodeport) VALUES
(1, 1, 1, 0, 'localhost', :worker_1_port);
-- if there is no worker node matching the existing placements, this should fail
ALTER EXTENSION citus UPDATE TO '7.0-3';
ERROR: There is no node at "localhost:57637"
CONTEXT: PL/pgSQL function citus.find_groupid_for_node(text,integer) line 6 at RAISE
-- if you add a matching worker the upgrade should succeed
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------------------
(1,1,localhost,57637,default,f,t)
(1 row)
ALTER EXTENSION citus UPDATE TO '7.0-3';
SELECT * FROM pg_dist_placement;
placementid | shardid | shardstate | shardlength | groupid
-------------+---------+------------+-------------+---------
1 | 1 | 1 | 0 | 1
(1 row)
-- reset and prepare for the rest of the tests
DROP EXTENSION citus;
CREATE EXTENSION citus;

View File

@ -7,16 +7,16 @@ SELECT create_reference_table('test_reference_table');
ERROR: cannot create reference table "test_reference_table"
DETAIL: There are no active worker nodes.
-- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-----------------------------------
(1,1,localhost,57637,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
?column?
----------
1
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(2,2,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
-- get the active nodes
@ -57,10 +57,10 @@ SELECT master_get_active_worker_nodes();
(1 row)
-- try to disable a node with no placements and see that the node is removed
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(3,3,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
SELECT master_disable_node('localhost', :worker_2_port);
@ -111,7 +111,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER
-- try to remove a node with active placements and see that node removal fails
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: you cannot remove a node which has active shard placements
ERROR: you cannot remove a node which has shard placements
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
--------------------------------
@ -137,17 +137,18 @@ SELECT master_get_active_worker_nodes();
(1 row)
-- restore the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
SELECT master_activate_node('localhost', :worker_2_port);
master_activate_node
-----------------------------------
(3,3,localhost,57638,default,f,f)
(3,3,localhost,57638,default,f,t)
(1 row)
-- try to remove a node with active placements and see that node removal fails
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: you cannot remove a node which has active shard placements
ERROR: you cannot remove a node which has shard placements
-- mark all placements in the candidate node as inactive
UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port;
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
UPDATE pg_dist_placement SET shardstate=3 WHERE groupid=:worker_2_group;
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
@ -161,27 +162,24 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER
1220015 | 3 | localhost | 57638
(8 rows)
-- try to remove a node with only inactive placements and see that node is removed
-- try to remove a node with only inactive placements and see that removal still fails
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
ERROR: you cannot remove a node which has shard placements
SELECT master_get_active_worker_nodes();
master_get_active_worker_nodes
--------------------------------
(localhost,57638)
(localhost,57637)
(1 row)
(2 rows)
-- clean-up
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(4,4,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
UPDATE pg_dist_placement SET shardstate=1 WHERE groupid=:worker_2_group;
DROP TABLE cluster_management_test;
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
SELECT master_remove_node('localhost', :worker_2_port);
@ -191,10 +189,10 @@ SELECT master_remove_node('localhost', :worker_2_port);
(1 row)
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(5,5,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
\c - - - :worker_1_port
@ -220,10 +218,10 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
\c - - - :master_port
-- check that added nodes are not propagated to nodes with hasmetadata=false
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(6,6,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
\c - - - :worker_1_port
@ -253,14 +251,14 @@ SELECT
master_add_node('localhost', :worker_2_port);
master_add_node | master_add_node
-----------------------------------+-----------------------------------
(7,7,localhost,57637,default,f,t) | (8,8,localhost,57638,default,f,t)
(6,6,localhost,57637,default,f,t) | (7,7,localhost,57638,default,f,t)
(1 row)
SELECT * FROM pg_dist_node ORDER BY nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
--------+---------+-----------+----------+----------+-------------+----------
7 | 7 | localhost | 57637 | default | f | t
8 | 8 | localhost | 57638 | default | f | t
6 | 6 | localhost | 57637 | default | f | t
7 | 7 | localhost | 57638 | default | f | t
(2 rows)
-- check that mixed add/remove node commands work fine inside transaction
@ -271,10 +269,10 @@ SELECT master_remove_node('localhost', :worker_2_port);
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-----------------------------------
(9,9,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
SELECT master_remove_node('localhost', :worker_2_port);
@ -291,10 +289,10 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------------
(10,10,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
SELECT master_remove_node('localhost', :worker_2_port);
@ -303,10 +301,10 @@ SELECT master_remove_node('localhost', :worker_2_port);
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------------
(11,11,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
COMMIT;
@ -331,16 +329,16 @@ SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
(2 rows)
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
-------------------------------------
(12,12,localhost,57637,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
?column?
----------
1
(1 row)
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------------
(13,13,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
-- check that a distributed table can be created after adding a node in a transaction
@ -351,10 +349,10 @@ SELECT master_remove_node('localhost', :worker_2_port);
(1 row)
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
-------------------------------------
(14,14,localhost,57638,default,f,t)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
CREATE TABLE temp(col1 text, col2 int);
@ -392,7 +390,7 @@ DROP TABLE temp;
\c - - - :worker_1_port
DELETE FROM pg_dist_partition;
DELETE FROM pg_dist_shard;
DELETE FROM pg_dist_shard_placement;
DELETE FROM pg_dist_placement;
DELETE FROM pg_dist_node;
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);

View File

@ -34,18 +34,6 @@ CREATE FUNCTION create_monolithic_shard_row(regclass)
RETURNS bigint
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION create_healthy_local_shard_placement_row(bigint)
RETURNS void
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION delete_shard_placement_row(bigint, text, bigint)
RETURNS bool
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION update_shard_placement_row_state(bigint, text, bigint, int)
RETURNS bool
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION acquire_shared_shard_lock(bigint)
RETURNS void
AS 'citus'
@ -72,7 +60,8 @@ SELECT master_create_worker_shards('events_hash', 4, 2);
(1 row)
-- set shardstate of one placement of each shard to 0 (invalid value)
UPDATE pg_dist_shard_placement SET shardstate = 0 WHERE nodeport = 57638 AND shardid BETWEEN 540000 AND 540003;
UPDATE pg_dist_placement SET shardstate = 0 WHERE shardid BETWEEN 540000 AND 540003
AND groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port);
-- should see above shard identifiers
SELECT load_shard_id_array('events_hash');
load_shard_id_array
@ -187,7 +176,7 @@ ERROR: cannot reference system column "ctid" in relation "events_hash"
SELECT column_name_to_column_id('events_hash', 'non_existent');
ERROR: column "non_existent" of relation "events_hash" does not exist
-- drop shard rows (must drop placements first)
DELETE FROM pg_dist_shard_placement
DELETE FROM pg_dist_placement
WHERE shardid BETWEEN 540000 AND 540004;
DELETE FROM pg_dist_shard
WHERE logicalrelid = 'events_hash'::regclass;
@ -227,53 +216,6 @@ WHERE shardid = :new_shard_id;
t | -2147483648 | 2147483647
(1 row)
-- add a placement and manually inspect row
SELECT create_healthy_local_shard_placement_row(:new_shard_id);
create_healthy_local_shard_placement_row
------------------------------------------
(1 row)
SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432;
shardstate | nodename | nodeport
------------+-----------+----------
1 | localhost | 5432
(1 row)
-- mark it as unhealthy and inspect
SELECT update_shard_placement_row_state(:new_shard_id, 'localhost', 5432, 3);
update_shard_placement_row_state
----------------------------------
t
(1 row)
SELECT shardstate FROM pg_dist_shard_placement
WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432;
shardstate
------------
3
(1 row)
-- remove it and verify it is gone
SELECT delete_shard_placement_row(:new_shard_id, 'localhost', 5432);
delete_shard_placement_row
----------------------------
t
(1 row)
SELECT COUNT(*) FROM pg_dist_shard_placement
WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432;
count
-------
0
(1 row)
-- deleting or updating a non-existent row should fail
SELECT delete_shard_placement_row(:new_shard_id, 'wrong_localhost', 5432);
ERROR: could not find valid entry for shard placement 540005 on node "wrong_localhost:5432"
SELECT update_shard_placement_row_state(:new_shard_id, 'localhost', 5432, 3);
ERROR: could not find valid entry for shard placement 540005 on node "localhost:5432"
-- now we'll even test our lock methods...
-- use transaction to bound how long we hold the lock
BEGIN;

View File

@ -112,6 +112,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-3';
ALTER EXTENSION citus UPDATE TO '6.2-4';
ALTER EXTENSION citus UPDATE TO '7.0-1';
ALTER EXTENSION citus UPDATE TO '7.0-2';
ALTER EXTENSION citus UPDATE TO '7.0-3';
-- show running version
SHOW citus.version;
citus.version

View File

@ -4,9 +4,9 @@
-- Tests for metadata snapshot functions, metadata syncing functions and propagation of
-- metadata changes to MX tables.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1310000;
SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id
SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement_id
\gset
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100000;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
-- Create the necessary test utility function
@ -29,8 +29,8 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
SELECT unnest(master_metadata_snapshot());
unnest
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
(3 rows)
@ -55,8 +55,8 @@ UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::re
SELECT unnest(master_metadata_snapshot());
unnest
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
@ -66,7 +66,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE public.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
SELECT worker_create_truncate_trigger('public.mx_test_table')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(13 rows)
@ -77,8 +77,8 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
SELECT unnest(master_metadata_snapshot());
unnest
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
@ -89,7 +89,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE public.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
SELECT worker_create_truncate_trigger('public.mx_test_table')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(14 rows)
@ -101,8 +101,8 @@ HINT: Connect to worker nodes directly to manually change schemas of affected o
SELECT unnest(master_metadata_snapshot());
unnest
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
@ -115,7 +115,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(16 rows)
@ -131,8 +131,8 @@ UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='non_mx_test_table'
SELECT unnest(master_metadata_snapshot());
unnest
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
@ -145,7 +145,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(16 rows)
@ -154,8 +154,8 @@ UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_tabl
SELECT unnest(master_metadata_snapshot());
unnest
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
TRUNCATE pg_dist_node
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
@ -168,7 +168,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007)
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(16 rows)
@ -1098,10 +1098,11 @@ DROP TABLE mx_table_with_sequence;
-- Remove a node so that shards and sequences won't be created on table creation. Therefore,
-- we can test that start_metadata_sync_to_node can actually create the sequence with proper
-- owner
CREATE TABLE pg_dist_shard_placement_temp AS SELECT * FROM pg_dist_shard_placement;
CREATE TABLE pg_dist_placement_temp AS SELECT * FROM pg_dist_placement;
CREATE TABLE pg_dist_partition_temp AS SELECT * FROM pg_dist_partition;
DELETE FROM pg_dist_shard_placement;
DELETE FROM pg_dist_placement;
DELETE FROM pg_dist_partition;
SELECT groupid AS old_worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
@ -1180,11 +1181,24 @@ SELECT * FROM mx_table ORDER BY a;
\c - mx_user - :master_port
DROP TABLE mx_table;
-- put the metadata back into a consistent state
\c - postgres - :master_port
INSERT INTO pg_dist_shard_placement SELECT * FROM pg_dist_shard_placement_temp;
INSERT INTO pg_dist_placement SELECT * FROM pg_dist_placement_temp;
INSERT INTO pg_dist_partition SELECT * FROM pg_dist_partition_temp;
DROP TABLE pg_dist_shard_placement_temp;
DROP TABLE pg_dist_placement_temp;
DROP TABLE pg_dist_partition_temp;
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
\c - - - :worker_1_port
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
\c - - - :worker_2_port
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
stop_metadata_sync_to_node
----------------------------
@ -1294,8 +1308,12 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
-- Check that master_add_node propagates the metadata about new placements of a reference table
\c - - - :master_port
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT groupid AS old_worker_2_group
FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
CREATE TABLE tmp_placement AS
SELECT * FROM pg_dist_placement WHERE groupid = :old_worker_2_group;
DELETE FROM pg_dist_placement
WHERE groupid = :old_worker_2_group;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
@ -1355,9 +1373,17 @@ ORDER BY shardid, nodeport;
1310184 | localhost | 57638
(2 rows)
-- Get the metadata back into a consistent state
\c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
INSERT INTO pg_dist_placement (SELECT * FROM tmp_placement);
DROP TABLE tmp_placement;
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
\c - - - :worker_1_port
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
@ -1384,4 +1410,4 @@ RESET citus.multi_shard_commit_protocol;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART :last_placement_id;

View File

@ -1,4 +1,5 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1200000;
-- ===================================================================
-- test end-to-end modification functionality
-- ===================================================================
@ -205,7 +206,7 @@ BEGIN;
INSERT INTO researchers VALUES (2, 1, 'Knuth Donald');
INSERT INTO researchers VALUES (10, 6, 'Lamport Leslie');
\copy researchers from stdin delimiter ','
ERROR: cannot establish a new connection for placement 2704, since DML has been executed on a connection that is in use
ERROR: cannot establish a new connection for placement 1200003, since DML has been executed on a connection that is in use
CONTEXT: COPY researchers, line 2: "10,6,Lesport Lampie"
ROLLBACK;
-- after a COPY you can modify multiple shards, since they'll use different connections
@ -1382,7 +1383,7 @@ BEGIN;
INSERT INTO users VALUES (2, 'burak');
INSERT INTO users VALUES (3, 'burak');
\COPY items FROM STDIN WITH CSV
ERROR: cannot establish a new connection for placement 2743, since DML has been executed on a connection that is in use
ERROR: cannot establish a new connection for placement 1200042, since DML has been executed on a connection that is in use
END;
-- cannot perform DDL after a co-located table has been read over 1 connection
BEGIN;
@ -1434,7 +1435,7 @@ SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id =
-- perform a DDL command on the reference table
ALTER TABLE itemgroups ADD COLUMN last_update timestamptz;
ERROR: cannot perform DDL on placement 2737, which has been read over multiple connections
ERROR: cannot perform DDL on placement 1200036, which has been read over multiple connections
END;
BEGIN;
-- establish multiple connections to a node
@ -1452,7 +1453,7 @@ SELECT user_id FROM items JOIN itemgroups ON (item_group = gid) WHERE user_id =
-- perform a DDL command on a co-located reference table
ALTER TABLE usergroups ADD COLUMN last_update timestamptz;
ERROR: cannot perform DDL on placement 2735 since a co-located placement has been read over multiple connections
ERROR: cannot perform DDL on placement 1200034 since a co-located placement has been read over multiple connections
END;
BEGIN;
-- make a modification over connection 1
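The placement ids in the errors above are stable only because this test restarts pg_dist_placement_placementid_seq at a fixed value. When such an error needs debugging, the id can be mapped back to a shard and node, for example through the compatibility view (an illustrative query, assuming the view exposes the same columns as the old table):

SELECT placementid, shardid, nodename, nodeport
FROM pg_dist_shard_placement
WHERE placementid = 1200003;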

View File

@ -265,32 +265,18 @@ SELECT create_distributed_table('mx_sequence', 'key');
(1 row)
\c - - - :worker_1_port
SELECT groupid FROM pg_dist_local_group;
groupid
---------
12
(1 row)
SELECT last_value FROM mx_sequence_value_seq;
last_value
------------------
3377699720527873
(1 row)
SELECT last_value AS worker_1_lastval FROM mx_sequence_value_seq \gset
\c - - - :worker_2_port
SELECT groupid FROM pg_dist_local_group;
groupid
---------
14
(1 row)
SELECT last_value FROM mx_sequence_value_seq;
last_value
------------------
3940649673949185
(1 row)
SELECT last_value AS worker_2_lastval FROM mx_sequence_value_seq \gset
\c - - - :master_port
-- don't look at the actual values because they rely on the groupids of the nodes
-- which can change depending on the tests which have run before this one
SELECT :worker_1_lastval = :worker_2_lastval;
?column?
----------
f
(1 row)
-- the type of sequences can't be changed
ALTER TABLE mx_sequence ALTER value TYPE BIGINT;
NOTICE: using one-phase commit for distributed DDL commands
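The two last_value outputs removed above differ per worker because each MX node draws sequence values from a disjoint range derived from its groupid; the values shown are consistent with a range starting at groupid * 2^48, which is why the new test only compares the two values instead of printing them. For example (plain arithmetic, not a statement about the exact implementation):

-- 12 * 2^48 and 14 * 2^48 reproduce the range starts implied by the old outputs
SELECT 12::bigint << 48 AS group_12_range_start,
       14::bigint << 48 AS group_14_range_start;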

View File

@ -255,8 +255,9 @@ CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
\c - - - :master_port
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_be_forgotten');
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_be_forgotten');
SELECT recover_prepared_transactions();
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort'

View File

@ -446,6 +446,19 @@ END;
COMMIT
-- verify interaction of default values, SERIAL, and RETURNING
\set QUIET on
-- make sure this test always returns the same output no matter which tests have run
SELECT minimum_value::bigint AS min_value,
maximum_value::bigint AS max_value
FROM information_schema.sequences
WHERE sequence_name = 'app_analytics_events_mx_id_seq' \gset
SELECT last_value FROM app_analytics_events_mx_id_seq \gset
ALTER SEQUENCE app_analytics_events_mx_id_seq NO MINVALUE NO MAXVALUE;
SELECT setval('app_analytics_events_mx_id_seq'::regclass, 3940649673949184);
setval
------------------
3940649673949184
(1 row)
INSERT INTO app_analytics_events_mx VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
id
------------------
@ -464,3 +477,12 @@ INSERT INTO app_analytics_events_mx (app_id, name) VALUES (103, 'Mynt') RETURNIN
3940649673949187 | 103 | Mynt
(1 row)
-- clean up
SELECT setval('app_analytics_events_mx_id_seq'::regclass, :last_value);
setval
------------------
3659174697238529
(1 row)
ALTER SEQUENCE app_analytics_events_mx_id_seq
MINVALUE :min_value MAXVALUE :max_value;

View File

@ -18,7 +18,7 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- remove non-existing node
SELECT master_remove_node('localhost', 55555);
ERROR: could not find valid entry for node "localhost:55555"
ERROR: node at "localhost:55555" does not exist
-- remove a node with no reference tables
-- verify node exist before removal
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
@ -160,7 +160,7 @@ WHERE
\c - - - :master_port
-- remove same node twice
SELECT master_remove_node('localhost', :worker_2_port);
ERROR: could not find valid entry for node "localhost:57638"
ERROR: node at "localhost:57638" does not exist
-- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638

View File

@ -1,4 +1,6 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 820000;
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- ===================================================================
-- test shard repair functionality
-- ===================================================================
@ -36,7 +38,8 @@ INSERT INTO customer_engagements VALUES (1, '03-01-2015', 'third event');
SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_2_port;
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid
AND groupid = :worker_2_group;
-- cannot repair a shard after a modification (transaction still open during repair)
BEGIN;
ALTER TABLE customer_engagements ADD COLUMN value float;
@ -70,10 +73,12 @@ SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'lo
INSERT INTO customer_engagements VALUES (4, '04-01-2015', 'fourth event');
ROLLBACK;
-- add a fake healthy placement for the tests
INSERT INTO pg_dist_shard_placement (nodename, nodeport, shardid, shardstate, shardlength)
VALUES ('dummyhost', :worker_2_port, :newshardid, 1, 0);
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'dummyhost', :worker_2_port);
INSERT INTO pg_dist_placement (groupid, shardid, shardstate, shardlength)
VALUES (:worker_2_group, :newshardid, 1, 0);
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: target placement must be in inactive state
DELETE FROM pg_dist_placement
WHERE groupid = :worker_2_group AND shardid = :newshardid AND shardstate = 1;
-- also try to copy from an inactive placement
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
ERROR: source placement must be in finalized state
@ -85,9 +90,7 @@ SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'lo
(1 row)
-- now, update the first placement as unhealthy (and raise a notice) so that queries are not routed there
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_1_port;
-- we are done with dummyhost, it is safe to remove it
DELETE FROM pg_dist_shard_placement WHERE nodename = 'dummyhost';
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid AND groupid = :worker_1_group;
-- get the data from the second placement
SELECT * FROM customer_engagements;
id | created_at | event_data
@ -122,7 +125,7 @@ NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
SELECT shardid as remotenewshardid FROM pg_dist_shard WHERE logicalrelid = 'remote_engagements'::regclass
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND nodeport = :worker_2_port;
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND groupid = :worker_2_group;
-- oops! we don't support repairing shards backed by foreign tables
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: cannot repair shard

View File

@ -335,7 +335,6 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
(1 row)
\c - - - :worker_2_port
DELETE FROM pg_dist_node;
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition;
worker_drop_distributed_table
-------------------------------
@ -343,6 +342,7 @@ SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition;
(2 rows)
DELETE FROM pg_dist_node;
\c - - - :worker_1_port
-- DROP TABLE
DROP TABLE mx_table;
@ -372,8 +372,9 @@ FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid = 'mx_table'::regclass AND nodeport=:worker_1_port
ORDER BY shardid
LIMIT 1 \gset
INSERT INTO pg_dist_shard_placement (nodename, nodeport, shardid, shardstate, shardlength)
VALUES ('localhost', :worker_2_port, :testshardid, 3, 0);
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
INSERT INTO pg_dist_placement (groupid, shardid, shardstate, shardlength)
VALUES (:worker_2_group, :testshardid, 3, 0);
SELECT master_copy_shard_placement(:testshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
@ -387,7 +388,7 @@ ORDER BY nodeport;
1270000 | localhost | 57638 | 3
(2 rows)
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port AND shardid = :testshardid;
DELETE FROM pg_dist_placement WHERE groupid = :worker_2_group AND shardid = :testshardid;
-- master_get_new_placementid
SELECT master_get_new_placementid();
ERROR: operation is not allowed on this node

View File

@ -16,6 +16,7 @@
# Tests around schema changes, these are run first, so there's no preexisting objects.
# ---
test: multi_extension
test: multi_703_upgrade
test: multi_cluster_management
test: multi_test_helpers
test: multi_table_ddl

View File

@ -15,4 +15,5 @@
/worker_copy.sql
/multi_complex_count_distinct.sql
/multi_mx_copy_data.sql
/multi_behavioral_analytics_create_table.sql
/multi_insert_select_behavioral_analytics_create_table.sql

View File

@ -0,0 +1,25 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 103000;
-- tests that the upgrade from 7.0-2 to 7.0-3 properly migrates shard placements
DROP EXTENSION citus;
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '7.0-2';
INSERT INTO pg_dist_shard_placement
(placementid, shardid, shardstate, shardlength, nodename, nodeport) VALUES
(1, 1, 1, 0, 'localhost', :worker_1_port);
-- if there are no worker nodes which match the shards, this should fail
ALTER EXTENSION citus UPDATE TO '7.0-3';
-- if you add a matching worker the upgrade should succeed
SELECT master_add_node('localhost', :worker_1_port);
ALTER EXTENSION citus UPDATE TO '7.0-3';
SELECT * FROM pg_dist_placement;
-- reset and prepare for the rest of the tests
DROP EXTENSION citus;
CREATE EXTENSION citus;
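Since the upgrade has to translate every stored (nodename, nodeport) pair into a groupid, it fails while no matching worker exists and succeeds once one is added. After a successful upgrade the old relation name survives only as a compatibility view over pg_dist_placement; an illustrative way to confirm that (not part of the test) is:

-- expected to report a view ('v') rather than a table ('r')
SELECT relkind FROM pg_class
WHERE oid = 'pg_dist_shard_placement'::regclass;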

View File

@ -8,8 +8,8 @@ CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
-- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
-- get the active nodes
SELECT master_get_active_worker_nodes();
@ -27,7 +27,7 @@ SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_get_active_worker_nodes();
-- try to disable a node with no placements and see that the node is removed
SELECT master_add_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT master_disable_node('localhost', :worker_2_port);
SELECT master_get_active_worker_nodes();
@ -53,28 +53,30 @@ SELECT master_disable_node('localhost', :worker_2_port);
SELECT master_get_active_worker_nodes();
-- restore the node for next tests
SELECT master_add_node('localhost', :worker_2_port);
SELECT master_activate_node('localhost', :worker_2_port);
-- try to remove a node with active placements and see that node removal fails
SELECT master_remove_node('localhost', :worker_2_port);
-- mark all placements in the candidate node as inactive
UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port;
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
UPDATE pg_dist_placement SET shardstate=3 WHERE groupid=:worker_2_group;
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
-- try to remove a node with only inactive placements and see that node is removed
-- try to remove a node with only inactive placements and see that removal still fails
SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_get_active_worker_nodes();
-- clean-up
SELECT master_add_node('localhost', :worker_2_port);
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
UPDATE pg_dist_placement SET shardstate=1 WHERE groupid=:worker_2_group;
DROP TABLE cluster_management_test;
-- check that adding/removing nodes are propagated to nodes with hasmetadata=true
SELECT master_remove_node('localhost', :worker_2_port);
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
\c - - - :master_port
@ -85,7 +87,7 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
-- check that added nodes are not propagated to nodes with hasmetadata=false
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
SELECT master_add_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
\c - - - :worker_1_port
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
\c - - - :master_port
@ -105,7 +107,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
-- check that mixed add/remove node commands work fine inside transaction
BEGIN;
SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_add_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT master_remove_node('localhost', :worker_2_port);
COMMIT;
@ -113,9 +115,9 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_add_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
COMMIT;
SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodeport=:worker_2_port;
@ -125,14 +127,14 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
\c - - - :master_port
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
-- check that a distributed table can be created after adding a node in a transaction
SELECT master_remove_node('localhost', :worker_2_port);
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
CREATE TABLE temp(col1 text, col2 int);
SELECT create_distributed_table('temp', 'col1');
INSERT INTO temp VALUES ('row1', 1);
@ -155,7 +157,7 @@ DROP TABLE temp;
\c - - - :worker_1_port
DELETE FROM pg_dist_partition;
DELETE FROM pg_dist_shard;
DELETE FROM pg_dist_shard_placement;
DELETE FROM pg_dist_placement;
DELETE FROM pg_dist_node;
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);

View File

@ -46,21 +46,6 @@ CREATE FUNCTION create_monolithic_shard_row(regclass)
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION create_healthy_local_shard_placement_row(bigint)
RETURNS void
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION delete_shard_placement_row(bigint, text, bigint)
RETURNS bool
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION update_shard_placement_row_state(bigint, text, bigint, int)
RETURNS bool
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION acquire_shared_shard_lock(bigint)
RETURNS void
AS 'citus'
@ -81,7 +66,8 @@ SELECT master_create_distributed_table('events_hash', 'name', 'hash');
SELECT master_create_worker_shards('events_hash', 4, 2);
-- set shardstate of one replica of each shard to 0 (invalid value)
UPDATE pg_dist_shard_placement SET shardstate = 0 WHERE nodeport = 57638 AND shardid BETWEEN 540000 AND 540003;
UPDATE pg_dist_placement SET shardstate = 0 WHERE shardid BETWEEN 540000 AND 540003
AND groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port);
-- should see above shard identifiers
SELECT load_shard_id_array('events_hash');
@ -137,7 +123,7 @@ SELECT column_name_to_column_id('events_hash', 'ctid');
SELECT column_name_to_column_id('events_hash', 'non_existent');
-- drop shard rows (must drop placements first)
DELETE FROM pg_dist_shard_placement
DELETE FROM pg_dist_placement
WHERE shardid BETWEEN 540000 AND 540004;
DELETE FROM pg_dist_shard
WHERE logicalrelid = 'events_hash'::regclass;
@ -168,25 +154,6 @@ SELECT create_monolithic_shard_row('customers') AS new_shard_id
SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard
WHERE shardid = :new_shard_id;
-- add a placement and manually inspect row
SELECT create_healthy_local_shard_placement_row(:new_shard_id);
SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432;
-- mark it as unhealthy and inspect
SELECT update_shard_placement_row_state(:new_shard_id, 'localhost', 5432, 3);
SELECT shardstate FROM pg_dist_shard_placement
WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432;
-- remove it and verify it is gone
SELECT delete_shard_placement_row(:new_shard_id, 'localhost', 5432);
SELECT COUNT(*) FROM pg_dist_shard_placement
WHERE shardid = :new_shard_id AND nodename = 'localhost' and nodeport = 5432;
-- deleting or updating a non-existent row should fail
SELECT delete_shard_placement_row(:new_shard_id, 'wrong_localhost', 5432);
SELECT update_shard_placement_row_state(:new_shard_id, 'localhost', 5432, 3);
-- now we'll even test our lock methods...
-- use transaction to bound how long we hold the lock
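With the placement helper functions above removed, the same manipulations are expressed as plain DML against pg_dist_placement, as the updated hunks in this file show. A rough sketch of the equivalent insert/update/delete cycle, using hypothetical shard and group ids, would be:

-- hypothetical ids chosen for illustration only
INSERT INTO pg_dist_placement (groupid, shardid, shardstate, shardlength)
VALUES (0, 540005, 1, 0);                       -- add a healthy placement
UPDATE pg_dist_placement SET shardstate = 3
WHERE shardid = 540005 AND groupid = 0;         -- mark it unhealthy
DELETE FROM pg_dist_placement
WHERE shardid = 540005 AND groupid = 0;         -- remove it again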

View File

@ -112,6 +112,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-3';
ALTER EXTENSION citus UPDATE TO '6.2-4';
ALTER EXTENSION citus UPDATE TO '7.0-1';
ALTER EXTENSION citus UPDATE TO '7.0-2';
ALTER EXTENSION citus UPDATE TO '7.0-3';
-- show running version
SHOW citus.version;
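Besides SHOW citus.version, the installed extension script level can be read from the catalog, which is handy when checking that the 7.0-3 step above actually applied (a generic PostgreSQL query, not specific to this test):

SELECT extversion FROM pg_extension WHERE extname = 'citus';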

View File

@ -8,9 +8,9 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1310000;
SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id
SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement_id
\gset
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100000;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
@ -479,10 +479,11 @@ DROP TABLE mx_table_with_sequence;
-- Remove a node so that shards and sequences won't be created on table creation. Therefore,
-- we can test that start_metadata_sync_to_node can actually create the sequence with proper
-- owner
CREATE TABLE pg_dist_shard_placement_temp AS SELECT * FROM pg_dist_shard_placement;
CREATE TABLE pg_dist_placement_temp AS SELECT * FROM pg_dist_placement;
CREATE TABLE pg_dist_partition_temp AS SELECT * FROM pg_dist_partition;
DELETE FROM pg_dist_shard_placement;
DELETE FROM pg_dist_placement;
DELETE FROM pg_dist_partition;
SELECT groupid AS old_worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT master_remove_node('localhost', :worker_2_port);
-- the master user needs superuser permissions to change the replication model
@ -518,11 +519,25 @@ SELECT * FROM mx_table ORDER BY a;
\c - mx_user - :master_port
DROP TABLE mx_table;
-- put the metadata back into a consistent state
\c - postgres - :master_port
INSERT INTO pg_dist_shard_placement SELECT * FROM pg_dist_shard_placement_temp;
INSERT INTO pg_dist_placement SELECT * FROM pg_dist_placement_temp;
INSERT INTO pg_dist_partition SELECT * FROM pg_dist_partition_temp;
DROP TABLE pg_dist_shard_placement_temp;
DROP TABLE pg_dist_placement_temp;
DROP TABLE pg_dist_partition_temp;
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
\c - - - :worker_1_port
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
\c - - - :worker_2_port
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
\c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
DROP USER mx_user;
@ -575,8 +590,12 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
-- Check that master_add_node propagates the metadata about new placements of a reference table
\c - - - :master_port
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT groupid AS old_worker_2_group
FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
CREATE TABLE tmp_placement AS
SELECT * FROM pg_dist_placement WHERE groupid = :old_worker_2_group;
DELETE FROM pg_dist_placement
WHERE groupid = :old_worker_2_group;
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
@ -604,9 +623,19 @@ FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass
ORDER BY shardid, nodeport;
-- Get the metadata back into a consistent state
\c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
INSERT INTO pg_dist_placement (SELECT * FROM tmp_placement);
DROP TABLE tmp_placement;
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
\c - - - :worker_1_port
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
-- Cleanup
\c - - - :master_port
@ -625,4 +654,4 @@ RESET citus.multi_shard_commit_protocol;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART :last_placement_id;

View File

@ -1,6 +1,6 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1200000;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1200000;
-- ===================================================================
-- test end-to-end modification functionality

View File

@ -108,16 +108,18 @@ SELECT create_distributed_table('mx_sequence', 'key');
\c - - - :worker_1_port
SELECT groupid FROM pg_dist_local_group;
SELECT last_value FROM mx_sequence_value_seq;
SELECT last_value AS worker_1_lastval FROM mx_sequence_value_seq \gset
\c - - - :worker_2_port
SELECT groupid FROM pg_dist_local_group;
SELECT last_value FROM mx_sequence_value_seq;
SELECT last_value AS worker_2_lastval FROM mx_sequence_value_seq \gset
\c - - - :master_port
-- don't look at the actual values because they rely on the groupids of the nodes
-- which can change depending on the tests which have run before this one
SELECT :worker_1_lastval = :worker_2_lastval;
-- the type of sequences can't be changed
ALTER TABLE mx_sequence ALTER value TYPE BIGINT;
ALTER TABLE mx_sequence ALTER value TYPE INT;

View File

@ -154,8 +154,9 @@ PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
\c - - - :master_port
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_be_forgotten');
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport = :worker_1_port \gset
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (:worker_1_group, 'citus_0_should_be_forgotten');
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;

View File

@ -283,6 +283,20 @@ END;
-- verify interaction of default values, SERIAL, and RETURNING
\set QUIET on
-- make sure this test always returns the same output no matter which tests have run
SELECT minimum_value::bigint AS min_value,
maximum_value::bigint AS max_value
FROM information_schema.sequences
WHERE sequence_name = 'app_analytics_events_mx_id_seq' \gset
SELECT last_value FROM app_analytics_events_mx_id_seq \gset
ALTER SEQUENCE app_analytics_events_mx_id_seq NO MINVALUE NO MAXVALUE;
SELECT setval('app_analytics_events_mx_id_seq'::regclass, 3940649673949184);
INSERT INTO app_analytics_events_mx VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
INSERT INTO app_analytics_events_mx (app_id, name) VALUES (102, 'Wayz') RETURNING id;
INSERT INTO app_analytics_events_mx (app_id, name) VALUES (103, 'Mynt') RETURNING *;
-- clean up
SELECT setval('app_analytics_events_mx_id_seq'::regclass, :last_value);
ALTER SEQUENCE app_analytics_events_mx_id_seq
MINVALUE :min_value MAXVALUE :max_value;

View File

@ -1,6 +1,6 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 820000;
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- ===================================================================
-- test shard repair functionality
@ -37,7 +37,8 @@ SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_e
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_2_port;
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid
AND groupid = :worker_2_group;
-- cannot repair a shard after a modification (transaction still open during repair)
BEGIN;
@ -62,10 +63,14 @@ INSERT INTO customer_engagements VALUES (4, '04-01-2015', 'fourth event');
ROLLBACK;
-- add a fake healthy placement for the tests
INSERT INTO pg_dist_shard_placement (nodename, nodeport, shardid, shardstate, shardlength)
VALUES ('dummyhost', :worker_2_port, :newshardid, 1, 0);
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'dummyhost', :worker_2_port);
INSERT INTO pg_dist_placement (groupid, shardid, shardstate, shardlength)
VALUES (:worker_2_group, :newshardid, 1, 0);
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
DELETE FROM pg_dist_placement
WHERE groupid = :worker_2_group AND shardid = :newshardid AND shardstate = 1;
-- also try to copy from an inactive placement
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
@ -74,10 +79,7 @@ SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_2_port, 'lo
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
-- now, update the first placement as unhealthy (and raise a notice) so that queries are not routed there
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_1_port;
-- we are done with dummyhost, it is safe to remove it
DELETE FROM pg_dist_shard_placement WHERE nodename = 'dummyhost';
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid AND groupid = :worker_1_group;
-- get the data from the second placement
SELECT * FROM customer_engagements;
@ -100,7 +102,7 @@ SELECT shardid as remotenewshardid FROM pg_dist_shard WHERE logicalrelid = 'remo
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND nodeport = :worker_2_port;
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND groupid = :worker_2_group;
-- oops! we don't support repairing shards backed by foreign tables
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
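Because placements no longer record a host, the old trick of inserting a placement on a fictitious 'dummyhost' is replaced above by a real placement in worker 2's group that is deleted once the error case has been exercised. An illustrative way to inspect the shard's placements per node while debugging this test (not part of the test itself) is:

SELECT nodename, nodeport, shardstate
FROM pg_dist_shard_placement
WHERE shardid = :newshardid
ORDER BY nodeport;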

View File

@ -176,8 +176,8 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
\c - - - :worker_2_port
DELETE FROM pg_dist_node;
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition;
DELETE FROM pg_dist_node;
\c - - - :worker_1_port
-- DROP TABLE
@ -195,8 +195,9 @@ WHERE logicalrelid = 'mx_table'::regclass AND nodeport=:worker_1_port
ORDER BY shardid
LIMIT 1 \gset
INSERT INTO pg_dist_shard_placement (nodename, nodeport, shardid, shardstate, shardlength)
VALUES ('localhost', :worker_2_port, :testshardid, 3, 0);
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
INSERT INTO pg_dist_placement (groupid, shardid, shardstate, shardlength)
VALUES (:worker_2_group, :testshardid, 3, 0);
SELECT master_copy_shard_placement(:testshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
@ -205,7 +206,7 @@ FROM pg_dist_shard_placement
WHERE shardid = :testshardid
ORDER BY nodeport;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port AND shardid = :testshardid;
DELETE FROM pg_dist_placement WHERE groupid = :worker_2_group AND shardid = :testshardid;
-- master_get_new_placementid
SELECT master_get_new_placementid();