Make sure that shard repair considers replication factor

pull/2389/head
Onder Kalaci 2018-09-18 18:29:10 +03:00
parent 8520a5b432
commit abc443d7fa
10 changed files with 551 additions and 33 deletions

View File

@@ -152,6 +152,7 @@ static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
static void ProcessTruncateStatement(TruncateStmt *truncateStatement);
static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement);
static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
@@ -2931,7 +2932,51 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
/*
- * ProcessTruncateStatement handles distributed locking
 * ProcessTruncateStatement handles a few things that should be
 * done before standard process utility is called for truncate
 * command.
 */
static void
ProcessTruncateStatement(TruncateStmt *truncateStatement)
{
    EnsurePartitionTableNotReplicatedForTruncate(truncateStatement);
    LockTruncatedRelationMetadataInWorkers(truncateStatement);
}


/*
 * EnsurePartitionTableNotReplicatedForTruncate is a simple wrapper around
 * EnsurePartitionTableNotReplicated for the TRUNCATE command.
 */
static void
EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement)
{
    ListCell *relationCell = NULL;

    foreach(relationCell, truncateStatement->relations)
    {
        RangeVar *relationRV = (RangeVar *) lfirst(relationCell);
        Relation relation = heap_openrv(relationRV, NoLock);
        Oid relationId = RelationGetRelid(relation);

        if (!IsDistributedTable(relationId))
        {
            heap_close(relation, NoLock);
            continue;
        }

        EnsurePartitionTableNotReplicated(relationId);
        heap_close(relation, NoLock);
    }
}


/*
 * LockTruncatedRelationMetadataInWorkers determines if distributed
 * lock is necessary for truncated relations, and acquire locks.
 *
 * LockTruncatedRelationMetadataInWorkers handles distributed locking
 * of truncated tables before standard utility takes over.
 *
 * Actual distributed truncation occurs inside truncate trigger.
@@ -2941,17 +2986,6 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
 * non-distributed and distributed relations.
 */
static void
-ProcessTruncateStatement(TruncateStmt *truncateStatement)
-{
-    LockTruncatedRelationMetadataInWorkers(truncateStatement);
-}
-
-
-/*
- * LockTruncatedRelationMetadataInWorkers determines if distributed
- * lock is necessary for truncated relations, and acquire locks.
- */
-static void
LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
{
    List *distributedRelationList = NIL;
@@ -3316,7 +3350,11 @@ AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
 * in its default value of '1pc', then a notice message indicating that '2pc' might be
 * used for extra safety. In the commit protocol, a BEGIN is sent after connection to
 * each shard placement and COMMIT/ROLLBACK is handled by
- * CompleteShardPlacementTransactions function.
 * CoordinatedTransactionCallback function.
 *
 * The function errors out if the node is not the coordinator or if the DDL is on
 * a partitioned table which has replication factor > 1.
 *
 */
static void
ExecuteDistributedDDLJob(DDLJob *ddlJob)
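
To see what the new TRUNCATE pre-processing guards, the sketch below mirrors the regression test added later in this commit (shard count, replication factor, and table names are the test's, not requirements). The TRUNCATE is now routed through EnsurePartitionTableNotReplicatedForTruncate, which applies EnsurePartitionTableNotReplicated to every distributed relation named in the statement before standard process utility runs:

-- a minimal sketch, assuming a Citus cluster; values mirror the new regression test
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST (event_id);
CREATE TABLE customer_engagements_1 PARTITION OF customer_engagements FOR VALUES IN (1);
SELECT create_distributed_table('customer_engagements', 'id', 'hash');
-- goes through EnsurePartitionTableNotReplicatedForTruncate before standard ProcessUtility
TRUNCATE customer_engagements;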

View File

@@ -143,7 +143,6 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
        rangeVar->schemaname = schemaName;
    }

-   EnsurePartitionTableNotReplicated(relationId);
    EnsureTablePermissions(relationId, ACL_TRUNCATE);

    if (ShouldExecuteTruncateStmtSequential(truncateStatement))

View File

@@ -25,6 +25,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_router_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_manager.h"
@@ -50,6 +51,9 @@ static char LookupShardTransferMode(Oid shardReplicationModeOid);
static void RepairShardPlacement(int64 shardId, char *sourceNodeName,
                                 int32 sourceNodePort, char *targetNodeName,
                                 int32 targetNodePort);
static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
                                             char *sourceNodeName,
                                             int32 sourceNodePort);
static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName,
                                     int32 sourceNodePort, char *targetNodeName,
                                     int32 targetNodePort);
@@ -219,6 +223,8 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
    char relationKind = get_rel_relkind(distributedTableId);
    char *tableOwner = TableOwner(shardInterval->relationId);
    bool missingOk = false;
    bool includeData = false;
    bool partitionedTable = false;

    List *ddlCommandList = NIL;
    List *foreignConstraintCommandList = NIL;
@@ -237,6 +243,18 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
                           "not supported.", relationName)));
    }

    /*
     * Let's not allow repairing partitions to prevent any edge cases.
     * We're already not allowing any kind of modifications on the partitions,
     * so their placements are not likely to be marked as INVALID. The only
     * possible case to mark the placement of a partition as invalid is
     * "ALTER TABLE parent_table DETACH PARTITION partition_table". But,
     * given that the table would become a regular distributed table if the
     * command succeeds, we're OK since regular distributed tables can
     * be repaired later on.
     */
    EnsurePartitionTableNotReplicated(distributedTableId);

    /*
     * We take a lock on the referenced table if there is a foreign constraint
     * during the copy procedure. If we do not block DMLs on the referenced
@@ -260,10 +278,46 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
    EnsureShardCanBeRepaired(shardId, sourceNodeName, sourceNodePort, targetNodeName,
                             targetNodePort);

    /*
     * If the shard belongs to a partitioned table, we need to load the data after
     * creating the partitions and the partitioning hierarchy.
     */
    partitionedTable = PartitionedTableNoLock(distributedTableId);
    includeData = !partitionedTable;

    /* we generate necessary commands to recreate the shard in target node */
-   ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort);
    ddlCommandList =
        CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort, includeData);

    foreignConstraintCommandList = CopyShardForeignConstraintCommandList(shardInterval);
    ddlCommandList = list_concat(ddlCommandList, foreignConstraintCommandList);

    /*
     * CopyShardCommandList() drops the table, which cascades to the partitions when
     * the table is a partitioned table. This means that we need to create both the
     * parent table and its partitions.
     *
     * We also skipped copying the data, so include it here.
     */
    if (partitionedTable)
    {
        List *partitionCommandList = NIL;
        char *shardName = ConstructQualifiedShardName(shardInterval);
        StringInfo copyShardDataCommand = makeStringInfo();

        partitionCommandList =
            CopyPartitionShardsCommandList(shardInterval, sourceNodeName, sourceNodePort);
        ddlCommandList = list_concat(ddlCommandList, partitionCommandList);

        /* finally copy the data as well */
        appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
                         quote_literal_cstr(shardName), /* table to append */
                         quote_literal_cstr(shardName), /* remote table name */
                         quote_literal_cstr(sourceNodeName), /* remote host */
                         sourceNodePort); /* remote port */
        ddlCommandList = lappend(ddlCommandList, copyShardDataCommand->data);
    }

    SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner,
                                               ddlCommandList);
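
For a partitioned table the command list assembled above therefore arrives at the target worker in roughly the following order. This is a hedged sketch: the shard names, ids, and port are invented, the real DDL Citus generates differs, and the last line merely assumes WORKER_APPEND_TABLE_TO_SHARD expands to a worker_append_table_to_shard() call with the four arguments shown above.

-- illustrative only; object names, shard ids, and the port are made up
DROP TABLE IF EXISTS customer_engagements_102008 CASCADE;              -- from CopyShardCommandList(), cascades to partition shards
CREATE TABLE customer_engagements_102008 (id integer, event_id int)
    PARTITION BY LIST (event_id);                                      -- recreate the parent shard, data copy skipped
CREATE TABLE customer_engagements_1_102009 (id integer, event_id int); -- from CopyPartitionShardsCommandList()
ALTER TABLE customer_engagements_102008
    ATTACH PARTITION customer_engagements_1_102009 FOR VALUES IN (1);
-- ... repeated for every remaining partition shard ...
SELECT worker_append_table_to_shard('customer_engagements_102008',    -- finally copy the data
                                    'customer_engagements_102008',
                                    'localhost', 57637);
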
@@ -275,6 +329,49 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
}


/*
 * CopyPartitionShardsCommandList gets a shardInterval which is a shard that
 * belongs to a partitioned table (this is asserted).
 *
 * The function returns a list of commands which re-create all the partitions
 * of the input shardInterval.
 */
static List *
CopyPartitionShardsCommandList(ShardInterval *shardInterval, char *sourceNodeName,
                               int32 sourceNodePort)
{
    Oid distributedTableId = shardInterval->relationId;
    List *partitionList = NIL;
    ListCell *partitionOidCell = NULL;
    List *ddlCommandList = NIL;

    Assert(PartitionedTableNoLock(distributedTableId));

    partitionList = PartitionList(distributedTableId);
    foreach(partitionOidCell, partitionList)
    {
        Oid partitionOid = lfirst_oid(partitionOidCell);
        uint64 partitionShardId =
            ColocatedShardIdInRelation(partitionOid, shardInterval->shardIndex);
        ShardInterval *partitionShardInterval = LoadShardInterval(partitionShardId);
        bool includeData = false;
        List *copyCommandList = NIL;
        char *attachPartitionCommand = NULL;

        copyCommandList =
            CopyShardCommandList(partitionShardInterval, sourceNodeName, sourceNodePort,
                                 includeData);
        ddlCommandList = list_concat(ddlCommandList, copyCommandList);

        attachPartitionCommand =
            GenerateAttachShardPartitionCommand(partitionShardInterval);
        ddlCommandList = lappend(ddlCommandList, attachPartitionCommand);
    }

    return ddlCommandList;
}


/*
 * EnsureShardCanBeRepaired checks if the given shard has a healthy placement in the source
 * node and inactive node on the target node.
@@ -350,11 +447,12 @@ SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 node
/*
 * CopyShardCommandList generates command list to copy the given shard placement
- * from the source node to the target node.
 * from the source node to the target node. The caller can optionally skip copying
 * the data via the includeDataCopy flag.
 */
List *
-CopyShardCommandList(ShardInterval *shardInterval,
-                     char *sourceNodeName, int32 sourceNodePort)
CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName,
                     int32 sourceNodePort, bool includeDataCopy)
{
    int64 shardId = shardInterval->shardId;
    char *shardName = ConstructQualifiedShardName(shardInterval);
@@ -371,14 +469,21 @@ CopyShardCommandList(ShardInterval *shardInterval,
    copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
                                              tableRecreationCommandList);

-   appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
-                    quote_literal_cstr(shardName), /* table to append */
-                    quote_literal_cstr(shardName), /* remote table name */
-                    quote_literal_cstr(sourceNodeName), /* remote host */
-                    sourceNodePort); /* remote port */

    /*
     * The caller may not want to include the COPY command, for example when it
     * is going to use logical replication to copy the data instead.
     */
    if (includeDataCopy)
    {
        appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
                         quote_literal_cstr(shardName), /* table to append */
                         quote_literal_cstr(shardName), /* remote table name */
                         quote_literal_cstr(sourceNodeName), /* remote host */
                         sourceNodePort); /* remote port */

        copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList,
                                              copyShardDataCommand->data);
    }

    indexCommandList = GetTableIndexAndConstraintCommands(relationId);
    indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId);
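
For reference, the end-to-end flow these changes serve is the one exercised by the regression tests below: mark one placement of a replicated shard as invalid, then repair it from the healthy placement. A minimal sketch, assuming the test's two-worker setup and its psql variables:

-- mark the copy of the shard on the second worker as unhealthy (shardstate = 3)
UPDATE pg_dist_placement SET shardstate = 3
WHERE shardid = :newshardid AND groupid = :worker_2_group;
-- recreate the placement from the healthy copy on the first worker; for a
-- partitioned table this now also recreates and re-attaches the partition shards
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port,
                                   'localhost', :worker_2_port);
-- check that both placements are healthy again (shardstate = 1)
SELECT nodename, nodeport, shardstate
FROM pg_dist_placement JOIN pg_dist_node USING (groupid)
WHERE shardid = :newshardid ORDER BY nodeport;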

View File

@@ -284,7 +284,9 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
    ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk);
    char *srcNodeName = sourceShardPlacement->nodeName;
    uint32 srcNodePort = sourceShardPlacement->nodePort;
-   List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort);
    bool includeData = true;
    List *ddlCommandList =
        CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);

    List *shardPlacementList = ShardPlacementList(shardId);
    bool missingWorkerOk = true;

View File

@@ -16,6 +16,7 @@
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/distributed_planner.h"
#include "distributed/shard_pruning.h"
#include "distributed/shardinterval_utils.h"
@@ -418,6 +419,7 @@ SingleReplicatedTable(Oid relationId)
    List *shardPlacementList = NIL;
    Oid shardId = INVALID_SHARD_ID;

    /* we could have append/range distributed tables without shards */
    if (list_length(shardList) <= 1)
    {
        return false;
@@ -425,10 +427,32 @@ SingleReplicatedTable(Oid relationId)
    /* checking only for the first shard id should suffice */
    shardId = (*(uint64 *) linitial(shardList));

-   shardPlacementList = ShardPlacementList(shardId);
-   if (list_length(shardPlacementList) != 1)
-   {
-       return false;
-   }

    /* for hash distributed tables, it is sufficient to only check one shard */
    if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH)
    {
        shardPlacementList = ShardPlacementList(shardId);
        if (list_length(shardPlacementList) != 1)
        {
            return false;
        }
    }
    else
    {
        List *shardIntervalList = LoadShardList(relationId);
        ListCell *shardIntervalCell = NULL;

        foreach(shardIntervalCell, shardIntervalList)
        {
            uint64 *shardIdPointer = (uint64 *) lfirst(shardIntervalCell);
            uint64 shardId = (*shardIdPointer);
            List *shardPlacementList = ShardPlacementList(shardId);

            if (list_length(shardPlacementList) != 1)
            {
                return false;
            }
        }
    }

    return true;
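
The intent of the new branch is easier to see as a query: a table counts as single-replicated only if every one of its shards has exactly one placement, and the added comment notes that for hash-distributed tables checking a single shard suffices. A rough catalog-level counterpart of the non-hash path, with the table name purely illustrative:

-- every shard of the table must have exactly one placement
SELECT bool_and(placement_count = 1) AS single_replicated
FROM (
    SELECT s.shardid, count(p.placementid) AS placement_count
    FROM pg_dist_shard s
    JOIN pg_dist_placement p USING (shardid)
    WHERE s.logicalrelid = 'customer_engagements'::regclass
    GROUP BY s.shardid
) shard_placements;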

View File

@@ -163,7 +163,7 @@ extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS);
/* function declarations for shard copy functionality */
extern List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName,
-                                  int32 sourceNodePort);
                                   int32 sourceNodePort, bool includeData);
extern List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval);
extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval,
                                                         List **

View File

@@ -202,9 +202,95 @@ SELECT create_distributed_table('collections_agg', 'key');
INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key;
-- coordinator roll-up
INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id;
-- now make sure that repair functionality works fine
-- create a table and create its distribution metadata
CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id );
CREATE TABLE customer_engagements_1
PARTITION OF customer_engagements
FOR VALUES IN ( 1 );
CREATE TABLE customer_engagements_2
PARTITION OF customer_engagements
FOR VALUES IN ( 2 );
-- add some indexes
CREATE INDEX ON customer_engagements (id);
CREATE INDEX ON customer_engagements (event_id);
CREATE INDEX ON customer_engagements (id, event_id);
-- distribute the table
-- create a single shard on the first worker
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('customer_engagements', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
-- ingest some data for the tests
INSERT INTO customer_engagements VALUES (1, 1);
INSERT INTO customer_engagements VALUES (2, 1);
INSERT INTO customer_engagements VALUES (1, 2);
INSERT INTO customer_engagements VALUES (2, 2);
-- the following queries do the following:
-- (i) create a new shard
-- (ii) mark the second shard placements as unhealthy
-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones
-- (iv) do a successful master_copy_shard_placement from the first placement to the second
-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- get the newshardid
SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid
AND groupid = :worker_2_group;
-- cannot repair a shard after a modification (transaction still open during repair)
BEGIN;
INSERT INTO customer_engagements VALUES (1, 1);
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
-- modifications after repairing a shard are fine (will use new metadata)
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement
-----------------------------
(1 row)
ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0;
SELECT * FROM customer_engagements ORDER BY 1,2,3;
id | event_id | value
----+----------+-------
1 | 1 | 1
1 | 2 | 1
2 | 1 | 1
2 | 2 | 1
(4 rows)
ROLLBACK;
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement
-----------------------------
(1 row)
INSERT INTO customer_engagements VALUES (1, 1);
SELECT count(*) FROM customer_engagements;
count
-------
5
(1 row)
ROLLBACK;
-- TRUNCATE is allowed on the parent table
-- try it just before dropping the table
TRUNCATE collections;
SET search_path TO public;
DROP SCHEMA partitioned_table_replicated CASCADE;
-NOTICE: drop cascades to 3 other objects
NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table partitioned_table_replicated.collections
drop cascades to table partitioned_table_replicated.fkey_test
drop cascades to table partitioned_table_replicated.collections_agg
drop cascades to table partitioned_table_replicated.customer_engagements

View File

@@ -202,9 +202,98 @@ SELECT create_distributed_table('collections_agg', 'key');
INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key;
-- coordinator roll-up
INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id;
-- now make sure that repair functionality works fine
-- create a table and create its distribution metadata
CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id );
CREATE TABLE customer_engagements_1
PARTITION OF customer_engagements
FOR VALUES IN ( 1 );
CREATE TABLE customer_engagements_2
PARTITION OF customer_engagements
FOR VALUES IN ( 2 );
-- add some indexes
CREATE INDEX ON customer_engagements (id);
ERROR: cannot create index on partitioned table "customer_engagements"
CREATE INDEX ON customer_engagements (event_id);
ERROR: cannot create index on partitioned table "customer_engagements"
CREATE INDEX ON customer_engagements (id, event_id);
ERROR: cannot create index on partitioned table "customer_engagements"
-- distribute the table
-- create a single shard on the first worker
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('customer_engagements', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
-- ingest some data for the tests
INSERT INTO customer_engagements VALUES (1, 1);
INSERT INTO customer_engagements VALUES (2, 1);
INSERT INTO customer_engagements VALUES (1, 2);
INSERT INTO customer_engagements VALUES (2, 2);
-- the following queries do the following:
-- (i) create a new shard
-- (ii) mark the second shard placements as unhealthy
-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones
-- (iv) do a successful master_copy_shard_placement from the first placement to the second
-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- get the newshardid
SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid
AND groupid = :worker_2_group;
-- cannot repair a shard after a modification (transaction still open during repair)
BEGIN;
INSERT INTO customer_engagements VALUES (1, 1);
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
-- modifications after repairing a shard are fine (will use new metadata)
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement
-----------------------------
(1 row)
ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0;
SELECT * FROM customer_engagements ORDER BY 1,2,3;
id | event_id | value
----+----------+-------
1 | 1 | 1
1 | 2 | 1
2 | 1 | 1
2 | 2 | 1
(4 rows)
ROLLBACK;
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement
-----------------------------
(1 row)
INSERT INTO customer_engagements VALUES (1, 1);
SELECT count(*) FROM customer_engagements;
count
-------
5
(1 row)
ROLLBACK;
-- TRUNCATE is allowed on the parent table
-- try it just before dropping the table
TRUNCATE collections;
SET search_path TO public;
DROP SCHEMA partitioned_table_replicated CASCADE;
-NOTICE: drop cascades to 3 other objects
NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table partitioned_table_replicated.collections
drop cascades to table partitioned_table_replicated.fkey_test
drop cascades to table partitioned_table_replicated.collections_agg
drop cascades to table partitioned_table_replicated.customer_engagements

View File

@@ -245,6 +245,112 @@ INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GR
ERROR: relation "collections_1" does not exist
LINE 1: ...llections_agg SELECT collection_id, sum(key) FROM collection...
^
-- now make sure that repair functionality works fine
-- create a table and create its distribution metadata
CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id );
ERROR: syntax error at or near "PARTITION"
LINE 1: ...E customer_engagements (id integer, event_id int) PARTITION ...
^
CREATE TABLE customer_engagements_1
PARTITION OF customer_engagements
FOR VALUES IN ( 1 );
ERROR: syntax error at or near "PARTITION"
LINE 2: PARTITION OF customer_engagements
^
CREATE TABLE customer_engagements_2
PARTITION OF customer_engagements
FOR VALUES IN ( 2 );
ERROR: syntax error at or near "PARTITION"
LINE 2: PARTITION OF customer_engagements
^
-- add some indexes
CREATE INDEX ON customer_engagements (id);
ERROR: relation "customer_engagements" does not exist
CREATE INDEX ON customer_engagements (event_id);
ERROR: relation "customer_engagements" does not exist
CREATE INDEX ON customer_engagements (id, event_id);
ERROR: relation "customer_engagements" does not exist
-- distribute the table
-- create a single shard on the first worker
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('customer_engagements', 'id', 'hash');
ERROR: relation "customer_engagements" does not exist
LINE 1: SELECT create_distributed_table('customer_engagements', 'id'...
^
-- ingest some data for the tests
INSERT INTO customer_engagements VALUES (1, 1);
ERROR: relation "customer_engagements" does not exist
LINE 1: INSERT INTO customer_engagements VALUES (1, 1);
^
INSERT INTO customer_engagements VALUES (2, 1);
ERROR: relation "customer_engagements" does not exist
LINE 1: INSERT INTO customer_engagements VALUES (2, 1);
^
INSERT INTO customer_engagements VALUES (1, 2);
ERROR: relation "customer_engagements" does not exist
LINE 1: INSERT INTO customer_engagements VALUES (1, 2);
^
INSERT INTO customer_engagements VALUES (2, 2);
ERROR: relation "customer_engagements" does not exist
LINE 1: INSERT INTO customer_engagements VALUES (2, 2);
^
-- the following queries do the following:
-- (i) create a new shard
-- (ii) mark the second shard placements as unhealthy
-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones
-- (iv) do a successful master_copy_shard_placement from the first placement to the second
-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- get the newshardid
SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass
\gset
ERROR: relation "customer_engagements" does not exist
LINE 1: ...ewshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_...
^
-- now, update the second placement as unhealthy
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid
AND groupid = :worker_2_group;
ERROR: syntax error at or near ":"
LINE 1: ...dist_placement SET shardstate = 3 WHERE shardid = :newshardi...
^
-- cannot repair a shard after a modification (transaction still open during repair)
BEGIN;
INSERT INTO customer_engagements VALUES (1, 1);
ERROR: relation "customer_engagements" does not exist
LINE 1: INSERT INTO customer_engagements VALUES (1, 1);
^
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: syntax error at or near ":"
LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',...
^
ROLLBACK;
-- modifications after repairing a shard are fine (will use new metadata)
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: syntax error at or near ":"
LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',...
^
ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0;
ERROR: current transaction is aborted, commands ignored until end of transaction block
SELECT * FROM customer_engagements ORDER BY 1,2,3;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: syntax error at or near ":"
LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',...
^
INSERT INTO customer_engagements VALUES (1, 1);
ERROR: current transaction is aborted, commands ignored until end of transaction block
SELECT count(*) FROM customer_engagements;
ERROR: current transaction is aborted, commands ignored until end of transaction block
ROLLBACK;
-- TRUNCATE is allowed on the parent table
-- try it just before dropping the table
TRUNCATE collections;
ERROR: relation "collections" does not exist
SET search_path TO public;
DROP SCHEMA partitioned_table_replicated CASCADE;
NOTICE: drop cascades to 2 other objects

View File

@@ -157,6 +157,75 @@ INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key
-- coordinator roll-up
INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id;
-- now make sure that repair functionality works fine
-- create a table and create its distribution metadata
CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id );
CREATE TABLE customer_engagements_1
PARTITION OF customer_engagements
FOR VALUES IN ( 1 );
CREATE TABLE customer_engagements_2
PARTITION OF customer_engagements
FOR VALUES IN ( 2 );
-- add some indexes
CREATE INDEX ON customer_engagements (id);
CREATE INDEX ON customer_engagements (event_id);
CREATE INDEX ON customer_engagements (id, event_id);
-- distribute the table
-- create a single shard on the first worker
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('customer_engagements', 'id', 'hash');
-- ingest some data for the tests
INSERT INTO customer_engagements VALUES (1, 1);
INSERT INTO customer_engagements VALUES (2, 1);
INSERT INTO customer_engagements VALUES (1, 2);
INSERT INTO customer_engagements VALUES (2, 2);
-- the following queries do the following:
-- (i) create a new shard
-- (ii) mark the second shard placements as unhealthy
-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones
-- (iv) do a successful master_copy_shard_placement from the first placement to the second
-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement
SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- get the newshardid
SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid
AND groupid = :worker_2_group;
-- cannot repair a shard after a modification (transaction still open during repair)
BEGIN;
INSERT INTO customer_engagements VALUES (1, 1);
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ROLLBACK;
-- modifications after repairing a shard are fine (will use new metadata)
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0;
SELECT * FROM customer_engagements ORDER BY 1,2,3;
ROLLBACK;
BEGIN;
SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
INSERT INTO customer_engagements VALUES (1, 1);
SELECT count(*) FROM customer_engagements;
ROLLBACK;
-- TRUNCATE is allowed on the parent table
-- try it just before dropping the table
TRUNCATE collections;
SET search_path TO public;
DROP SCHEMA partitioned_table_replicated CASCADE;