From abc443d7fa5ea8e280fa8b55d73df66c302f3371 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Tue, 18 Sep 2018 18:29:10 +0300
Subject: [PATCH] Make sure that shard repair considers replication factor

---
 .../distributed/executor/multi_utility.c      |  64 +++++++--
 .../master/master_modify_multiple_shards.c    |   1 -
 .../distributed/master/master_repair_shards.c | 127 ++++++++++++++++--
 .../distributed/utils/reference_table_utils.c |   4 +-
 .../distributed/utils/shardinterval_utils.c   |  30 ++++-
 src/include/distributed/master_protocol.h     |   2 +-
 .../expected/replicated_partitioned_table.out |  88 +++++++++++-
 .../replicated_partitioned_table_0.out        |  91 ++++++++++++-
 .../replicated_partitioned_table_1.out        | 106 +++++++++++++++
 .../sql/replicated_partitioned_table.sql      |  71 +++++++++-
 10 files changed, 551 insertions(+), 33 deletions(-)

diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c
index acbfb3192..6d8919041 100644
--- a/src/backend/distributed/executor/multi_utility.c
+++ b/src/backend/distributed/executor/multi_utility.c
@@ -152,6 +152,7 @@ static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
 static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
 static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
 static void ProcessTruncateStatement(TruncateStmt *truncateStatement);
+static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement);
 static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
 static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
 static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
@@ -2931,7 +2932,51 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)


 /*
- * ProcessTruncateStatement handles distributed locking
+ * ProcessTruncateStatement handles a few things that should be
+ * done before standard process utility is called for a TRUNCATE
+ * command.
+ */
+static void
+ProcessTruncateStatement(TruncateStmt *truncateStatement)
+{
+	EnsurePartitionTableNotReplicatedForTruncate(truncateStatement);
+	LockTruncatedRelationMetadataInWorkers(truncateStatement);
+}
+
+
+/*
+ * EnsurePartitionTableNotReplicatedForTruncate is a simple wrapper around
+ * EnsurePartitionTableNotReplicated for the TRUNCATE command.
+ */
+static void
+EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement)
+{
+	ListCell *relationCell = NULL;
+
+	foreach(relationCell, truncateStatement->relations)
+	{
+		RangeVar *relationRV = (RangeVar *) lfirst(relationCell);
+		Relation relation = heap_openrv(relationRV, NoLock);
+		Oid relationId = RelationGetRelid(relation);
+
+		if (!IsDistributedTable(relationId))
+		{
+			heap_close(relation, NoLock);
+			continue;
+		}
+
+		EnsurePartitionTableNotReplicated(relationId);
+
+		heap_close(relation, NoLock);
+	}
+}
+
+
+/*
+ * LockTruncatedRelationMetadataInWorkers determines if a distributed
+ * lock is necessary for the truncated relations, and acquires the locks.
+ *
+ * LockTruncatedRelationMetadataInWorkers handles distributed locking
  * of truncated tables before standard utility takes over.
 *
 * Actual distributed truncation occurs inside truncate trigger.
@@ -2941,17 +2986,6 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
 * non-distributed and distributed relations.
 */
 static void
-ProcessTruncateStatement(TruncateStmt *truncateStatement)
-{
-	LockTruncatedRelationMetadataInWorkers(truncateStatement);
-}
-
-
-/*
- * LockTruncatedRelationMetadataInWorkers determines if distributed
- * lock is necessary for truncated relations, and acquire locks.
- */
-static void
 LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
 {
 	List *distributedRelationList = NIL;
@@ -3316,7 +3350,11 @@ AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
 * in its default value of '1pc', then a notice message indicating that '2pc' might be
 * used for extra safety. In the commit protocol, a BEGIN is sent after connection to
 * each shard placement and COMMIT/ROLLBACK is handled by
- * CompleteShardPlacementTransactions function.
+ * CoordinatedTransactionCallback function.
+ *
+ * The function errors out if the node is not the coordinator or if the DDL is on
+ * a partitioned table which has replication factor > 1.
+ *
 */
 static void
 ExecuteDistributedDDLJob(DDLJob *ddlJob)
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index f71e3b1d9..1668b5985 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -143,7 +143,6 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 		rangeVar->schemaname = schemaName;
 	}

-	EnsurePartitionTableNotReplicated(relationId);
 	EnsureTablePermissions(relationId, ACL_TRUNCATE);

 	if (ShouldExecuteTruncateStmtSequential(truncateStatement))
diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c
index 666505e61..2fc95d0eb 100644
--- a/src/backend/distributed/master/master_repair_shards.c
+++ b/src/backend/distributed/master/master_repair_shards.c
@@ -25,6 +25,7 @@
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
+#include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_router_executor.h"
 #include "distributed/resource_lock.h"
 #include "distributed/worker_manager.h"
@@ -50,6 +51,9 @@ static char LookupShardTransferMode(Oid shardReplicationModeOid);
 static void RepairShardPlacement(int64 shardId, char *sourceNodeName,
 								 int32 sourceNodePort, char *targetNodeName,
 								 int32 targetNodePort);
+static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
+											 char *sourceNodeName,
+											 int32 sourceNodePort);
 static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName,
 									 int32 sourceNodePort, char *targetNodeName,
 									 int32 targetNodePort);
@@ -219,6 +223,8 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
 	char relationKind = get_rel_relkind(distributedTableId);
 	char *tableOwner = TableOwner(shardInterval->relationId);
 	bool missingOk = false;
+	bool includeData = false;
+	bool partitionedTable = false;

 	List *ddlCommandList = NIL;
 	List *foreignConstraintCommandList = NIL;
@@ -237,6 +243,18 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
 							   "not supported.", relationName)));
 	}

+	/*
+	 * Let's not allow repairing partitions to prevent any edge cases.
+	 * We're already not allowing any kind of modifications on the partitions,
+	 * so their placements are not likely to be marked as INVALID. The only
+	 * possible case to mark the placement of a partition as invalid is
+	 * "ALTER TABLE parent_table DETACH PARTITION partition_table".
+	 * But, given that the table would become a regular distributed table if
+	 * the command succeeds, we're OK since regular distributed tables can be
+	 * repaired later on.
+	 */
+	EnsurePartitionTableNotReplicated(distributedTableId);
+
 	/*
 	 * We take a lock on the referenced table if there is a foreign constraint
 	 * during the copy procedure. If we do not block DMLs on the referenced
@@ -260,10 +278,46 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
 	EnsureShardCanBeRepaired(shardId, sourceNodeName, sourceNodePort, targetNodeName,
 							 targetNodePort);

+	/*
+	 * If the shard belongs to a partitioned table, we need to load the data after
+	 * creating the partitions and the partitioning hierarchy.
+	 */
+	partitionedTable = PartitionedTableNoLock(distributedTableId);
+	includeData = !partitionedTable;
+
 	/* we generate necessary commands to recreate the shard in target node */
-	ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort);
+	ddlCommandList =
+		CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort, includeData);
 	foreignConstraintCommandList = CopyShardForeignConstraintCommandList(shardInterval);
 	ddlCommandList = list_concat(ddlCommandList, foreignConstraintCommandList);
+
+	/*
+	 * CopyShardCommandList() drops the table which cascades to partitions if the
+	 * table is a partitioned table. This means that we need to create both the
+	 * parent table and its partitions.
+	 *
+	 * We also skipped copying the data, so include it here.
+	 */
+	if (partitionedTable)
+	{
+		List *partitionCommandList = NIL;
+
+		char *shardName = ConstructQualifiedShardName(shardInterval);
+		StringInfo copyShardDataCommand = makeStringInfo();
+
+		partitionCommandList =
+			CopyPartitionShardsCommandList(shardInterval, sourceNodeName, sourceNodePort);
+		ddlCommandList = list_concat(ddlCommandList, partitionCommandList);
+
+		/* finally copy the data as well */
+		appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
+						 quote_literal_cstr(shardName), /* table to append */
+						 quote_literal_cstr(shardName), /* remote table name */
+						 quote_literal_cstr(sourceNodeName), /* remote host */
+						 sourceNodePort); /* remote port */
+		ddlCommandList = lappend(ddlCommandList, copyShardDataCommand->data);
+	}
+
 	SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner,
 											   ddlCommandList);

@@ -275,6 +329,49 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
 }


+/*
+ * CopyPartitionShardsCommandList gets a shardInterval which is a shard that
+ * belongs to a partitioned table (this is asserted).
+ *
+ * The function returns a list of commands that re-create all the partitions
+ * of the input shardInterval.
+ */
+static List *
+CopyPartitionShardsCommandList(ShardInterval *shardInterval, char *sourceNodeName,
+							   int32 sourceNodePort)
+{
+	Oid distributedTableId = shardInterval->relationId;
+	List *partitionList = NIL;
+	ListCell *partitionOidCell = NULL;
+	List *ddlCommandList = NIL;
+
+	Assert(PartitionedTableNoLock(distributedTableId));
+
+	partitionList = PartitionList(distributedTableId);
+	foreach(partitionOidCell, partitionList)
+	{
+		Oid partitionOid = lfirst_oid(partitionOidCell);
+		uint64 partitionShardId =
+			ColocatedShardIdInRelation(partitionOid, shardInterval->shardIndex);
+		ShardInterval *partitionShardInterval = LoadShardInterval(partitionShardId);
+		bool includeData = false;
+		List *copyCommandList = NIL;
+		char *attachPartitionCommand = NULL;
+
+		copyCommandList =
+			CopyShardCommandList(partitionShardInterval, sourceNodeName, sourceNodePort,
+								 includeData);
+		ddlCommandList = list_concat(ddlCommandList, copyCommandList);
+
+		attachPartitionCommand =
+			GenerateAttachShardPartitionCommand(partitionShardInterval);
+		ddlCommandList = lappend(ddlCommandList, attachPartitionCommand);
+	}
+
+	return ddlCommandList;
+}
+
+
 /*
 * EnsureShardCanBeRepaired checks if the given shard has a healthy placement in the source
 * node and inactive node on the target node.
@@ -350,11 +447,12 @@ SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 node

 /*
 * CopyShardCommandList generates command list to copy the given shard placement
- * from the source node to the target node.
+ * from the source node to the target node. The caller can optionally skip
+ * copying the data via the includeDataCopy flag.
 */
 List *
-CopyShardCommandList(ShardInterval *shardInterval,
-					 char *sourceNodeName, int32 sourceNodePort)
+CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName,
+					 int32 sourceNodePort, bool includeDataCopy)
 {
 	int64 shardId = shardInterval->shardId;
 	char *shardName = ConstructQualifiedShardName(shardInterval);
@@ -371,14 +469,21 @@ CopyShardCommandList(ShardInterval *shardInterval,
 	copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
 											  tableRecreationCommandList);

-	appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
-					 quote_literal_cstr(shardName), /* table to append */
-					 quote_literal_cstr(shardName), /* remote table name */
-					 quote_literal_cstr(sourceNodeName), /* remote host */
-					 sourceNodePort); /* remote port */
+	/*
+	 * The caller may not want to include the COPY command, e.g., when the
+	 * data is copied over via logical replication.
+ */ + if (includeDataCopy) + { + appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, + quote_literal_cstr(shardName), /* table to append */ + quote_literal_cstr(shardName), /* remote table name */ + quote_literal_cstr(sourceNodeName), /* remote host */ + sourceNodePort); /* remote port */ - copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, - copyShardDataCommand->data); + copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, + copyShardDataCommand->data); + } indexCommandList = GetTableIndexAndConstraintCommands(relationId); indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 99a94ba19..ccefdae4b 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -284,7 +284,9 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); char *srcNodeName = sourceShardPlacement->nodeName; uint32 srcNodePort = sourceShardPlacement->nodePort; - List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort); + bool includeData = true; + List *ddlCommandList = + CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData); List *shardPlacementList = ShardPlacementList(shardId); bool missingWorkerOk = true; diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index d9fbd40ef..7825c7f40 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -16,6 +16,7 @@ #include "catalog/pg_collation.h" #include "catalog/pg_type.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_join_order.h" #include "distributed/distributed_planner.h" #include "distributed/shard_pruning.h" #include "distributed/shardinterval_utils.h" @@ -418,6 +419,7 @@ SingleReplicatedTable(Oid relationId) List *shardPlacementList = NIL; Oid shardId = INVALID_SHARD_ID; + /* we could have append/range distributed tables without shards */ if (list_length(shardList) <= 1) { return false; @@ -425,10 +427,32 @@ SingleReplicatedTable(Oid relationId) /* checking only for the first shard id should suffice */ shardId = (*(uint64 *) linitial(shardList)); - shardPlacementList = ShardPlacementList(shardId); - if (list_length(shardPlacementList) != 1) + + /* for hash distributed tables, it is sufficient to only check one shard */ + if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH) { - return false; + shardPlacementList = ShardPlacementList(shardId); + if (list_length(shardPlacementList) != 1) + { + return false; + } + } + else + { + List *shardIntervalList = LoadShardList(relationId); + ListCell *shardIntervalCell = NULL; + + foreach(shardIntervalCell, shardIntervalList) + { + uint64 *shardIdPointer = (uint64 *) lfirst(shardIntervalCell); + uint64 shardId = (*shardIdPointer); + List *shardPlacementList = ShardPlacementList(shardId); + + if (list_length(shardPlacementList) != 1) + { + return false; + } + } } return true; diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index f68051283..06c68e4ea 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -163,7 +163,7 @@ extern Datum 
master_copy_shard_placement(PG_FUNCTION_ARGS); /* function declarations for shard copy functinality */ extern List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, - int32 sourceNodePort); + int32 sourceNodePort, bool includeData); extern List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval); extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval, List ** diff --git a/src/test/regress/expected/replicated_partitioned_table.out b/src/test/regress/expected/replicated_partitioned_table.out index 1cdc25c6a..df990cb02 100644 --- a/src/test/regress/expected/replicated_partitioned_table.out +++ b/src/test/regress/expected/replicated_partitioned_table.out @@ -202,9 +202,95 @@ SELECT create_distributed_table('collections_agg', 'key'); INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; -- coordinator roll-up INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); +-- add some indexes +CREATE INDEX ON customer_engagements (id); +CREATE INDEX ON customer_engagements (event_id); +CREATE INDEX ON customer_engagements (id, event_id); +-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); + create_distributed_table +-------------------------- + +(1 row) + +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +INSERT INTO customer_engagements VALUES (2, 1); +INSERT INTO customer_engagements VALUES (1, 2); +INSERT INTO customer_engagements VALUES (2, 2); +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +-- get the newshardid +SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement 
+----------------------------- + +(1 row) + +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +SELECT * FROM customer_engagements ORDER BY 1,2,3; + id | event_id | value +----+----------+------- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 1 + 2 | 2 | 1 +(4 rows) + +ROLLBACK; +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +INSERT INTO customer_engagements VALUES (1, 1); +SELECT count(*) FROM customer_engagements; + count +------- + 5 +(1 row) + +ROLLBACK; +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; SET search_path TO public; DROP SCHEMA partitioned_table_replicated CASCADE; -NOTICE: drop cascades to 3 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table partitioned_table_replicated.collections drop cascades to table partitioned_table_replicated.fkey_test drop cascades to table partitioned_table_replicated.collections_agg +drop cascades to table partitioned_table_replicated.customer_engagements diff --git a/src/test/regress/expected/replicated_partitioned_table_0.out b/src/test/regress/expected/replicated_partitioned_table_0.out index 31e080bae..015b644a7 100644 --- a/src/test/regress/expected/replicated_partitioned_table_0.out +++ b/src/test/regress/expected/replicated_partitioned_table_0.out @@ -202,9 +202,98 @@ SELECT create_distributed_table('collections_agg', 'key'); INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; -- coordinator roll-up INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); +-- add some indexes +CREATE INDEX ON customer_engagements (id); +ERROR: cannot create index on partitioned table "customer_engagements" +CREATE INDEX ON customer_engagements (event_id); +ERROR: cannot create index on partitioned table "customer_engagements" +CREATE INDEX ON customer_engagements (id, event_id); +ERROR: cannot create index on partitioned table "customer_engagements" +-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); + create_distributed_table +-------------------------- + +(1 row) + +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +INSERT INTO customer_engagements VALUES (2, 1); +INSERT INTO customer_engagements VALUES (1, 2); +INSERT INTO customer_engagements VALUES (2, 2); +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT 
groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +-- get the newshardid +SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +SELECT * FROM customer_engagements ORDER BY 1,2,3; + id | event_id | value +----+----------+------- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 1 + 2 | 2 | 1 +(4 rows) + +ROLLBACK; +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +INSERT INTO customer_engagements VALUES (1, 1); +SELECT count(*) FROM customer_engagements; + count +------- + 5 +(1 row) + +ROLLBACK; +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; SET search_path TO public; DROP SCHEMA partitioned_table_replicated CASCADE; -NOTICE: drop cascades to 3 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table partitioned_table_replicated.collections drop cascades to table partitioned_table_replicated.fkey_test drop cascades to table partitioned_table_replicated.collections_agg +drop cascades to table partitioned_table_replicated.customer_engagements diff --git a/src/test/regress/expected/replicated_partitioned_table_1.out b/src/test/regress/expected/replicated_partitioned_table_1.out index c7ffc500b..cc4405995 100644 --- a/src/test/regress/expected/replicated_partitioned_table_1.out +++ b/src/test/regress/expected/replicated_partitioned_table_1.out @@ -245,6 +245,112 @@ INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GR ERROR: relation "collections_1" does not exist LINE 1: ...llections_agg SELECT collection_id, sum(key) FROM collection... ^ +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); +ERROR: syntax error at or near "PARTITION" +LINE 1: ...E customer_engagements (id integer, event_id int) PARTITION ... 
+ ^ +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF customer_engagements + ^ +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF customer_engagements + ^ +-- add some indexes +CREATE INDEX ON customer_engagements (id); +ERROR: relation "customer_engagements" does not exist +CREATE INDEX ON customer_engagements (event_id); +ERROR: relation "customer_engagements" does not exist +CREATE INDEX ON customer_engagements (id, event_id); +ERROR: relation "customer_engagements" does not exist +-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); +ERROR: relation "customer_engagements" does not exist +LINE 1: SELECT create_distributed_table('customer_engagements', 'id'... + ^ +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (1, 1); + ^ +INSERT INTO customer_engagements VALUES (2, 1); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (2, 1); + ^ +INSERT INTO customer_engagements VALUES (1, 2); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (1, 2); + ^ +INSERT INTO customer_engagements VALUES (2, 2); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (2, 2); + ^ +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +-- get the newshardid +SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset +ERROR: relation "customer_engagements" does not exist +LINE 1: ...ewshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_... + ^ +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; +ERROR: syntax error at or near ":" +LINE 1: ...dist_placement SET shardstate = 3 WHERE shardid = :newshardi... + ^ +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +ERROR: relation "customer_engagements" does not exist +LINE 1: INSERT INTO customer_engagements VALUES (1, 1); + ^ +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: syntax error at or near ":" +LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',... 
+ ^ +ROLLBACK; +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: syntax error at or near ":" +LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',... + ^ +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +ERROR: current transaction is aborted, commands ignored until end of transaction block +SELECT * FROM customer_engagements ORDER BY 1,2,3; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: syntax error at or near ":" +LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',... + ^ +INSERT INTO customer_engagements VALUES (1, 1); +ERROR: current transaction is aborted, commands ignored until end of transaction block +SELECT count(*) FROM customer_engagements; +ERROR: current transaction is aborted, commands ignored until end of transaction block +ROLLBACK; +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; +ERROR: relation "collections" does not exist SET search_path TO public; DROP SCHEMA partitioned_table_replicated CASCADE; NOTICE: drop cascades to 2 other objects diff --git a/src/test/regress/sql/replicated_partitioned_table.sql b/src/test/regress/sql/replicated_partitioned_table.sql index 46f592c29..eba4b8d35 100644 --- a/src/test/regress/sql/replicated_partitioned_table.sql +++ b/src/test/regress/sql/replicated_partitioned_table.sql @@ -157,6 +157,75 @@ INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key -- coordinator roll-up INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); + +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); + +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); + +-- add some indexes +CREATE INDEX ON customer_engagements (id); +CREATE INDEX ON customer_engagements (event_id); +CREATE INDEX ON customer_engagements (id, event_id); + +-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); + +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +INSERT INTO customer_engagements VALUES (2, 1); +INSERT INTO customer_engagements VALUES (1, 2); +INSERT INTO customer_engagements VALUES (2, 2); + +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement + +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset + +-- get the newshardid 
+SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset + +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; + +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ROLLBACK; + +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +SELECT * FROM customer_engagements ORDER BY 1,2,3; +ROLLBACK; + +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +INSERT INTO customer_engagements VALUES (1, 1); +SELECT count(*) FROM customer_engagements; +ROLLBACK; + +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; + SET search_path TO public; DROP SCHEMA partitioned_table_replicated CASCADE; -
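
For reviewers, here is roughly the command list that RepairShardPlacement() now sends to the target worker when the repaired shard belongs to a partitioned table: the parent shard is recreated without data, each partition's colocated shard is recreated and attached, and only then is the data appended from the healthy source placement. This is a sketch only; the shard ids (102008/102009), schema, and port below are made up for illustration, and the exact statement text is whatever CopyShardCommandList(), GenerateAttachShardPartitionCommand(), and WORKER_APPEND_TABLE_TO_SHARD actually emit.

-- 1) recreate the parent shard, with includeData = false (no data copy yet)
DROP TABLE IF EXISTS partitioned_table_replicated.customer_engagements_102008;
CREATE TABLE partitioned_table_replicated.customer_engagements_102008
	(id integer, event_id integer) PARTITION BY LIST (event_id);

-- 2) recreate each partition's colocated shard and attach it to the parent shard
DROP TABLE IF EXISTS partitioned_table_replicated.customer_engagements_1_102009;
CREATE TABLE partitioned_table_replicated.customer_engagements_1_102009
	(id integer, event_id integer);
ALTER TABLE partitioned_table_replicated.customer_engagements_102008
	ATTACH PARTITION partitioned_table_replicated.customer_engagements_1_102009
	FOR VALUES IN (1);
-- ... the same DROP/CREATE/ATTACH sequence for the second partition's shard ...

-- 3) finally append the data from the source placement
SELECT worker_append_table_to_shard(
	'partitioned_table_replicated.customer_engagements_102008',
	'partitioned_table_replicated.customer_engagements_102008',
	'localhost', 57637);

The whole list is executed via SendCommandListToWorkerInSingleTransaction(), which should mean a failed repair does not leave a partially created shard behind on the target node.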
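
The SingleReplicatedTable() change can also be read as a catalog-level invariant: for hash-distributed tables checking one shard is enough, but for range/append-distributed tables every shard has to be inspected, since their placements are not guaranteed to be uniform across shards. A hypothetical query that mirrors the new check (illustration only, not the C implementation; the table name is just the one used in the tests above):

-- true only when every shard of the relation has exactly one placement;
-- the C function additionally bails out when the table has no shards at all
SELECT bool_and(placement_count = 1) AS single_replicated
FROM (
	SELECT shardid, count(*) AS placement_count
	FROM pg_dist_shard JOIN pg_dist_placement USING (shardid)
	WHERE logicalrelid = 'customer_engagements'::regclass
	GROUP BY shardid
) per_shard;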