From 31cd2357fe3f6648b1e871a731e60779b81036b5 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Sun, 1 Jan 2017 10:52:50 +0300 Subject: [PATCH] Add upgrade_to_reference_table With this change we introduce new UDF, upgrade_to_reference_table, which can be used to upgrade existing broadcast tables to reference tables. For upgrading, we require that given table contains only one shard. --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.1-11--6.1-12.sql | 12 + src/backend/distributed/citus.control | 2 +- .../commands/create_distributed_table.c | 109 +-- .../master/master_metadata_utility.c | 180 ++++- .../distributed/master/master_repair_shards.c | 12 +- .../distributed/utils/colocation_utils.c | 33 +- .../distributed/utils/reference_table_utils.c | 252 ++++++ src/include/distributed/colocation_utils.h | 1 + .../distributed/master_metadata_utility.h | 5 + src/include/distributed/master_protocol.h | 8 + .../distributed/reference_table_utils.h | 17 + src/test/regress/expected/multi_extension.out | 1 + .../multi_upgrade_reference_table.out | 762 ++++++++++++++++++ src/test/regress/multi_schedule | 5 + src/test/regress/sql/multi_extension.sql | 1 + .../sql/multi_upgrade_reference_table.sql | 481 +++++++++++ 17 files changed, 1756 insertions(+), 129 deletions(-) create mode 100644 src/backend/distributed/citus--6.1-11--6.1-12.sql create mode 100644 src/backend/distributed/utils/reference_table_utils.c create mode 100644 src/include/distributed/reference_table_utils.h create mode 100644 src/test/regress/expected/multi_upgrade_reference_table.out create mode 100644 src/test/regress/sql/multi_upgrade_reference_table.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index c2f845b02..7df3ad9cc 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5
6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -117,6 +117,8 @@ $(EXTENSION)--6.1-10.sql: $(EXTENSION)--6.1-9.sql $(EXTENSION)--6.1-9--6.1-10.sq cat $^ > $@ $(EXTENSION)--6.1-11.sql: $(EXTENSION)--6.1-10.sql $(EXTENSION)--6.1-10--6.1-11.sql cat $^ > $@ +$(EXTENSION)--6.1-12.sql: $(EXTENSION)--6.1-11.sql $(EXTENSION)--6.1-11--6.1-12.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-11--6.1-12.sql b/src/backend/distributed/citus--6.1-11--6.1-12.sql new file mode 100644 index 000000000..67bf30339 --- /dev/null +++ b/src/backend/distributed/citus--6.1-11--6.1-12.sql @@ -0,0 +1,12 @@ +/* citus--6.1-11--6.1-12.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION upgrade_to_reference_table(table_name regclass) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$upgrade_to_reference_table$$; +COMMENT ON FUNCTION upgrade_to_reference_table(table_name regclass) + IS 'upgrades an existing broadcast table to a reference table'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 17dcb793b..d40b2142d 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-11' +default_version = '6.1-12' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 3b3559467..6e6e1b52a 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ 
b/src/backend/distributed/commands/create_distributed_table.c @@ -42,6 +42,7 @@ #include "distributed/multi_logical_planner.h" #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" +#include "distributed/reference_table_utils.h" #include "distributed/worker_transaction.h" #include "executor/spi.h" #include "nodes/execnodes.h" @@ -65,8 +66,6 @@ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnNa char distributionMethod, uint32 colocationId, char replicationModel); static char LookupDistributionMethod(Oid distributionMethodOid); -static void RecordDistributedRelationDependencies(Oid distributedRelationId, - Node *distributionKey); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); static bool LocalTableEmpty(Oid tableId); @@ -76,9 +75,6 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation, char distributionMethod, Var *distributionColumn, uint32 colocationId); -static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, - Var *distributionColumn, uint32 colocationId, - char replicationModel); static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, char *colocateWithTableName, int shardCount, int replicationFactor); @@ -225,9 +221,7 @@ CreateReferenceTable(Oid relationId) { uint32 colocationId = INVALID_COLOCATION_ID; List *workerNodeList = WorkerNodeList(); - int shardCount = 1; int replicationFactor = list_length(workerNodeList); - Oid distributionColumnType = InvalidOid; char *distributionColumnName = NULL; EnsureSchemaNode(); @@ -242,13 +236,7 @@ CreateReferenceTable(Oid relationId) errdetail("There are no active worker nodes."))); } - /* check for existing colocations */ - colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType); - if (colocationId == INVALID_COLOCATION_ID) - { - colocationId = CreateColocationGroup(shardCount, replicationFactor, - 
distributionColumnType); - } + colocationId = CreateReferenceTableColocationId(); /* first, convert the relation into distributed relation */ ConvertToDistributedTable(relationId, distributionColumnName, @@ -730,99 +718,6 @@ ErrorIfNotSupportedForeignConstraint(Relation relation, char distributionMethod, } -/* - * InsertIntoPgDistPartition inserts a new tuple into pg_dist_partition. - */ -static void -InsertIntoPgDistPartition(Oid relationId, char distributionMethod, - Var *distributionColumn, uint32 colocationId, - char replicationModel) -{ - Relation pgDistPartition = NULL; - char *distributionColumnString = NULL; - - HeapTuple newTuple = NULL; - Datum newValues[Natts_pg_dist_partition]; - bool newNulls[Natts_pg_dist_partition]; - - /* open system catalog and insert new tuple */ - pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); - - /* form new tuple for pg_dist_partition */ - memset(newValues, 0, sizeof(newValues)); - memset(newNulls, false, sizeof(newNulls)); - - newValues[Anum_pg_dist_partition_logicalrelid - 1] = - ObjectIdGetDatum(relationId); - newValues[Anum_pg_dist_partition_partmethod - 1] = - CharGetDatum(distributionMethod); - newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); - newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); - - /* set partkey column to NULL for reference tables */ - if (distributionMethod != DISTRIBUTE_BY_NONE) - { - distributionColumnString = nodeToString((Node *) distributionColumn); - - newValues[Anum_pg_dist_partition_partkey - 1] = - CStringGetTextDatum(distributionColumnString); - } - else - { - newValues[Anum_pg_dist_partition_partkey - 1] = PointerGetDatum(NULL); - newNulls[Anum_pg_dist_partition_partkey - 1] = true; - } - - newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls); - - /* finally insert tuple, build index entries & register cache invalidation */ - simple_heap_insert(pgDistPartition, newTuple); - 
CatalogUpdateIndexes(pgDistPartition, newTuple); - CitusInvalidateRelcacheByRelid(relationId); - - RecordDistributedRelationDependencies(relationId, (Node *) distributionColumn); - - CommandCounterIncrement(); - heap_close(pgDistPartition, NoLock); -} - - -/* - * RecordDistributedRelationDependencies creates the dependency entries - * necessary for a distributed relation in addition to the preexisting ones - * for a normal relation. - * - * We create one dependency from the (now distributed) relation to the citus - * extension to prevent the extension from being dropped while distributed - * tables exist. Furthermore a dependency from pg_dist_partition's - * distribution clause to the underlying columns is created, but it's marked - * as being owned by the relation itself. That means the entire table can be - * dropped, but the column itself can't. Neither can the type of the - * distribution column be changed (c.f. ATExecAlterColumnType). - */ -static void -RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey) -{ - ObjectAddress relationAddr = { 0, 0, 0 }; - ObjectAddress citusExtensionAddr = { 0, 0, 0 }; - - relationAddr.classId = RelationRelationId; - relationAddr.objectId = distributedRelationId; - relationAddr.objectSubId = 0; - - citusExtensionAddr.classId = ExtensionRelationId; - citusExtensionAddr.objectId = get_extension_oid("citus", false); - citusExtensionAddr.objectSubId = 0; - - /* dependency from table entry to extension */ - recordDependencyOn(&relationAddr, &citusExtensionAddr, DEPENDENCY_NORMAL); - - /* make sure the distribution key column/expression does not just go away */ - recordDependencyOnSingleRelExpr(&relationAddr, distributionKey, distributedRelationId, - DEPENDENCY_NORMAL, DEPENDENCY_NORMAL); -} - - /* * LookupDistributionMethod maps the oids of citus.distribution_type enum * values to pg_dist_partition.partmethod values. 
diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 60b77a164..f44e03a4d 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -17,9 +17,14 @@ #include "access/htup_details.h" #include "access/sysattr.h" #include "access/xact.h" +#include "catalog/dependency.h" #include "catalog/indexing.h" -#include "catalog/pg_type.h" +#include "catalog/pg_constraint.h" +#include "catalog/pg_extension.h" #include "catalog/pg_namespace.h" +#include "catalog/pg_type.h" +#include "commands/extension.h" +#include "distributed/connection_management.h" #include "distributed/citus_nodes.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" @@ -47,6 +52,8 @@ /* Local functions forward declarations */ static uint64 * AllocateUint64(uint64 value); +static void RecordDistributedRelationDependencies(Oid distributedRelationId, + Node *distributionKey); /* exports for SQL callable functions */ @@ -322,6 +329,36 @@ FinalizedShardPlacementList(uint64 shardId) } +/* + * FinalizedShardPlacement finds a shard placement for the given shardId from + * system catalog, chooses a placement that is in finalized state and returns + * that shard placement. If this function cannot find a healthy shard placement + * and missingOk is set to false it errors out. 
+ */ +ShardPlacement * +FinalizedShardPlacement(uint64 shardId, bool missingOk) +{ + List *finalizedPlacementList = FinalizedShardPlacementList(shardId); + ShardPlacement *shardPlacement = NULL; + + if (list_length(finalizedPlacementList) == 0) + { + if (!missingOk) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not find any healthy placement for shard " + UINT64_FORMAT, shardId))); + } + + return shardPlacement; + } + + shardPlacement = (ShardPlacement *) linitial(finalizedPlacementList); + + return shardPlacement; +} + + /* * ShardPlacementList finds shard placements for the given shardId from system * catalogs, converts these placements to their in-memory representation, and @@ -515,6 +552,99 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId, } +/* + * InsertIntoPgDistPartition inserts a new tuple into pg_dist_partition. + */ +void +InsertIntoPgDistPartition(Oid relationId, char distributionMethod, + Var *distributionColumn, uint32 colocationId, + char replicationModel) +{ + Relation pgDistPartition = NULL; + char *distributionColumnString = NULL; + + HeapTuple newTuple = NULL; + Datum newValues[Natts_pg_dist_partition]; + bool newNulls[Natts_pg_dist_partition]; + + /* open system catalog and insert new tuple */ + pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); + + /* form new tuple for pg_dist_partition */ + memset(newValues, 0, sizeof(newValues)); + memset(newNulls, false, sizeof(newNulls)); + + newValues[Anum_pg_dist_partition_logicalrelid - 1] = + ObjectIdGetDatum(relationId); + newValues[Anum_pg_dist_partition_partmethod - 1] = + CharGetDatum(distributionMethod); + newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); + newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); + + /* set partkey column to NULL for reference tables */ + if (distributionMethod != DISTRIBUTE_BY_NONE) + { + distributionColumnString = nodeToString((Node *) 
distributionColumn); + + newValues[Anum_pg_dist_partition_partkey - 1] = + CStringGetTextDatum(distributionColumnString); + } + else + { + newValues[Anum_pg_dist_partition_partkey - 1] = PointerGetDatum(NULL); + newNulls[Anum_pg_dist_partition_partkey - 1] = true; + } + + newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls); + + /* finally insert tuple, build index entries & register cache invalidation */ + simple_heap_insert(pgDistPartition, newTuple); + CatalogUpdateIndexes(pgDistPartition, newTuple); + CitusInvalidateRelcacheByRelid(relationId); + + RecordDistributedRelationDependencies(relationId, (Node *) distributionColumn); + + CommandCounterIncrement(); + heap_close(pgDistPartition, NoLock); +} + + +/* + * RecordDistributedRelationDependencies creates the dependency entries + * necessary for a distributed relation in addition to the preexisting ones + * for a normal relation. + * + * We create one dependency from the (now distributed) relation to the citus + * extension to prevent the extension from being dropped while distributed + * tables exist. Furthermore a dependency from pg_dist_partition's + * distribution clause to the underlying columns is created, but it's marked + * as being owned by the relation itself. That means the entire table can be + * dropped, but the column itself can't. Neither can the type of the + * distribution column be changed (c.f. ATExecAlterColumnType). 
+ */ +static void +RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey) +{ + ObjectAddress relationAddr = { 0, 0, 0 }; + ObjectAddress citusExtensionAddr = { 0, 0, 0 }; + + relationAddr.classId = RelationRelationId; + relationAddr.objectId = distributedRelationId; + relationAddr.objectSubId = 0; + + citusExtensionAddr.classId = ExtensionRelationId; + citusExtensionAddr.objectId = get_extension_oid("citus", false); + citusExtensionAddr.objectSubId = 0; + + /* dependency from table entry to extension */ + recordDependencyOn(&relationAddr, &citusExtensionAddr, DEPENDENCY_NORMAL); + + /* make sure the distribution key column/expression does not just go away */ + recordDependencyOnSingleRelExpr(&relationAddr, distributionKey, distributedRelationId, + DEPENDENCY_NORMAL, DEPENDENCY_NORMAL); +} + + /* * DeletePartitionRow removes the row from pg_dist_partition where the logicalrelid * field equals to distributedRelationId. Then, the function invalidates the @@ -944,3 +1074,51 @@ master_stage_shard_placement_row(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + + +/* + * TableReferenced function checks whether given table is referenced by another table + * via foreign constraints. If it is referenced, this function returns true. To check + * that, this function searches given relation at pg_constraints system catalog. However + * since there is no index for the column we searched, this function performs sequential + * search, therefore call this function with caution. 
+ */ +bool +TableReferenced(Oid relationId) +{ + Relation pgConstraint = NULL; + HeapTuple heapTuple = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + Oid scanIndexId = InvalidOid; + bool useIndex = false; + + pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_constraint_confrelid, BTEqualStrategyNumber, F_OIDEQ, + relationId); + scanDescriptor = systable_beginscan(pgConstraint, scanIndexId, useIndex, NULL, + scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); + + if (constraintForm->contype == CONSTRAINT_FOREIGN) + { + systable_endscan(scanDescriptor); + heap_close(pgConstraint, NoLock); + + return true; + } + + heapTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + heap_close(pgConstraint, NoLock); + + return false; +} diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index b64c2bb40..e3e6854fa 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -45,12 +45,6 @@ static void RepairShardPlacement(int64 shardId, char *sourceNodeName, static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); -static ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, - char *nodeName, uint32 nodePort, - bool missingOk); -static List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, - int32 sourceNodePort); -static List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval); static char * ConstructQualifiedShardName(ShardInterval *shardInterval); static List * RecreateTableDDLCommandList(Oid relationId); @@ -210,7 +204,7 @@ 
EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePo * specified node name and port. If missingOk is set to true, this function returns NULL * if no such placement exists in the provided list, otherwise it throws an error. */ -static ShardPlacement * +ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 nodePort, bool missingOk) { @@ -250,7 +244,7 @@ SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 node * CopyShardCommandList generates command list to copy the given shard placement * from the source node to the target node. */ -static List * +List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, int32 sourceNodePort) { @@ -293,7 +287,7 @@ CopyShardCommandList(ShardInterval *shardInterval, * CopyShardForeignConstraintCommandList generates command list to create foreign * constraints existing in source shard after copying it to the other node. */ -static List * +List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval) { List *copyShardForeignConstraintCommandList = NIL; diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 14122a232..6df607a89 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -145,16 +145,7 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) UpdateRelationColocationGroup(targetRelationId, sourceColocationId); /* if there is not any remaining table in the colocation group, delete it */ - if (targetColocationId != INVALID_COLOCATION_ID) - { - List *colocatedTableList = ColocationGroupTableList(targetColocationId); - int colocatedTableCount = list_length(colocatedTableList); - - if (colocatedTableCount == 0) - { - DeleteColocationGroup(targetColocationId); - } - } + DeleteColocationGroupIfNoTablesBelong(targetColocationId); heap_close(pgDistColocation, NoLock); } @@ 
-940,6 +931,28 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex) } +/* + * DeleteColocationGroupIfNoTablesBelong function deletes given co-location group if there + * is no relation in that co-location group. A co-location group may become empty after + * mark_tables_colocated or upgrade_reference_table UDF calls. In that case we need to + * remove empty co-location group to prevent orphaned co-location groups. + */ +void +DeleteColocationGroupIfNoTablesBelong(uint32 colocationId) +{ + if (colocationId != INVALID_COLOCATION_ID) + { + List *colocatedTableList = ColocationGroupTableList(colocationId); + int colocatedTableCount = list_length(colocatedTableList); + + if (colocatedTableCount == 0) + { + DeleteColocationGroup(colocationId); + } + } +} + + /* * DeleteColocationGroup deletes the colocation group from pg_dist_colocation. */ diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c new file mode 100644 index 000000000..5f80a1bfb --- /dev/null +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -0,0 +1,252 @@ +/*------------------------------------------------------------------------- + * + * reference_table_utils.c + * + * Declarations for public utility functions related to reference tables. + * + * Copyright (c) 2014-2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" + +#include "access/heapam.h" +#include "distributed/colocation_utils.h" +#include "distributed/listutils.h" +#include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_logical_planner.h" +#include "distributed/reference_table_utils.h" +#include "distributed/resource_lock.h" +#include "distributed/shardinterval_utils.h" +#include "distributed/worker_manager.h" +#include "distributed/worker_transaction.h" +#include "utils/lsyscache.h" + + +/* local function forward declarations */ +static void ReplicateSingleShardTableToAllWorkers(Oid relationId); +static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); +static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); + + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(upgrade_to_reference_table); + + +/* + * upgrade_to_reference_table accepts a broadcast table which has only one shard and + * replicates it across all nodes to create a reference table. It also modifies related + * metadata to mark the table as reference. 
+ */ +Datum +upgrade_to_reference_table(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + List *shardIntervalList = NIL; + ShardInterval *shardInterval = NULL; + uint64 shardId = INVALID_SHARD_ID; + + EnsureSchemaNode(); + + if (!IsDistributedTable(relationId)) + { + char *relationName = get_rel_name(relationId); + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot upgrade to reference table"), + errdetail("Relation \"%s\" is not distributed.", relationName), + errhint("Instead, you can use; " + "create_reference_table('%s');", relationName))); + } + + if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + { + char *relationName = get_rel_name(relationId); + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot upgrade to reference table"), + errdetail("Relation \"%s\" is already a reference table", + relationName))); + } + + shardIntervalList = LoadShardIntervalList(relationId); + if (list_length(shardIntervalList) != 1) + { + char *relationName = get_rel_name(relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot upgrade to reference table"), + errdetail("Relation \"%s\" shard count is not one. Only " + "relations with one shard can be upgraded to " + "reference tables.", relationName))); + } + + shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + LockShardDistributionMetadata(shardId, ExclusiveLock); + LockShardResource(shardId, ExclusiveLock); + + ReplicateSingleShardTableToAllWorkers(relationId); + + PG_RETURN_VOID(); +} + + +/* + * ReplicateSingleShardTableToAllWorkers accepts a broadcast table and replicates it to + * all worker nodes. It assumes that caller of this function ensures that given broadcast + * table has only one shard. 
+ */ +static void +ReplicateSingleShardTableToAllWorkers(Oid relationId) +{ + List *shardIntervalList = LoadShardIntervalList(relationId); + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + uint64 shardId = shardInterval->shardId; + + List *foreignConstraintCommandList = CopyShardForeignConstraintCommandList( + shardInterval); + + if (foreignConstraintCommandList != NIL || TableReferenced(relationId)) + { + char *relationName = get_rel_name(relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot upgrade to reference table"), + errdetail("Relation \"%s\" is part of a foreign constraint. " + "Foreign key constraints are not allowed " + "from or to reference tables.", relationName))); + } + + /* + * ReplicateShardToAllWorkers function opens separate transactions (i.e., not part + * of any coordinated transactions) to each worker and replicates given shard to all + * workers. If a worker already has a healthy replica of given shard, it skips that + * worker to prevent copying unnecessary data. + */ + ReplicateShardToAllWorkers(shardInterval); + + /* + * After copying the shards, we need to update metadata tables to mark this table as + * reference table. We modify pg_dist_partition, pg_dist_colocation and pg_dist_shard + * tables in ConvertToReferenceTableMetadata function. + */ + ConvertToReferenceTableMetadata(relationId, shardId); +} + + +/* + * ReplicateShardToAllWorkers function replicates given shard to the given worker nodes + * in a separate transactions. While replicating, it only replicates the shard to the + * workers which does not have a healthy replica of the shard. This function also modifies + * metadata by inserting/updating related rows in pg_dist_shard_placement. However, this + * function does not obtain any lock on shard resource and shard metadata. It is caller's + * responsibility to take those locks. 
+ */ +static void +ReplicateShardToAllWorkers(ShardInterval *shardInterval) +{ + uint64 shardId = shardInterval->shardId; + List *shardPlacementList = ShardPlacementList(shardId); + bool missingOk = false; + ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); + char *srcNodeName = sourceShardPlacement->nodeName; + uint32 srcNodePort = sourceShardPlacement->nodePort; + char *tableOwner = TableOwner(shardInterval->relationId); + List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort); + + /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ + Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + List *workerNodeList = WorkerNodeList(); + ListCell *workerNodeCell = NULL; + + /* + * We will iterate over all worker nodes and if a healthy placement does not exist at + a given node we will copy the shard to that node. Then we will also modify + the metadata to reflect the newly copied shard.
+ */ + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + uint32 nodePort = workerNode->workerPort; + bool missingWorkerOk = true; + + ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, + nodeName, nodePort, + missingWorkerOk); + if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) + { + SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, + ddlCommandList); + if (targetPlacement == NULL) + { + InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, 0, + nodeName, nodePort); + } + else + { + UpdateShardPlacementState(targetPlacement->placementId, FILE_FINALIZED); + } + } + } + + heap_close(pgDistNode, NoLock); +} + + +/* + * ConvertToReferenceTableMetadata accepts a broadcast table and modifies its metadata to + * reference table metadata. To do this, this function updates pg_dist_partition, + * pg_dist_colocation and pg_dist_shard. This function assumes that caller ensures that + * given broadcast table has only one shard. 
+ */ +static void +ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId) +{ + uint32 currentColocationId = TableColocationId(relationId); + uint32 newColocationId = CreateReferenceTableColocationId(); + Var *distributionColumn = NULL; + char shardStorageType = ShardStorageType(relationId); + text *shardMinValue = NULL; + text *shardMaxValue = NULL; + + /* delete old metadata rows */ + DeletePartitionRow(relationId); + DeleteColocationGroupIfNoTablesBelong(currentColocationId); + DeleteShardRow(shardId); + + /* insert new metadata rows */ + InsertIntoPgDistPartition(relationId, DISTRIBUTE_BY_NONE, distributionColumn, + newColocationId, REPLICATION_MODEL_2PC); + InsertShardRow(relationId, shardId, shardStorageType, shardMinValue, shardMaxValue); +} + + +/* + * CreateReferenceTableColocationId creates a new co-location id for reference tables and + * writes it into pg_dist_colocation, then returns the created co-location id. Since there + * can be only one colocation group for all kinds of reference tables, if a co-location id + * is already created for reference tables, this function returns it without creating + * anything. 
+ */ +uint32 +CreateReferenceTableColocationId() +{ + uint32 colocationId = INVALID_COLOCATION_ID; + List *workerNodeList = WorkerNodeList(); + int shardCount = 1; + int replicationFactor = list_length(workerNodeList); + Oid distributionColumnType = InvalidOid; + + /* check for existing colocations */ + colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType); + if (colocationId == INVALID_COLOCATION_ID) + { + colocationId = CreateColocationGroup(shardCount, replicationFactor, + distributionColumnType); + } + + return colocationId; +} diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index 461ddbb72..900bf509a 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -32,5 +32,6 @@ extern uint32 GetNextColocationId(void); extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId); extern void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId); +extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId); #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 1e858ae6a..4e3298c05 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -65,6 +65,7 @@ extern void CopyShardInterval(ShardInterval *srcInterval, ShardInterval *destInt extern uint64 ShardLength(uint64 shardId); extern bool NodeHasActiveShardPlacements(char *nodeName, int32 nodePort); extern List * FinalizedShardPlacementList(uint64 shardId); +extern ShardPlacement * FinalizedShardPlacement(uint64 shardId, bool missingOk); extern List * ShardPlacementList(uint64 shardId); extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, HeapTuple heapTuple); @@ -76,6 +77,9 @@ extern void DeleteShardRow(uint64 shardId); extern void InsertShardPlacementRow(uint64 shardId, 
uint64 placementId, char shardState, uint64 shardLength, char *nodeName, uint32 nodePort); +extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, + Var *distributionColumn, uint32 colocationId, + char replicationModel); extern void DeletePartitionRow(Oid distributedRelationId); extern void DeleteShardRow(uint64 shardId); extern void UpdateShardPlacementState(uint64 placementId, char shardState); @@ -88,5 +92,6 @@ extern char * TableOwner(Oid relationId); extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSuperUser(void); +extern bool TableReferenced(Oid relationId); #endif /* MASTER_METADATA_UTILITY_H */ diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 5412d37c9..e59ca1cf2 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -18,6 +18,7 @@ #include "c.h" #include "fmgr.h" +#include "distributed/shardinterval_utils.h" #include "nodes/pg_list.h" @@ -141,5 +142,12 @@ extern Datum master_create_worker_shards(PG_FUNCTION_ARGS); /* function declarations for shard repair functionality */ extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS); +/* function declarations for shard copy functionality */ +extern List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, + int32 sourceNodePort); +extern List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval); +extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, + char *nodeName, uint32 nodePort, + bool missingOk); #endif /* MASTER_PROTOCOL_H */ diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h new file mode 100644 index 000000000..0ad98bbd9 --- /dev/null +++ b/src/include/distributed/reference_table_utils.h @@ -0,0 +1,17 @@
+/*------------------------------------------------------------------------- + * + * reference_table_utils.h + * + * Declarations for public utility functions related to reference tables. + * + * Copyright (c) 2014-2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef REFERENCE_TABLE_UTILS_H_ +#define REFERENCE_TABLE_UTILS_H_ + +extern uint32 CreateReferenceTableColocationId(void); + +#endif /* REFERENCE_TABLE_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 91394c98b..be7433906 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -69,6 +69,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-8'; ALTER EXTENSION citus UPDATE TO '6.1-9'; ALTER EXTENSION citus UPDATE TO '6.1-10'; ALTER EXTENSION citus UPDATE TO '6.1-11'; +ALTER EXTENSION citus UPDATE TO '6.1-12'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_upgrade_reference_table.out b/src/test/regress/expected/multi_upgrade_reference_table.out new file mode 100644 index 000000000..3ced545fa --- /dev/null +++ b/src/test/regress/expected/multi_upgrade_reference_table.out @@ -0,0 +1,762 @@ +-- +-- MULTI_UPGRADE_REFERENCE_TABLE +-- +-- Tests around upgrade_reference_table UDF +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1360000; +-- test with not distributed table +CREATE TABLE upgrade_reference_table_local(column1 int); +SELECT upgrade_to_reference_table('upgrade_reference_table_local'); +ERROR: cannot upgrade to reference table +DETAIL: Relation "upgrade_reference_table_local" is not distributed. 
+HINT: Instead, you can use; create_reference_table('upgrade_reference_table_local'); +-- test with table which has more than one shard +SET citus.shard_count TO 4; +CREATE TABLE upgrade_reference_table_multiple_shard(column1 int); +SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard'); +ERROR: cannot upgrade to reference table +DETAIL: Relation "upgrade_reference_table_multiple_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables. +-- test with table which has no shard +CREATE TABLE upgrade_reference_table_no_shard(column1 int); +SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard'); +ERROR: cannot upgrade to reference table +DETAIL: Relation "upgrade_reference_table_no_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables. +-- test with table with foreign keys +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_referenced(column1 int PRIMARY KEY); +SELECT create_distributed_table('upgrade_reference_table_referenced', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE upgrade_reference_table_referencing(column1 int REFERENCES upgrade_reference_table_referenced(column1)); +SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT upgrade_to_reference_table('upgrade_reference_table_referenced'); +ERROR: cannot upgrade to reference table +DETAIL: Relation "upgrade_reference_table_referenced" is part of a foreign constraint. 
Foreign key constraints are not allowed from or to reference tables. +SELECT upgrade_to_reference_table('upgrade_reference_table_referencing'); +ERROR: cannot upgrade to reference table +DETAIL: Relation "upgrade_reference_table_referencing" is part of a foreign constraint. Foreign key constraints are not allowed from or to reference tables. +-- test with no healthy placements +CREATE TABLE upgrade_reference_table_unhealthy(column1 int); +SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006; +SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy'); +ERROR: could not find any healthy placement for shard 1360006 +-- test with table containing composite type +CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text); +\c - - - :worker_1_port +CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text); +\c - - - :master_port +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_composite(column1 int, column2 upgrade_test_composite_type); +SELECT create_distributed_table('upgrade_reference_table_composite', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); +ERROR: type "public.upgrade_test_composite_type" does not exist +CONTEXT: while executing command on localhost:57638 +-- test with reference table +CREATE TABLE upgrade_reference_table_reference(column1 int); +SELECT create_reference_table('upgrade_reference_table_reference'); + create_reference_table +------------------------ + +(1 row) + +SELECT upgrade_to_reference_table('upgrade_reference_table_reference'); +ERROR: cannot upgrade to reference table +DETAIL: Relation "upgrade_reference_table_reference" is already a reference table +-- test valid cases, append 
distributed table +CREATE TABLE upgrade_reference_table_append(column1 int); +SELECT create_distributed_table('upgrade_reference_table_append', 'column1', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +COPY upgrade_reference_table_append FROM STDIN; +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_append'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + a | f | 0 | c +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_append'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360009 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ +(0 rows) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360009 | 1 | 8192 | localhost | 57637 | 379 +(1 row) + +SELECT upgrade_to_reference_table('upgrade_reference_table_append'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + 
logicalrelid = 'upgrade_reference_table_append'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + n | t | 33 | t +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_append'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360009 | t | t +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 33 | 1 | 2 | 0 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360009 | 1 | 8192 | localhost | 57637 | 379 + 1360009 | 1 | 0 | localhost | 57638 | 380 +(2 rows) + +-- test valid cases, shard exists at one worker +CREATE TABLE upgrade_reference_table_one_worker(column1 int); +SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_one_worker'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + h | f | 32 | s +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, 
(shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_one_worker'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360010 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 32 | 1 | 1 | 23 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360010 | 1 | 0 | localhost | 57637 | 381 +(1 row) + +SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_one_worker'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + n | t | 33 | t +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_one_worker'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360010 | t | t +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 
'upgrade_reference_table_one_worker'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 33 | 1 | 2 | 0 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360010 | 1 | 0 | localhost | 57637 | 381 + 1360010 | 1 | 0 | localhost | 57638 | 382 +(2 rows) + +-- test valid cases, shard exists at both workers but one is unhealthy +SET citus.shard_replication_factor TO 2; +CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int); +SELECT create_distributed_table('upgrade_reference_table_one_unhealthy', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360010 AND nodeport = :worker_1_port; +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + h | f | 34 | c +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360011 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); + colocationid | 
shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 34 | 1 | 2 | 23 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360011 | 1 | 0 | localhost | 57637 | 383 + 1360011 | 1 | 0 | localhost | 57638 | 384 +(2 rows) + +SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + n | t | 33 | t +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360011 | t | t +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 33 | 1 | 2 | 0 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); + shardid | shardstate | 
shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360011 | 1 | 0 | localhost | 57637 | 383 + 1360011 | 1 | 0 | localhost | 57638 | 384 +(2 rows) + +-- test valid cases, shard exists at both workers and both are healthy +CREATE TABLE upgrade_reference_table_both_healthy(column1 int); +SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + h | f | 35 | c +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360012 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 35 | 1 | 2 | 23 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360012 | 1 | 0 | localhost | 57637 | 385 + 1360012 | 1 | 0 | localhost | 57638 | 386 +(2 rows) 
+ +SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + n | t | 33 | t +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360012 | t | t +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 33 | 1 | 2 | 0 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360012 | 1 | 0 | localhost | 57637 | 385 + 1360012 | 1 | 0 | localhost | 57638 | 386 +(2 rows) + +-- test valid cases, do it in transaction and ROLLBACK +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int); +SELECT create_distributed_table('upgrade_reference_table_transaction_rollback', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +-- situation before 
upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + h | f | 32 | s +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360013 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 32 | 1 | 1 | 23 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360013 | 1 | 0 | localhost | 57637 | 387 +(1 row) + +BEGIN; +SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +ROLLBACK; +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + h | f | 32 | s +(1 row) 
+ +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360013 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 32 | 1 | 1 | 23 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360013 | 1 | 0 | localhost | 57637 | 387 +(1 row) + +-- test valid cases, do it in transaction and COMMIT +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_transaction_commit(column1 int); +SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + h | f | 32 | s +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 
'upgrade_reference_table_transaction_commit'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360014 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 32 | 1 | 1 | 23 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360014 | 1 | 0 | localhost | 57637 | 389 +(1 row) + +BEGIN; +SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +COMMIT; +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + n | t | 33 | t +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360014 | t | t +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 
'upgrade_reference_table_transaction_commit'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 33 | 1 | 2 | 0 +(1 row) + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 1360014 | 1 | 0 | localhost | 57637 | 389 + 1360014 | 1 | 0 | localhost | 57638 | 390 +(2 rows) + +-- verify that shard is replicated to other worker +\c - - - :worker_2_port +\d upgrade_reference_table_transaction_commit_* +Table "public.upgrade_reference_table_transaction_commit_1360014" + Column | Type | Modifiers +---------+---------+----------- + column1 | integer | + +\c - - - :master_port +-- drop used tables to clean the workspace +DROP TABLE upgrade_reference_table_local; +DROP TABLE upgrade_reference_table_multiple_shard; +DROP TABLE upgrade_reference_table_no_shard; +DROP TABLE upgrade_reference_table_referencing; +DROP TABLE upgrade_reference_table_referenced; +DROP TABLE upgrade_reference_table_unhealthy; +DROP TABLE upgrade_reference_table_composite; +DROP TYPE upgrade_test_composite_type; +DROP TABLE upgrade_reference_table_reference; +DROP TABLE upgrade_reference_table_append; +DROP TABLE upgrade_reference_table_one_worker; +DROP TABLE upgrade_reference_table_one_unhealthy; +DROP TABLE upgrade_reference_table_both_healthy; +DROP TABLE upgrade_reference_table_transaction_rollback; +DROP TABLE upgrade_reference_table_transaction_commit; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 1029d0088..3fb257d68 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -205,3 +205,8 @@ test: multi_citus_tools # multi_foreign_key tests foreign key 
push down on distributed tables # ---------- test: multi_foreign_key + +# ---------- +# multi_upgrade_reference_table tests for upgrade_reference_table UDF +# ---------- +test: multi_upgrade_reference_table diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 4d28d8e04..72e695767 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -69,6 +69,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-8'; ALTER EXTENSION citus UPDATE TO '6.1-9'; ALTER EXTENSION citus UPDATE TO '6.1-10'; ALTER EXTENSION citus UPDATE TO '6.1-11'; +ALTER EXTENSION citus UPDATE TO '6.1-12'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_upgrade_reference_table.sql b/src/test/regress/sql/multi_upgrade_reference_table.sql new file mode 100644 index 000000000..acfec782e --- /dev/null +++ b/src/test/regress/sql/multi_upgrade_reference_table.sql @@ -0,0 +1,481 @@ +-- +-- MULTI_UPGRADE_REFERENCE_TABLE +-- +-- Tests around upgrade_reference_table UDF +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1360000; + +-- test with not distributed table +CREATE TABLE upgrade_reference_table_local(column1 int); +SELECT upgrade_to_reference_table('upgrade_reference_table_local'); + +-- test with table which has more than one shard +SET citus.shard_count TO 4; +CREATE TABLE upgrade_reference_table_multiple_shard(column1 int); +SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'column1'); +SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard'); + +-- test with table which has no shard +CREATE TABLE upgrade_reference_table_no_shard(column1 int); +SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append'); +SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard'); + +-- test with table with foreign keys 
+SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_referenced(column1 int PRIMARY KEY); +SELECT create_distributed_table('upgrade_reference_table_referenced', 'column1'); + +CREATE TABLE upgrade_reference_table_referencing(column1 int REFERENCES upgrade_reference_table_referenced(column1)); +SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1'); + +SELECT upgrade_to_reference_table('upgrade_reference_table_referenced'); +SELECT upgrade_to_reference_table('upgrade_reference_table_referencing'); + +-- test with no healthy placements +CREATE TABLE upgrade_reference_table_unhealthy(column1 int); +SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1'); +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006; +SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy'); + +-- test with table containing composite type +CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text); + +\c - - - :worker_1_port +CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text); + +\c - - - :master_port +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_composite(column1 int, column2 upgrade_test_composite_type); +SELECT create_distributed_table('upgrade_reference_table_composite', 'column1'); +SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); + +-- test with reference table +CREATE TABLE upgrade_reference_table_reference(column1 int); +SELECT create_reference_table('upgrade_reference_table_reference'); +SELECT upgrade_to_reference_table('upgrade_reference_table_reference'); + +-- test valid cases, append distributed table +CREATE TABLE upgrade_reference_table_append(column1 int); +SELECT create_distributed_table('upgrade_reference_table_append', 'column1', 'append'); +COPY upgrade_reference_table_append FROM STDIN; +1 +2 +3 +4 +5 +\. 
+ +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_append'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_append'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); + +SELECT upgrade_to_reference_table('upgrade_reference_table_append'); + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_append'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_append'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); + +-- test valid cases, shard exists at one worker +CREATE TABLE upgrade_reference_table_one_worker(column1 int); +SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1'); + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 
'upgrade_reference_table_one_worker'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_one_worker'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); + +SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker'); + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_one_worker'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_one_worker'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); + +-- test valid cases, shard exists at both workers but one is unhealthy +SET citus.shard_replication_factor TO 2; +CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int); +SELECT create_distributed_table('upgrade_reference_table_one_unhealthy', 'column1'); +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360010 AND nodeport = :worker_1_port; + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition 
+WHERE + logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); + +SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy'); + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); + +-- test valid cases, shard exists at both workers and both are healthy +CREATE TABLE upgrade_reference_table_both_healthy(column1 int); +SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1'); + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; + +SELECT + shardid, 
(shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); + +SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy'); + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); + +-- test valid cases, do it in transaction and ROLLBACK +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int); +SELECT create_distributed_table('upgrade_reference_table_transaction_rollback', 'column1'); + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, 
(shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + +BEGIN; +SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback'); +ROLLBACK; + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + +-- test valid cases, do it in transaction and COMMIT +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_transaction_commit(column1 int); +SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'column1'); + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as 
shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); + +BEGIN; +SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit'); +COMMIT; + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); + +SELECT * +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); + +-- verify that shard is replicated to other worker +\c - - - :worker_2_port +\d upgrade_reference_table_transaction_commit_* +\c - - - :master_port + + +-- drop used tables to clean the workspace +DROP TABLE upgrade_reference_table_local; +DROP TABLE upgrade_reference_table_multiple_shard; +DROP TABLE upgrade_reference_table_no_shard; +DROP TABLE upgrade_reference_table_referencing; +DROP TABLE upgrade_reference_table_referenced; +DROP TABLE upgrade_reference_table_unhealthy; +DROP TABLE upgrade_reference_table_composite; +DROP TYPE 
upgrade_test_composite_type; +DROP TABLE upgrade_reference_table_reference; +DROP TABLE upgrade_reference_table_append; +DROP TABLE upgrade_reference_table_one_worker; +DROP TABLE upgrade_reference_table_one_unhealthy; +DROP TABLE upgrade_reference_table_both_healthy; +DROP TABLE upgrade_reference_table_transaction_rollback; +DROP TABLE upgrade_reference_table_transaction_commit;