diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 3e3c4ca56..76ea64958 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -743,14 +743,6 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, "for hash-distributed tables"))); } - /* we currently don't support partitioned tables for replication factor > 1 */ - if (ShardReplicationFactor > 1) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("distributing partitioned tables with replication " - "factor greater than 1 is not supported"))); - } - /* we don't support distributing tables with multi-level partitioning */ if (PartitionTable(relationId)) { diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 05bcf8432..c91db46f5 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -64,6 +64,7 @@ #include "distributed/multi_copy.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_router_planner.h" #include "distributed/multi_shard_transaction.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" @@ -199,6 +200,9 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); char partitionMethod = PartitionMethod(relationId); + /* disallow modifications to a partition table which has replication factor > 1 */ + EnsurePartitionTableNotReplicated(relationId); + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE || partitionMethod == DISTRIBUTE_BY_NONE) { diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 91f02c3a6..6d8919041 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -152,6 +152,7 @@ static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt); static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement); static void ProcessTruncateStatement(TruncateStmt *truncateStatement); +static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement); static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement); static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode); static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId); @@ -2931,7 +2932,51 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement) /* - * ProcessTruncateStatement handles distributed locking + * ProcessTruncateStatement handles a few things that should be + * done before standard process utility is called for the TRUNCATE + * command. + */ +static void +ProcessTruncateStatement(TruncateStmt *truncateStatement) +{ + EnsurePartitionTableNotReplicatedForTruncate(truncateStatement); + LockTruncatedRelationMetadataInWorkers(truncateStatement); +} + + +/* + * EnsurePartitionTableNotReplicatedForTruncate is a simple wrapper around + * EnsurePartitionTableNotReplicated for the TRUNCATE command.
+ */ +static void +EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement) +{ + ListCell *relationCell = NULL; + + foreach(relationCell, truncateStatement->relations) + { + RangeVar *relationRV = (RangeVar *) lfirst(relationCell); + Relation relation = heap_openrv(relationRV, NoLock); + Oid relationId = RelationGetRelid(relation); + + if (!IsDistributedTable(relationId)) + { + heap_close(relation, NoLock); + continue; + } + + EnsurePartitionTableNotReplicated(relationId); + + heap_close(relation, NoLock); + } +} + + +/* + * LockTruncatedRelationMetadataInWorkers determines if a distributed + * lock is necessary for the truncated relations, and acquires the locks. + * + * LockTruncatedRelationMetadataInWorkers handles distributed locking * of truncated tables before standard utility takes over. * * Actual distributed truncation occurs inside truncate trigger. @@ -2941,17 +2986,6 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement) * non-distributed and distributed relations. */ static void -ProcessTruncateStatement(TruncateStmt *truncateStatement) -{ - LockTruncatedRelationMetadataInWorkers(truncateStatement); -} - - -/* - * LockTruncatedRelationMetadataInWorkers determines if distributed - * lock is necessary for truncated relations, and acquire locks. - */ -static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement) { List *distributedRelationList = NIL; @@ -3316,7 +3350,11 @@ AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement, * in its default value of '1pc', then a notice message indicating that '2pc' might be * used for extra safety. In the commit protocol, a BEGIN is sent after connection to * each shard placement and COMMIT/ROLLBACK is handled by - * CompleteShardPlacementTransactions function. + * CoordinatedTransactionCallback function. + * + * The function errors out if the node is not the coordinator or if the DDL is on + * a partitioned table which has replication factor > 1.
+ * */ static void ExecuteDistributedDDLJob(DDLJob *ddlJob) @@ -3324,6 +3362,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId); EnsureCoordinator(); + EnsurePartitionTableNotReplicated(ddlJob->targetRelationId); if (!ddlJob->concurrentIndexCmd) { diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 162bfb0a6..1668b5985 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -37,6 +37,7 @@ #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_shard_transaction.h" +#include "distributed/distributed_planner.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 666505e61..2fc95d0eb 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -25,6 +25,7 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_router_executor.h" #include "distributed/resource_lock.h" #include "distributed/worker_manager.h" @@ -50,6 +51,9 @@ static char LookupShardTransferMode(Oid shardReplicationModeOid); static void RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); +static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval, + char *sourceNodeName, + int32 sourceNodePort); static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); @@ -219,6 +223,8 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char relationKind = get_rel_relkind(distributedTableId); char *tableOwner = TableOwner(shardInterval->relationId); bool missingOk = false; + bool includeData = false; + bool partitionedTable = false; List *ddlCommandList = NIL; List *foreignConstraintCommandList = NIL; @@ -237,6 +243,18 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, "not supported.", relationName))); } + /* + * Let's not allow repairing partitions to prevent any edge cases. + * We're already not allowing any kind of modifications on the partitions, + * so their placements are not likely to be marked as INVALID. The only + * possible case to mark placement of a partition as invalid is + * "ALTER TABLE parent_table DETACH PARTITION partition_table". But, + * given that the table would become a regular distributed table if the + * command succeeds, we're OK since the regular distributed tables can + * be repaired later on. + */ + EnsurePartitionTableNotReplicated(distributedTableId); + /* + * We take a lock on the referenced table if there is a foreign constraint + * during the copy procedure.
If we do not block DMLs on the referenced @@ -260,10 +278,46 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, EnsureShardCanBeRepaired(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort); + /* + * If the shard belongs to a partitioned table, we need to load the data after + * creating the partitions and the partitioning hierarchy. + */ + partitionedTable = PartitionedTableNoLock(distributedTableId); + includeData = !partitionedTable; + /* we generate necessary commands to recreate the shard in target node */ - ddlCommandList = CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort); + ddlCommandList = + CopyShardCommandList(shardInterval, sourceNodeName, sourceNodePort, includeData); foreignConstraintCommandList = CopyShardForeignConstraintCommandList(shardInterval); ddlCommandList = list_concat(ddlCommandList, foreignConstraintCommandList); + + /* + * CopyShardCommandList() drops the table, which cascades to the partitions if the + * table is a partitioned table. This means that we need to create both the parent + * table and its partitions. + * + * We also skipped copying the data, so include it here. + */ + if (partitionedTable) + { + List *partitionCommandList = NIL; + + char *shardName = ConstructQualifiedShardName(shardInterval); + StringInfo copyShardDataCommand = makeStringInfo(); + + partitionCommandList = + CopyPartitionShardsCommandList(shardInterval, sourceNodeName, sourceNodePort); + ddlCommandList = list_concat(ddlCommandList, partitionCommandList); + + /* finally copy the data as well */ + appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, + quote_literal_cstr(shardName), /* table to append */ + quote_literal_cstr(shardName), /* remote table name */ + quote_literal_cstr(sourceNodeName), /* remote host */ + sourceNodePort); /* remote port */ + ddlCommandList = lappend(ddlCommandList, copyShardDataCommand->data); + } + SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, ddlCommandList); @@ -275,6 +329,49 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, } +/* + * CopyPartitionShardsCommandList gets a shardInterval which is a shard that + * belongs to a partitioned table (this is asserted). + * + * The function returns a list of commands which re-create all the partitions + * of the input shardInterval.
+ */ +static List * +CopyPartitionShardsCommandList(ShardInterval *shardInterval, char *sourceNodeName, + int32 sourceNodePort) +{ + Oid distributedTableId = shardInterval->relationId; + List *partitionList = NIL; + ListCell *partitionOidCell = NULL; + List *ddlCommandList = NIL; + + Assert(PartitionedTableNoLock(distributedTableId)); + + partitionList = PartitionList(distributedTableId); + foreach(partitionOidCell, partitionList) + { + Oid partitionOid = lfirst_oid(partitionOidCell); + uint64 partitionShardId = + ColocatedShardIdInRelation(partitionOid, shardInterval->shardIndex); + ShardInterval *partitionShardInterval = LoadShardInterval(partitionShardId); + bool includeData = false; + List *copyCommandList = NIL; + char *attachPartitionCommand = NULL; + + copyCommandList = + CopyShardCommandList(partitionShardInterval, sourceNodeName, sourceNodePort, + includeData); + ddlCommandList = list_concat(ddlCommandList, copyCommandList); + + attachPartitionCommand = + GenerateAttachShardPartitionCommand(partitionShardInterval); + ddlCommandList = lappend(ddlCommandList, attachPartitionCommand); + } + + return ddlCommandList; +} + + /* * EnsureShardCanBeRepaired checks if the given shard has a healthy placement in the source * node and inactive node on the target node. @@ -350,11 +447,12 @@ SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 node /* * CopyShardCommandList generates command list to copy the given shard placement - * from the source node to the target node. + * from the source node to the target node. The caller can optionally skip copying + * the data via the includeDataCopy flag. */ List * -CopyShardCommandList(ShardInterval *shardInterval, - char *sourceNodeName, int32 sourceNodePort) +CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, + int32 sourceNodePort, bool includeDataCopy) { int64 shardId = shardInterval->shardId; char *shardName = ConstructQualifiedShardName(shardInterval); @@ -371,14 +469,21 @@ CopyShardCommandList(ShardInterval *shardInterval, copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList, tableRecreationCommandList); - appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, - quote_literal_cstr(shardName), /* table to append */ - quote_literal_cstr(shardName), /* remote table name */ - quote_literal_cstr(sourceNodeName), /* remote host */ - sourceNodePort); /* remote port */ + /* + * Skip the COPY command when the caller does not want the data copied here, + * e.g., because the data will be copied via logical replication instead.
+ */ + if (includeDataCopy) + { + appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, + quote_literal_cstr(shardName), /* table to append */ + quote_literal_cstr(shardName), /* remote table name */ + quote_literal_cstr(sourceNodeName), /* remote host */ + sourceNodePort); /* remote port */ - copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, - copyShardDataCommand->data); + copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, + copyShardDataCommand->data); + } indexCommandList = GetTableIndexAndConstraintCommands(relationId); indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList, shardId); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 125c0236e..6ef0f5824 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -28,6 +28,7 @@ #include "distributed/multi_master_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/recursive_planning.h" +#include "distributed/shardinterval_utils.h" #include "distributed/worker_shard_visibility.h" #include "executor/executor.h" #include "nodes/makefuncs.h" @@ -61,6 +62,8 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue bool hasUnresolvedParams, PlannerRestrictionContext * plannerRestrictionContext); +static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid + relationId); static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); static void AssignRTEIdentities(Query *queryTree); @@ -584,8 +587,12 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi if (IsModifyCommand(originalQuery)) { + Oid targetRelationId = InvalidOid; EnsureModificationsCanRun(); + targetRelationId = ModifyQueryResultRelationId(query); + EnsurePartitionTableNotReplicated(targetRelationId); + if (InsertSelectIntoDistributedTable(originalQuery)) { distributedPlan = @@ -764,6 +771,53 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi } +/* + * EnsurePartitionTableNotReplicated errors out if the input relation is + * a partition table and the table has a replication factor greater than + * one. + * + * If the table is not a partition or its replication factor is 1, the function + * becomes a no-op. + */ +void +EnsurePartitionTableNotReplicated(Oid relationId) +{ + DeferredErrorMessage *deferredError = + DeferErrorIfPartitionTableNotSingleReplicated(relationId); + if (deferredError != NULL) + { + RaiseDeferredError(deferredError, ERROR); + } +} + + +/* + * DeferErrorIfPartitionTableNotSingleReplicated defers an error if the input relation + * is a partition table with replication factor > 1. Otherwise, the function returns + * NULL.
+ */ +static DeferredErrorMessage * +DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId) +{ + if (PartitionTableNoLock(relationId) && !SingleReplicatedTable(relationId)) + { + Oid parentOid = PartitionParentOid(relationId); + char *parentRelationTest = get_rel_name(parentOid); + StringInfo errorHint = makeStringInfo(); + + appendStringInfo(errorHint, "Run the query on the parent table " + "\"%s\" instead.", parentRelationTest); + + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "modifications on partitions when replication " + "factor is greater than 1 is not supported", + NULL, errorHint->data); + } + + return NULL; +} + + /* * ResolveExternalParams replaces the external parameters that appears * in the query with the corresponding entries in the boundParams. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 4269a2e24..a48d02fcf 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -33,6 +33,7 @@ #include "distributed/multi_join_order.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_optimizer.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" @@ -472,6 +473,33 @@ ExtractSelectRangeTableEntry(Query *query) } +/* + * ModifyQueryResultRelationId returns the result relation's Oid + * for the given modification query. + * + * The function errors out if the input query is not a + * modify query (e.g., INSERT, UPDATE or DELETE). So, this + * function is not expected to be called on SELECT queries. + */ +Oid +ModifyQueryResultRelationId(Query *query) +{ + RangeTblEntry *resultRte = NULL; + + /* only modify queries have result relations */ + if (!IsModifyCommand(query)) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("input query is not a modification query"))); + } + + resultRte = ExtractInsertRangeTableEntry(query); + Assert(OidIsValid(resultRte->relid)); + + return resultRte->relid; +} + + /* * ExtractInsertRangeTableEntry returns the INSERT'ed table's range table entry. 
* Note that the function expects and asserts that the input query be @@ -588,7 +616,9 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer if (rangeTableEntry->rtekind == RTE_RELATION) { - if (!IsDistributedTable(rangeTableEntry->relid)) + Oid relationId = rangeTableEntry->relid; + + if (!IsDistributedTable(relationId)) { StringInfo errorMessage = makeStringInfo(); char *relationName = get_rel_name(rangeTableEntry->relid); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 99a94ba19..ccefdae4b 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -284,7 +284,9 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); char *srcNodeName = sourceShardPlacement->nodeName; uint32 srcNodePort = sourceShardPlacement->nodePort; - List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort); + bool includeData = true; + List *ddlCommandList = + CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData); List *shardPlacementList = ShardPlacementList(shardId); bool missingWorkerOk = true; diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index a895b994b..7825c7f40 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -16,6 +16,7 @@ #include "catalog/pg_collation.h" #include "catalog/pg_type.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_join_order.h" #include "distributed/distributed_planner.h" #include "distributed/shard_pruning.h" #include "distributed/shardinterval_utils.h" @@ -414,20 +415,45 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter bool SingleReplicatedTable(Oid relationId) { - List *shardIntervalList = LoadShardList(relationId); - ListCell *shardIntervalCell = NULL; + List *shardList = LoadShardList(relationId); + List *shardPlacementList = NIL; + Oid shardId = INVALID_SHARD_ID; - foreach(shardIntervalCell, shardIntervalList) + /* we could have append/range distributed tables without shards */ + if (list_length(shardList) <= 1) { - uint64 *shardIdPointer = (uint64 *) lfirst(shardIntervalCell); - uint64 shardId = (*shardIdPointer); - List *shardPlacementList = ShardPlacementList(shardId); + return false; + } + /* checking only for the first shard id should suffice */ + shardId = (*(uint64 *) linitial(shardList)); + + /* for hash distributed tables, it is sufficient to only check one shard */ + if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH) + { + shardPlacementList = ShardPlacementList(shardId); if (list_length(shardPlacementList) != 1) { return false; } } + else + { + List *shardIntervalList = LoadShardList(relationId); + ListCell *shardIntervalCell = NULL; + + foreach(shardIntervalCell, shardIntervalList) + { + uint64 *shardIdPointer = (uint64 *) lfirst(shardIntervalCell); + uint64 shardId = (*shardIdPointer); + List *shardPlacementList = ShardPlacementList(shardId); + + if (list_length(shardPlacementList) != 1) + { + return false; + } + } + } return true; } diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index a986b7876..0b8415ea1 100644 --- 
a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -32,6 +32,7 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_logical_optimizer.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_utility.h" #include "distributed/relay_utility.h" @@ -763,6 +764,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) uint64 shardId = INVALID_SHARD_ID; bool received = false; StringInfo queryString = NULL; + Oid sourceShardRelationId = InvalidOid; + Oid sourceSchemaId = InvalidOid; CheckCitusVersion(ERROR); @@ -787,7 +790,25 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) sourceQualifiedName = quote_qualified_identifier(sourceSchemaName, sourceTableName); sourceCopyCommand = makeStringInfo(); - appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName); + + /* + * Partitioned tables do not support "COPY table TO STDOUT". Thus, we use + * "COPY (SELECT * FROM table) TO STDOUT" for partitioned tables. + * + * If the schema name is not explicitly set, we use the public schema. + */ + sourceSchemaName = sourceSchemaName ? sourceSchemaName : "public"; + sourceSchemaId = get_namespace_oid(sourceSchemaName, false); + sourceShardRelationId = get_relname_relid(sourceTableName, sourceSchemaId); + if (PartitionedTableNoLock(sourceShardRelationId)) + { + appendStringInfo(sourceCopyCommand, COPY_SELECT_ALL_OUT_COMMAND, + sourceQualifiedName); + } + else + { + appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName); + } received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, sourceCopyCommand, localFilePath); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index f18934e28..03b27b8f5 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -96,6 +96,7 @@ extern void multi_join_restriction_hook(PlannerInfo *root, extern bool IsModifyCommand(Query *query); extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); +extern void EnsurePartitionTableNotReplicated(Oid relationId); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index f68051283..06c68e4ea 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -163,7 +163,7 @@ extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS); /* function declarations for shard copy functinality */ extern List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, - int32 sourceNodePort); + int32 sourceNodePort, bool includeData); extern List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval); extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval, List ** diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index ff22d1335..833c0ce2f 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -59,6 +59,7 @@ extern 
RelationRestrictionContext * CopyRelationRestrictionContext( extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); +extern Oid ModifyQueryResultRelationId(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); extern bool IsMultiRowInsert(Query *query); diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index df4aef827..30bef0f68 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -46,6 +46,7 @@ /* the tablename in the overloaded COPY statement is the to-be-transferred file */ #define TRANSMIT_REGULAR_COMMAND "COPY \"%s\" TO STDOUT WITH (format 'transmit')" #define COPY_OUT_COMMAND "COPY %s TO STDOUT" +#define COPY_SELECT_ALL_OUT_COMMAND "COPY (SELECT * FROM %s) TO STDOUT" #define COPY_IN_COMMAND "COPY %s FROM '%s'" /* Defines that relate to creating tables */ diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index 450b5eadb..9c90376a8 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -272,10 +272,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'range'); ERROR: distributing partitioned tables in only supported for hash-distributed tables SELECT create_reference_table('partitioning_test_failure'); ERROR: distributing partitioned tables in only supported for hash-distributed tables --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); -ERROR: distributing partitioned tables with replication factor greater than 1 is not supported SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; DROP TABLE partitioning_test_failure_2009; diff --git a/src/test/regress/expected/multi_partitioning_0.out b/src/test/regress/expected/multi_partitioning_0.out index f54e3552f..b73fc52a7 100644 --- a/src/test/regress/expected/multi_partitioning_0.out +++ b/src/test/regress/expected/multi_partitioning_0.out @@ -272,10 +272,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'range'); ERROR: distributing partitioned tables in only supported for hash-distributed tables SELECT create_reference_table('partitioning_test_failure'); ERROR: distributing partitioned tables in only supported for hash-distributed tables --- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); -ERROR: distributing partitioned tables with replication factor greater than 1 is not supported SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; DROP TABLE partitioning_test_failure_2009; diff --git a/src/test/regress/expected/multi_partitioning_1.out b/src/test/regress/expected/multi_partitioning_1.out index 354087d8a..2d0fb4e5c 100644 --- a/src/test/regress/expected/multi_partitioning_1.out +++ b/src/test/regress/expected/multi_partitioning_1.out @@ -270,12 +270,6 @@ SELECT create_reference_table('partitioning_test_failure'); ERROR: relation "partitioning_test_failure" does not exist LINE 1: SELECT create_reference_table('partitioning_test_failure'); ^ 
--- replication factor > 1 is not allowed in distributed partitioned tables -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('partitioning_test_failure', 'id'); -ERROR: relation "partitioning_test_failure" does not exist -LINE 1: SELECT create_distributed_table('partitioning_test_failure',... - ^ SET citus.shard_replication_factor TO 1; -- non-distributed tables cannot have distributed partitions; DROP TABLE partitioning_test_failure_2009; diff --git a/src/test/regress/expected/replicated_partitioned_table.out b/src/test/regress/expected/replicated_partitioned_table.out new file mode 100644 index 000000000..df990cb02 --- /dev/null +++ b/src/test/regress/expected/replicated_partitioned_table.out @@ -0,0 +1,296 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 11 +(1 row) + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); +NOTICE: Copying data from local table... +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); +NOTICE: Copying data from local table... 
+-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +SELECT create_distributed_table('collections_5', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | placement_count +---------------+----------------- + collections | 8 + collections_1 | 8 + collections_2 | 8 + collections_3 | 8 + collections_4 | 8 + collections_5 | 8 +(6 rows) + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + count +------- + 1 +(1 row) + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now() AND key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- COPY is not allowed +COPY collections_1 FROM STDIN; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +\. +invalid command \. +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. 
+-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +TRUNCATE collections, collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; +ERROR: cannot create foreign key constraint +DETAIL: Citus Community Edition currently supports foreign key constraints only for "citus.shard_replication_factor = 1". +HINT: Please change "citus.shard_replication_factor to 1". To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us. +-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); +ALTER TABLE collections DETACH PARTITION collections_6; +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; + count +------- + 1 +(1 row) + +SELECT count(*) FROM collections_1 WHERE key != 1; + count +------- + 1 +(1 row) + +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); +SELECT create_distributed_table('collections_agg', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); +-- add some indexes +CREATE INDEX ON customer_engagements (id); +CREATE INDEX ON customer_engagements (event_id); +CREATE INDEX ON customer_engagements (id, event_id); +-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); + create_distributed_table 
+-------------------------- + +(1 row) + +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +INSERT INTO customer_engagements VALUES (2, 1); +INSERT INTO customer_engagements VALUES (1, 2); +INSERT INTO customer_engagements VALUES (2, 2); +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +-- get the newshardid +SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +SELECT * FROM customer_engagements ORDER BY 1,2,3; + id | event_id | value +----+----------+------- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 1 + 2 | 2 | 1 +(4 rows) + +ROLLBACK; +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +INSERT INTO customer_engagements VALUES (1, 1); +SELECT count(*) FROM customer_engagements; + count +------- + 5 +(1 row) + +ROLLBACK; +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table partitioned_table_replicated.collections +drop cascades to table partitioned_table_replicated.fkey_test +drop cascades to table partitioned_table_replicated.collections_agg +drop cascades to table partitioned_table_replicated.customer_engagements diff --git a/src/test/regress/expected/replicated_partitioned_table_0.out b/src/test/regress/expected/replicated_partitioned_table_0.out new file mode 100644 index 000000000..015b644a7 --- /dev/null +++ b/src/test/regress/expected/replicated_partitioned_table_0.out @@ -0,0 +1,299 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version 
+---------------- + 10 +(1 row) + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); +NOTICE: Copying data from local table... +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); +NOTICE: Copying data from local table... +-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +SELECT create_distributed_table('collections_5', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | placement_count +---------------+----------------- + collections | 8 + collections_1 | 8 + collections_2 | 8 + collections_3 | 8 + collections_4 | 8 + collections_5 | 8 +(6 rows) + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + count +------- + 1 +(1 row) + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- single shard update/delete not allowed +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now() AND key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. 
+-- multi shard update/delete are not allowed +UPDATE collections_1 SET ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +DELETE FROM collections_1 WHERE ts = now(); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select pushdown +INSERT INTO collections_1 SELECT * FROM collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- insert..select via coordinator +INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- COPY is not allowed +COPY collections_1 FROM STDIN; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +\. +invalid command \. +-- DDLs are not allowed +CREATE INDEX index_on_partition ON collections_1(key); +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- EXPLAIN with modifications is not allowed as well +UPDATE collections_1 SET ts = now() WHERE key = 1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- TRUNCATE is also not allowed +TRUNCATE collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +TRUNCATE collections, collections_1; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- modifying CTEs are also not allowed +WITH collections_5_cte AS +( + DELETE FROM collections_5 RETURNING * +) +SELECT * FROM collections_5_cte; +ERROR: modifications on partitions when replication factor is greater than 1 is not supported +HINT: Run the query on the parent table "collections" instead. +-- foreign key creation is disallowed due to replication factor > 1 +CREATE TABLE fkey_test (key bigint PRIMARY KEY); +SELECT create_distributed_table('fkey_test', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE + collections_5 +ADD CONSTRAINT + fkey_delete FOREIGN KEY(key) +REFERENCES + fkey_test(key) ON DELETE CASCADE; +ERROR: cannot create foreign key constraint +DETAIL: Citus Community Edition currently supports foreign key constraints only for "citus.shard_replication_factor = 1". +HINT: Please change "citus.shard_replication_factor to 1". To learn more about using foreign keys with other replication factors, please contact us at https://citusdata.com/about/contact_us. 
+-- we should be able to attach and detach partitions +-- given that those DDLs are on the parent table +CREATE TABLE collections_6 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 6 ); +ALTER TABLE collections DETACH PARTITION collections_6; +ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 ); +-- read queries works just fine +SELECT count(*) FROM collections_1 WHERE key = 1; + count +------- + 1 +(1 row) + +SELECT count(*) FROM collections_1 WHERE key != 1; + count +------- + 1 +(1 row) + +-- rollups SELECT'ing from partitions should work just fine +CREATE TABLE collections_agg ( + key bigint, + sum_value numeric +); +SELECT create_distributed_table('collections_agg', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- pushdown roll-up +INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key; +-- coordinator roll-up +INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id; +-- now make sure that repair functionality works fine +-- create a table and create its distribution metadata +CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id ); +CREATE TABLE customer_engagements_1 + PARTITION OF customer_engagements + FOR VALUES IN ( 1 ); +CREATE TABLE customer_engagements_2 + PARTITION OF customer_engagements + FOR VALUES IN ( 2 ); +-- add some indexes +CREATE INDEX ON customer_engagements (id); +ERROR: cannot create index on partitioned table "customer_engagements" +CREATE INDEX ON customer_engagements (event_id); +ERROR: cannot create index on partitioned table "customer_engagements" +CREATE INDEX ON customer_engagements (id, event_id); +ERROR: cannot create index on partitioned table "customer_engagements" +-- distribute the table +-- create a single shard on the first worker +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +SELECT create_distributed_table('customer_engagements', 'id', 'hash'); + create_distributed_table +-------------------------- + +(1 row) + +-- ingest some data for the tests +INSERT INTO customer_engagements VALUES (1, 1); +INSERT INTO customer_engagements VALUES (2, 1); +INSERT INTO customer_engagements VALUES (1, 2); +INSERT INTO customer_engagements VALUES (2, 2); +-- the following queries does the following: +-- (i) create a new shard +-- (ii) mark the second shard placements as unhealthy +-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones +-- (iv) do a successful master_copy_shard_placement from the first placement to the second +-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement +SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +-- get the newshardid +SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass +\gset +-- now, update the second placement as unhealthy +UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid + AND groupid = :worker_2_group; +-- cannot repair a shard after a modification (transaction still open during repair) +BEGIN; +INSERT INTO customer_engagements VALUES (1, 1); +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: cannot open new connections after the first modification command within a 
transaction +ROLLBACK; +-- modifications after reparing a shard are fine (will use new metadata) +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0; +SELECT * FROM customer_engagements ORDER BY 1,2,3; + id | event_id | value +----+----------+------- + 1 | 1 | 1 + 1 | 2 | 1 + 2 | 1 | 1 + 2 | 2 | 1 +(4 rows) + +ROLLBACK; +BEGIN; +SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +INSERT INTO customer_engagements VALUES (1, 1); +SELECT count(*) FROM customer_engagements; + count +------- + 5 +(1 row) + +ROLLBACK; +-- TRUNCATE is allowed on the parent table +-- try it just before dropping the table +TRUNCATE collections; +SET search_path TO public; +DROP SCHEMA partitioned_table_replicated CASCADE; +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table partitioned_table_replicated.collections +drop cascades to table partitioned_table_replicated.fkey_test +drop cascades to table partitioned_table_replicated.collections_agg +drop cascades to table partitioned_table_replicated.customer_engagements diff --git a/src/test/regress/expected/replicated_partitioned_table_1.out b/src/test/regress/expected/replicated_partitioned_table_1.out new file mode 100644 index 000000000..cc4405995 --- /dev/null +++ b/src/test/regress/expected/replicated_partitioned_table_1.out @@ -0,0 +1,358 @@ +-- +-- Distributed Partitioned Table Tests +-- +SET citus.next_shard_id TO 1760000; +CREATE SCHEMA partitioned_table_replicated; +SET search_path TO partitioned_table_replicated; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 2; +-- print major version number for version-specific tests +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int AS server_version; + server_version +---------------- + 9 +(1 row) + +CREATE TABLE collections ( + key bigint, + ts timestamptz, + collection_id integer, + value numeric +) PARTITION BY LIST ( collection_id ); +ERROR: syntax error at or near "PARTITION" +LINE 6: ) PARTITION BY LIST ( collection_id ); + ^ +CREATE TABLE collections_1 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 1 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF collections (key, ts, collection_id, value) + ^ +CREATE TABLE collections_2 + PARTITION OF collections (key, ts, collection_id, value) + FOR VALUES IN ( 2 ); +ERROR: syntax error at or near "PARTITION" +LINE 2: PARTITION OF collections (key, ts, collection_id, value) + ^ +-- load some data data +INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... 
+ ^ +INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: relation "collections" does not exist +LINE 1: INSERT INTO collections (key, ts, collection_id, value) VALU... + ^ +-- in the first case, we'll distributed the +-- already existing partitioninong hierarcy +SELECT create_distributed_table('collections', 'key'); +ERROR: relation "collections" does not exist +LINE 1: SELECT create_distributed_table('collections', 'key'); + ^ +-- now create partition of a already distributed table +CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 ); +ERROR: syntax error at or near "PARTITION" +LINE 1: CREATE TABLE collections_3 PARTITION OF collections FOR VALU... + ^ +-- now attaching non distributed table to a distributed table +CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0; +ERROR: relation "collections" does not exist +LINE 1: CREATE TABLE collections_4 AS SELECT * FROM collections LIMI... + ^ +-- load some data +INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i; +ERROR: relation "collections_4" does not exist +LINE 1: INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM ... + ^ +ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 ); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE collections ATTACH PARTITION collections_4 FOR V... + ^ +-- finally attach a distributed table to a distributed table +CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0; +ERROR: relation "collections" does not exist +LINE 1: CREATE TABLE collections_5 AS SELECT * FROM collections LIMI... + ^ +SELECT create_distributed_table('collections_5', 'key'); +ERROR: relation "collections_5" does not exist +LINE 1: SELECT create_distributed_table('collections_5', 'key'); + ^ +-- load some data +INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i; +ERROR: relation "collections_5" does not exist +LINE 1: INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM ... + ^ +ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 ); +ERROR: syntax error at or near "ATTACH" +LINE 1: ALTER TABLE collections ATTACH PARTITION collections_5 FOR V... + ^ +-- make sure that we've all the placements +SELECT + logicalrelid, count(*) as placement_count +FROM + pg_dist_shard, pg_dist_shard_placement +WHERE + logicalrelid::text LIKE '%collections%' AND + pg_dist_shard.shardid = pg_dist_shard_placement.shardid +GROUP BY + logicalrelid +ORDER BY + 1,2; + logicalrelid | placement_count +--------------+----------------- +(0 rows) + +-- and, make sure that all tables are colocated +SELECT + count(DISTINCT colocationid) +FROM + pg_dist_partition +WHERE + logicalrelid::text LIKE '%collections%'; + count +------- + 0 +(1 row) + +-- make sure that any kind of modification is disallowed on partitions +-- given that replication factor > 1 +INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2); +ERROR: relation "collections_4" does not exist +LINE 1: INSERT INTO collections_4 (key, ts, collection_id, value) VA... 
+ ^
+-- single shard update/delete not allowed
+UPDATE collections_1 SET ts = now() WHERE key = 1;
+ERROR: relation "collections_1" does not exist
+LINE 1: UPDATE collections_1 SET ts = now() WHERE key = 1;
+ ^
+DELETE FROM collections_1 WHERE ts = now() AND key = 1;
+ERROR: relation "collections_1" does not exist
+LINE 1: DELETE FROM collections_1 WHERE ts = now() AND key = 1;
+ ^
+-- multi shard update/delete are not allowed
+UPDATE collections_1 SET ts = now();
+ERROR: relation "collections_1" does not exist
+LINE 1: UPDATE collections_1 SET ts = now();
+ ^
+DELETE FROM collections_1 WHERE ts = now();
+ERROR: relation "collections_1" does not exist
+LINE 1: DELETE FROM collections_1 WHERE ts = now();
+ ^
+-- insert..select pushdown
+INSERT INTO collections_1 SELECT * FROM collections_1;
+ERROR: relation "collections_1" does not exist
+LINE 1: INSERT INTO collections_1 SELECT * FROM collections_1;
+ ^
+-- insert..select via coordinator
+INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0;
+ERROR: relation "collections_1" does not exist
+LINE 1: INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET...
+ ^
+-- COPY is not allowed
+COPY collections_1 FROM STDIN;
+ERROR: relation "collections_1" does not exist
+\.
+invalid command \.
+-- DDLs are not allowed
+CREATE INDEX index_on_partition ON collections_1(key);
+ERROR: relation "collections_1" does not exist
+-- EXPLAIN with modifications is not allowed as well
+UPDATE collections_1 SET ts = now() WHERE key = 1;
+ERROR: relation "collections_1" does not exist
+LINE 1: UPDATE collections_1 SET ts = now() WHERE key = 1;
+ ^
+-- TRUNCATE is also not allowed
+TRUNCATE collections_1;
+ERROR: relation "collections_1" does not exist
+TRUNCATE collections, collections_1;
+ERROR: relation "collections" does not exist
+-- modifying CTEs are also not allowed
+WITH collections_5_cte AS
+(
+ DELETE FROM collections_5 RETURNING *
+)
+SELECT * FROM collections_5_cte;
+ERROR: relation "collections_5" does not exist
+LINE 3: DELETE FROM collections_5 RETURNING *
+ ^
+-- foreign key creation is disallowed due to replication factor > 1
+CREATE TABLE fkey_test (key bigint PRIMARY KEY);
+SELECT create_distributed_table('fkey_test', 'key');
+ create_distributed_table
+--------------------------
+ 
+(1 row)
+
+ALTER TABLE
+ collections_5
+ADD CONSTRAINT
+ fkey_delete FOREIGN KEY(key)
+REFERENCES
+ fkey_test(key) ON DELETE CASCADE;
+ERROR: relation "collections_5" does not exist
+-- we should be able to attach and detach partitions
+-- given that those DDLs are on the parent table
+CREATE TABLE collections_6
+ PARTITION OF collections (key, ts, collection_id, value)
+ FOR VALUES IN ( 6 );
+ERROR: syntax error at or near "PARTITION"
+LINE 2: PARTITION OF collections (key, ts, collection_id, value)
+ ^
+ALTER TABLE collections DETACH PARTITION collections_6;
+ERROR: syntax error at or near "DETACH"
+LINE 1: ALTER TABLE collections DETACH PARTITION collections_6;
+ ^
+ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 );
+ERROR: syntax error at or near "ATTACH"
+LINE 1: ALTER TABLE collections ATTACH PARTITION collections_6 FOR V...
+ ^
+-- read queries work just fine
+SELECT count(*) FROM collections_1 WHERE key = 1;
+ERROR: relation "collections_1" does not exist
+LINE 1: SELECT count(*) FROM collections_1 WHERE key = 1;
+ ^
+SELECT count(*) FROM collections_1 WHERE key != 1;
+ERROR: relation "collections_1" does not exist
+LINE 1: SELECT count(*) FROM collections_1 WHERE key != 1;
+ ^
+-- rollups SELECT'ing from partitions should work just fine
+CREATE TABLE collections_agg (
+ key bigint,
+ sum_value numeric
+);
+SELECT create_distributed_table('collections_agg', 'key');
+ create_distributed_table
+--------------------------
+ 
+(1 row)
+
+-- pushdown roll-up
+INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key;
+ERROR: relation "collections_1" does not exist
+LINE 1: ...RT INTO collections_agg SELECT key, sum(key) FROM collection...
+ ^
+-- coordinator roll-up
+INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id;
+ERROR: relation "collections_1" does not exist
+LINE 1: ...llections_agg SELECT collection_id, sum(key) FROM collection...
+ ^
+-- now make sure that repair functionality works fine
+-- create a table and create its distribution metadata
+CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id );
+ERROR: syntax error at or near "PARTITION"
+LINE 1: ...E customer_engagements (id integer, event_id int) PARTITION ...
+ ^
+CREATE TABLE customer_engagements_1
+ PARTITION OF customer_engagements
+ FOR VALUES IN ( 1 );
+ERROR: syntax error at or near "PARTITION"
+LINE 2: PARTITION OF customer_engagements
+ ^
+CREATE TABLE customer_engagements_2
+ PARTITION OF customer_engagements
+ FOR VALUES IN ( 2 );
+ERROR: syntax error at or near "PARTITION"
+LINE 2: PARTITION OF customer_engagements
+ ^
+-- add some indexes
+CREATE INDEX ON customer_engagements (id);
+ERROR: relation "customer_engagements" does not exist
+CREATE INDEX ON customer_engagements (event_id);
+ERROR: relation "customer_engagements" does not exist
+CREATE INDEX ON customer_engagements (id, event_id);
+ERROR: relation "customer_engagements" does not exist
+-- distribute the table
+-- create a single shard on the first worker
+SET citus.shard_count TO 1;
+SET citus.shard_replication_factor TO 2;
+SELECT create_distributed_table('customer_engagements', 'id', 'hash');
+ERROR: relation "customer_engagements" does not exist
+LINE 1: SELECT create_distributed_table('customer_engagements', 'id'...
+ ^
+-- ingest some data for the tests
+INSERT INTO customer_engagements VALUES (1, 1);
+ERROR: relation "customer_engagements" does not exist
+LINE 1: INSERT INTO customer_engagements VALUES (1, 1);
+ ^
+INSERT INTO customer_engagements VALUES (2, 1);
+ERROR: relation "customer_engagements" does not exist
+LINE 1: INSERT INTO customer_engagements VALUES (2, 1);
+ ^
+INSERT INTO customer_engagements VALUES (1, 2);
+ERROR: relation "customer_engagements" does not exist
+LINE 1: INSERT INTO customer_engagements VALUES (1, 2);
+ ^
+INSERT INTO customer_engagements VALUES (2, 2);
+ERROR: relation "customer_engagements" does not exist
+LINE 1: INSERT INTO customer_engagements VALUES (2, 2);
+ ^
+-- the following queries do the following:
+-- (i) create a new shard
+-- (ii) mark the second shard placements as unhealthy
+-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones
+-- (iv) do a successful master_copy_shard_placement from the first placement to the second
+-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement
+SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
+SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
+-- get the newshardid
+SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass
+\gset
+ERROR: relation "customer_engagements" does not exist
+LINE 1: ...ewshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_...
+ ^
+-- now, update the second placement as unhealthy
+UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid
+ AND groupid = :worker_2_group;
+ERROR: syntax error at or near ":"
+LINE 1: ...dist_placement SET shardstate = 3 WHERE shardid = :newshardi...
+ ^
+-- cannot repair a shard after a modification (transaction still open during repair)
+BEGIN;
+INSERT INTO customer_engagements VALUES (1, 1);
+ERROR: relation "customer_engagements" does not exist
+LINE 1: INSERT INTO customer_engagements VALUES (1, 1);
+ ^
+SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ERROR: syntax error at or near ":"
+LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',...
+ ^
+ROLLBACK;
+-- modifications after repairing a shard are fine (will use new metadata)
+BEGIN;
+SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ERROR: syntax error at or near ":"
+LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',...
+ ^
+ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0;
+ERROR: current transaction is aborted, commands ignored until end of transaction block
+SELECT * FROM customer_engagements ORDER BY 1,2,3;
+ERROR: current transaction is aborted, commands ignored until end of transaction block
+ROLLBACK;
+BEGIN;
+SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ERROR: syntax error at or near ":"
+LINE 1: SELECT master_copy_shard_placement(:newshardid, 'localhost',...
+ ^
+INSERT INTO customer_engagements VALUES (1, 1);
+ERROR: current transaction is aborted, commands ignored until end of transaction block
+SELECT count(*) FROM customer_engagements;
+ERROR: current transaction is aborted, commands ignored until end of transaction block
+ROLLBACK;
+-- TRUNCATE is allowed on the parent table
+-- try it just before dropping the table
+TRUNCATE collections;
+ERROR: relation "collections" does not exist
+SET search_path TO public;
+DROP SCHEMA partitioned_table_replicated CASCADE;
+NOTICE: drop cascades to 2 other objects
+DETAIL: drop cascades to table partitioned_table_replicated.fkey_test
+drop cascades to table partitioned_table_replicated.collections_agg
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 367015ea5..68b3ccb8b 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -37,7 +37,7 @@ test: multi_insert_select_window multi_shard_update_delete window_functions dml_
 # ----------
 # Tests for partitioning support
 # ----------
-test: multi_partitioning_utils multi_partitioning
+test: multi_partitioning_utils multi_partitioning replicated_partitioned_table
 
 # ----------
diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql
index adb643932..8dbef8169 100644
--- a/src/test/regress/sql/multi_partitioning.sql
+++ b/src/test/regress/sql/multi_partitioning.sql
@@ -173,9 +173,6 @@ SELECT create_distributed_table('partitioning_test_failure', 'id', 'append');
 SELECT create_distributed_table('partitioning_test_failure', 'id', 'range');
 SELECT create_reference_table('partitioning_test_failure');
 
--- replication factor > 1 is not allowed in distributed partitioned tables
-SET citus.shard_replication_factor TO 2;
-SELECT create_distributed_table('partitioning_test_failure', 'id');
 SET citus.shard_replication_factor TO 1;
 
 -- non-distributed tables cannot have distributed partitions;
diff --git a/src/test/regress/sql/replicated_partitioned_table.sql b/src/test/regress/sql/replicated_partitioned_table.sql
new file mode 100644
index 000000000..eba4b8d35
--- /dev/null
+++ b/src/test/regress/sql/replicated_partitioned_table.sql
@@ -0,0 +1,231 @@
+--
+-- Distributed Partitioned Table Tests
+--
+SET citus.next_shard_id TO 1760000;
+
+CREATE SCHEMA partitioned_table_replicated;
+SET search_path TO partitioned_table_replicated;
+
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 2;
+
+-- print major version number for version-specific tests
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int AS server_version;
+
+
+CREATE TABLE collections (
+ key bigint,
+ ts timestamptz,
+ collection_id integer,
+ value numeric
+) PARTITION BY LIST ( collection_id );
+
+
+CREATE TABLE collections_1
+ PARTITION OF collections (key, ts, collection_id, value)
+ FOR VALUES IN ( 1 );
+
+CREATE TABLE collections_2
+ PARTITION OF collections (key, ts, collection_id, value)
+ FOR VALUES IN ( 2 );
+
+-- load some data
+INSERT INTO collections (key, ts, collection_id, value) VALUES (1, '2009-01-01', 1, 1);
+INSERT INTO collections (key, ts, collection_id, value) VALUES (2, '2009-01-01', 1, 2);
+INSERT INTO collections (key, ts, collection_id, value) VALUES (3, '2009-01-01', 2, 1);
+INSERT INTO collections (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
+
+-- in the first case, we'll distribute the
+-- already existing partitioning hierarchy
+SELECT create_distributed_table('collections', 'key');
+
+-- now create partition of an already distributed table
+CREATE TABLE collections_3 PARTITION OF collections FOR VALUES IN ( 3 );
+
+-- now attaching a non-distributed table to a distributed table
+CREATE TABLE collections_4 AS SELECT * FROM collections LIMIT 0;
+
+-- load some data
+INSERT INTO collections_4 SELECT i, '2009-01-01', 4, i FROM generate_series (0, 10) i;
+
+ALTER TABLE collections ATTACH PARTITION collections_4 FOR VALUES IN ( 4 );
+
+-- finally attach a distributed table to a distributed table
+CREATE TABLE collections_5 AS SELECT * FROM collections LIMIT 0;
+SELECT create_distributed_table('collections_5', 'key');
+
+-- load some data
+INSERT INTO collections_5 SELECT i, '2009-01-01', 5, i FROM generate_series (0, 10) i;
+ALTER TABLE collections ATTACH PARTITION collections_5 FOR VALUES IN ( 5 );
+
+-- make sure that we have all the placements
+SELECT
+ logicalrelid, count(*) as placement_count
+FROM
+ pg_dist_shard, pg_dist_shard_placement
+WHERE
+ logicalrelid::text LIKE '%collections%' AND
+ pg_dist_shard.shardid = pg_dist_shard_placement.shardid
+GROUP BY
+ logicalrelid
+ORDER BY
+ 1,2;
+
+-- and, make sure that all tables are colocated
+SELECT
+ count(DISTINCT colocationid)
+FROM
+ pg_dist_partition
+WHERE
+ logicalrelid::text LIKE '%collections%';
+
+-- make sure that any kind of modification is disallowed on partitions
+-- given that replication factor > 1
+INSERT INTO collections_4 (key, ts, collection_id, value) VALUES (4, '2009-01-01', 2, 2);
+
+-- single shard update/delete not allowed
+UPDATE collections_1 SET ts = now() WHERE key = 1;
+DELETE FROM collections_1 WHERE ts = now() AND key = 1;
+
+-- multi shard update/delete are not allowed
+UPDATE collections_1 SET ts = now();
+DELETE FROM collections_1 WHERE ts = now();
+
+-- insert..select pushdown
+INSERT INTO collections_1 SELECT * FROM collections_1;
+
+-- insert..select via coordinator
+INSERT INTO collections_1 SELECT * FROM collections_1 OFFSET 0;
+
+-- COPY is not allowed
+COPY collections_1 FROM STDIN;
+\.
+
+-- DDLs are not allowed
+CREATE INDEX index_on_partition ON collections_1(key);
+
+-- EXPLAIN with modifications is not allowed as well
+UPDATE collections_1 SET ts = now() WHERE key = 1;
+
+-- TRUNCATE is also not allowed
+TRUNCATE collections_1;
+TRUNCATE collections, collections_1;
+
+-- modifying CTEs are also not allowed
+WITH collections_5_cte AS
+(
+ DELETE FROM collections_5 RETURNING *
+)
+SELECT * FROM collections_5_cte;
+
+-- foreign key creation is disallowed due to replication factor > 1
+CREATE TABLE fkey_test (key bigint PRIMARY KEY);
+SELECT create_distributed_table('fkey_test', 'key');
+
+ALTER TABLE
+ collections_5
+ADD CONSTRAINT
+ fkey_delete FOREIGN KEY(key)
+REFERENCES
+ fkey_test(key) ON DELETE CASCADE;
+
+-- we should be able to attach and detach partitions
+-- given that those DDLs are on the parent table
+CREATE TABLE collections_6
+ PARTITION OF collections (key, ts, collection_id, value)
+ FOR VALUES IN ( 6 );
+
+ALTER TABLE collections DETACH PARTITION collections_6;
+ALTER TABLE collections ATTACH PARTITION collections_6 FOR VALUES IN ( 6 );
+
+-- read queries work just fine
+SELECT count(*) FROM collections_1 WHERE key = 1;
+SELECT count(*) FROM collections_1 WHERE key != 1;
+
+-- rollups SELECT'ing from partitions should work just fine
+CREATE TABLE collections_agg (
+ key bigint,
+ sum_value numeric
+);
+
+SELECT create_distributed_table('collections_agg', 'key');
+
+-- pushdown roll-up
+INSERT INTO collections_agg SELECT key, sum(key) FROM collections_1 GROUP BY key;
+
+-- coordinator roll-up
+INSERT INTO collections_agg SELECT collection_id, sum(key) FROM collections_1 GROUP BY collection_id;
+
+-- now make sure that repair functionality works fine
+-- create a table and create its distribution metadata
+CREATE TABLE customer_engagements (id integer, event_id int) PARTITION BY LIST ( event_id );
+
+CREATE TABLE customer_engagements_1
+ PARTITION OF customer_engagements
+ FOR VALUES IN ( 1 );
+
+CREATE TABLE customer_engagements_2
+ PARTITION OF customer_engagements
+ FOR VALUES IN ( 2 );
+
+-- add some indexes
+CREATE INDEX ON customer_engagements (id);
+CREATE INDEX ON customer_engagements (event_id);
+CREATE INDEX ON customer_engagements (id, event_id);
+
+-- distribute the table
+-- create a single shard on the first worker
+SET citus.shard_count TO 1;
+SET citus.shard_replication_factor TO 2;
+SELECT create_distributed_table('customer_engagements', 'id', 'hash');
+
+-- ingest some data for the tests
+INSERT INTO customer_engagements VALUES (1, 1);
+INSERT INTO customer_engagements VALUES (2, 1);
+INSERT INTO customer_engagements VALUES (1, 2);
+INSERT INTO customer_engagements VALUES (2, 2);
+
+-- the following queries do the following:
+-- (i) create a new shard
+-- (ii) mark the second shard placements as unhealthy
+-- (iii) do basic checks i.e., only allow copy from healthy placement to unhealthy ones
+-- (iv) do a successful master_copy_shard_placement from the first placement to the second
+-- (v) mark the first placement as unhealthy and execute a query that is routed to the second placement
+
+SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
+SELECT groupid AS worker_1_group FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
+
+-- get the newshardid
+SELECT shardid as newshardid FROM pg_dist_shard WHERE logicalrelid = 'customer_engagements'::regclass
+\gset
+
+-- now, update the second placement as unhealthy
+UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid
+ AND groupid = :worker_2_group;
+
+-- cannot repair a shard after a modification (transaction still open during repair)
+BEGIN;
+INSERT INTO customer_engagements VALUES (1, 1);
+SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ROLLBACK;
+
+-- modifications after repairing a shard are fine (will use new metadata)
+BEGIN;
+SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ALTER TABLE customer_engagements ADD COLUMN value float DEFAULT 1.0;
+SELECT * FROM customer_engagements ORDER BY 1,2,3;
+ROLLBACK;
+
+BEGIN;
+SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+INSERT INTO customer_engagements VALUES (1, 1);
+SELECT count(*) FROM customer_engagements;
+ROLLBACK;
+
+-- TRUNCATE is allowed on the parent table
+-- try it just before dropping the table
+TRUNCATE collections;
+
+SET search_path TO public;
+DROP SCHEMA partitioned_table_replicated CASCADE;