Merge pull request #4445 from citusdata/marcocitus/remove-upgrade-to-reference-table

Remove upgrade_to_reference_table UDF
Marco Slot 2020-12-23 17:41:57 +01:00 committed by GitHub
commit f7b182ebeb
18 changed files with 61 additions and 2188 deletions
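The error hint in the removed implementation (visible further down in this diff) already points users at create_reference_table() as the replacement. A minimal sketch of the migration path, using a hypothetical table name:

-- Before this commit, a single-shard statement-replicated table could be
-- promoted in place with: SELECT upgrade_to_reference_table('staff');
-- Going forward, create the table as a reference table directly
-- ('staff' is a hypothetical example table):
CREATE TABLE staff (id int PRIMARY KEY, name text);
SELECT create_reference_table('staff');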


@@ -1,5 +1,6 @@
-- citus--9.5-1--10.0-1
DROP FUNCTION pg_catalog.upgrade_to_reference_table(regclass);
DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
#include "udfs/citus_total_relation_size/10.0-1.sql"


@@ -9,3 +9,4 @@ DROP VIEW public.citus_tables;
DROP FUNCTION pg_catalog.citus_total_relation_size(regclass,boolean);
#include "../udfs/citus_total_relation_size/7.0-1.sql"
#include "../udfs/upgrade_to_reference_table/8.0-1.sql"


@@ -0,0 +1,6 @@
CREATE FUNCTION pg_catalog.upgrade_to_reference_table(table_name regclass)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$upgrade_to_reference_table$$;
COMMENT ON FUNCTION pg_catalog.upgrade_to_reference_table(table_name regclass)
IS 'upgrades an existing broadcast table to a reference table';


@@ -0,0 +1,6 @@
CREATE FUNCTION pg_catalog.upgrade_to_reference_table(table_name regclass)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$upgrade_to_reference_table$$;
COMMENT ON FUNCTION pg_catalog.upgrade_to_reference_table(table_name regclass)
IS 'upgrades an existing broadcast table to a reference table';


@@ -45,11 +45,8 @@ static StringInfo CopyShardPlacementToWorkerNodeQuery(
ShardPlacement *sourceShardPlacement,
WorkerNode *workerNode,
char transferMode);
static void ReplicateSingleShardTableToAllNodes(Oid relationId);
static void ReplicateShardToAllNodes(ShardInterval *shardInterval);
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
int nodePort);
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
static bool AnyRelationsModifiedInTransaction(List *relationIdList);
/* exports for SQL callable functions */
@@ -134,9 +131,8 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
/*
* We only take an access share lock, otherwise we'll hold up master_add_node.
* In case of create_reference_table() and upgrade_to_reference_table(), where
* we don't want concurrent writes to pg_dist_node, we have already acquired
* ShareLock on pg_dist_node.
* In case of create_reference_table() where we don't want concurrent writes
* to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
*/
List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId,
AccessShareLock);
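The comment above describes the locking scheme: this read path takes only an ACCESS SHARE lock on pg_dist_node so master_add_node is not held up, while create_reference_table() holds a SHARE lock to fence off concurrent writes. In plain SQL terms, the two lock levels behave as follows (a sketch of standard PostgreSQL lock semantics, not code from this patch):

BEGIN;
-- what this code path takes; conflicts only with ACCESS EXCLUSIVE:
LOCK TABLE pg_dist_node IN ACCESS SHARE MODE;
COMMIT;

BEGIN;
-- what create_reference_table() holds; SHARE conflicts with ROW EXCLUSIVE,
-- so concurrent writes to pg_dist_node (e.g. by master_add_node) must wait:
LOCK TABLE pg_dist_node IN SHARE MODE;
COMMIT;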
@@ -318,157 +314,14 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement,
/*
* upgrade_to_reference_table accepts a broadcast table which has only one shard and
* replicates it across all nodes to create a reference table. It also modifies related
* metadata to mark the table as a reference table.
* upgrade_to_reference_table was removed, but we maintain a dummy implementation
* to support downgrades.
*/
Datum
upgrade_to_reference_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
CheckCitusVersion(ERROR);
EnsureCoordinator();
EnsureTableOwner(relationId);
if (!IsCitusTable(relationId))
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot upgrade to reference table"),
errdetail("Relation \"%s\" is not distributed.", relationName),
errhint("Instead, you can use; "
"create_reference_table('%s');", relationName)));
}
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
if (IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE))
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot upgrade to reference table"),
errdetail("Relation \"%s\" is already a reference table",
relationName)));
}
else if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE))
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot upgrade to reference table"),
errdetail("Relation \"%s\" is a citus local table and "
"currently it is not supported to upgrade "
"a citus local table to a reference table ",
relationName)));
}
if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot upgrade to reference table"),
errdetail("Upgrade is only supported for statement-based "
"replicated tables but \"%s\" is streaming replicated",
relationName)));
}
LockRelationOid(relationId, AccessExclusiveLock);
List *shardIntervalList = LoadShardIntervalList(relationId);
if (list_length(shardIntervalList) != 1)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot upgrade to reference table"),
errdetail("Relation \"%s\" shard count is not one. Only "
"relations with one shard can be upgraded to "
"reference tables.", relationName)));
}
EnsureReferenceTablesExistOnAllNodes();
ReplicateSingleShardTableToAllNodes(relationId);
PG_RETURN_VOID();
}
/*
* ReplicateSingleShardTableToAllNodes accepts a broadcast table and replicates
* it to all worker nodes, and the coordinator if it has been added by the user
* to pg_dist_node. It assumes that the caller of this function ensures that the
* given broadcast table has only one shard.
*/
static void
ReplicateSingleShardTableToAllNodes(Oid relationId)
{
List *shardIntervalList = LoadShardIntervalList(relationId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
List *foreignConstraintCommandList = CopyShardForeignConstraintCommandList(
shardInterval);
if (foreignConstraintCommandList != NIL || TableReferenced(relationId))
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot upgrade to reference table"),
errdetail("Relation \"%s\" is part of a foreign constraint. "
"Foreign key constraints are not allowed "
"from or to reference tables.", relationName)));
}
/*
* ReplicateShardToAllNodes function opens separate transactions (i.e., not part
* of any coordinated transactions) to each worker and replicates the given shard
* to all workers. If a worker already has a healthy replica of the given shard,
* it skips that worker to prevent copying unnecessary data.
*/
ReplicateShardToAllNodes(shardInterval);
/*
* We need to update metadata tables to mark this table as a reference table. We modify
* pg_dist_partition, pg_dist_colocation and pg_dist_shard tables in
* ConvertToReferenceTableMetadata function.
*/
ConvertToReferenceTableMetadata(relationId, shardId);
/*
* After the table has been officially marked as a reference table, we need to create
* the reference table itself and insert its pg_dist_partition, pg_dist_shard and
* existing pg_dist_placement rows.
*/
CreateTableMetadataOnWorkers(relationId);
}
/*
* ReplicateShardToAllNodes replicates the given shard to all nodes in separate
* transactions. While replicating, it only copies the shard to nodes that do not
* already have a healthy replica of it. However, this function does not obtain
* any lock on the shard resource or shard metadata; it is the caller's
* responsibility to take those locks.
*/
static void
ReplicateShardToAllNodes(ShardInterval *shardInterval)
{
/* prevent concurrent pg_dist_node changes */
List *workerNodeList = ReferenceTablePlacementNodeList(ShareLock);
/*
* We will iterate over all worker nodes and, if a healthy placement does not exist
* at a given node, copy the shard to that node. Then we will also modify the
* metadata to reflect the newly copied shard.
*/
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
ReplicateShardToNode(shardInterval, nodeName, nodePort);
}
errmsg("this function is deprecated and no longer used")));
}
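Pieced together from this hunk, only an error-raising stub remains behind the C symbol. On 10.0 the SQL-level function is dropped outright (see the citus--9.5-1--10.0-1 script above); the stub presumably exists so the downgrade script's CREATE FUNCTION can still resolve the symbol in the loaded module. A sketch of the observable behavior, with a hypothetical table name:

-- On Citus 10.0 the function no longer exists at the SQL level:
SELECT upgrade_to_reference_table('staff');
-- ERROR:  function upgrade_to_reference_table(unknown) does not exist

-- After the downgrade script re-creates the SQL function on top of the
-- dummy C implementation, calling it reaches the stub:
SELECT upgrade_to_reference_table('staff');
-- ERROR:  this function is deprecated and no longer used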
@@ -548,34 +401,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
}
/*
* ConvertToReferenceTableMetadata accepts a broadcast table and converts its
* metadata to reference table metadata. To do this, it updates pg_dist_partition,
* pg_dist_colocation and pg_dist_shard. This function assumes the caller ensures
* that the given broadcast table has only one shard.
*/
static void
ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId)
{
uint32 currentColocationId = TableColocationId(relationId);
uint32 newColocationId = CreateReferenceTableColocationId();
Var *distributionColumn = NULL;
char shardStorageType = ShardStorageType(relationId);
text *shardMinValue = NULL;
text *shardMaxValue = NULL;
/* delete old metadata rows */
DeletePartitionRow(relationId);
DeleteColocationGroupIfNoTablesBelong(currentColocationId);
DeleteShardRow(shardId);
/* insert new metadata rows */
InsertIntoPgDistPartition(relationId, DISTRIBUTE_BY_NONE, distributionColumn,
newColocationId, REPLICATION_MODEL_2PC);
InsertShardRow(relationId, shardId, shardStorageType, shardMinValue, shardMaxValue);
}
/*
* CreateReferenceTableColocationId creates a new co-location id for reference tables and
* writes it into pg_dist_colocation, then returns the created co-location id. Since there

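For intuition about the ConvertToReferenceTableMetadata path removed above: it rewrote the table's Citus catalog entries into reference-table form. A rough SQL rendering of the net effect (the real code went through C helpers such as DeletePartitionRow and InsertShardRow rather than SQL; 'staff' is again a hypothetical table):

-- Mark the table as a reference table in pg_dist_partition: partmethod 'n'
-- (DISTRIBUTE_BY_NONE), repmodel 't' (two-phase commit), and the colocation
-- id of the shared reference-table group (distributioncolumntype = 0):
UPDATE pg_dist_partition
SET partmethod = 'n', partkey = NULL, repmodel = 't',
    colocationid = (SELECT colocationid FROM pg_dist_colocation
                    WHERE distributioncolumntype = 0)
WHERE logicalrelid = 'staff'::regclass;

-- Reference-table shards cover the whole value range, so the single shard's
-- min/max bounds become NULL in pg_dist_shard:
UPDATE pg_dist_shard
SET shardminvalue = NULL, shardmaxvalue = NULL
WHERE logicalrelid = 'staff'::regclass;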

@@ -307,9 +307,6 @@ SELECT mark_tables_colocated('citus_local_table_1', ARRAY['distributed_table']);
ERROR: citus local tables cannot be colocated with other tables
SELECT mark_tables_colocated('distributed_table', ARRAY['citus_local_table_1']);
ERROR: citus local tables cannot be colocated with other tables
-- upgrade_to_reference_table is not supported
SELECT upgrade_to_reference_table('citus_local_table_1');
ERROR: cannot upgrade to reference table
-- master_create_empty_shard is not supported
SELECT master_create_empty_shard('citus_local_table_1');
ERROR: relation "citus_local_table_1" is a citus local table
@@ -618,8 +615,6 @@ BEGIN;
ROLLBACK;
-- should fail --
SELECT upgrade_to_reference_table('citus_local_table_4');
ERROR: cannot upgrade to reference table
SELECT update_distributed_table_colocation('citus_local_table_4', colocate_with => 'none');
ERROR: relation citus_local_table_4 should be a hash distributed table
SELECT master_create_worker_shards('citus_local_table_4', 10, 1);


@@ -446,6 +446,7 @@ SELECT * FROM print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
function citus_total_relation_size(regclass) |
function upgrade_to_reference_table(regclass) |
| access method columnar
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
| function alter_columnar_table_set(regclass,integer,integer,name,integer)
@@ -458,7 +459,7 @@ SELECT * FROM print_extension_changes();
| table columnar.columnar_stripes
| table columnar.options
| view citus_tables
(13 rows)
(14 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version


@@ -446,6 +446,7 @@ SELECT * FROM print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
function citus_total_relation_size(regclass) |
function upgrade_to_reference_table(regclass) |
| function citus_internal.columnar_ensure_objects_exist()
| function citus_total_relation_size(regclass,boolean)
| schema columnar
@@ -454,7 +455,7 @@ SELECT * FROM print_extension_changes();
| table columnar.columnar_stripes
| table columnar.options
| view citus_tables
(9 rows)
(10 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version


@@ -312,9 +312,6 @@ ERROR: permission denied for table test
ABORT;
SELECT * FROM citus_stat_statements_reset();
ERROR: permission denied for function citus_stat_statements_reset
-- should not be allowed to upgrade to reference table
SELECT upgrade_to_reference_table('singleshard');
ERROR: must be owner of table singleshard
-- should not be allowed to co-locate tables
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
ERROR: must be owner of table test
@@ -572,66 +569,6 @@ SELECT create_distributed_table('full_access_user_schema.t2', 'id');
(1 row)
RESET ROLE;
-- a user with all privileges on a schema should be able to upgrade a distributed table to
-- a reference table
SET ROLE full_access;
BEGIN;
CREATE TABLE full_access_user_schema.r1(id int);
SET LOCAL citus.shard_count TO 1;
SELECT create_distributed_table('full_access_user_schema.r1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT upgrade_to_reference_table('full_access_user_schema.r1');
upgrade_to_reference_table
---------------------------------------------------------------------
(1 row)
COMMIT;
RESET ROLE;
-- the super user should be able to upgrade a distributed table to a reference table, even
-- if it is owned by another user
SET ROLE full_access;
BEGIN;
CREATE TABLE full_access_user_schema.r2(id int);
SET LOCAL citus.shard_count TO 1;
SELECT create_distributed_table('full_access_user_schema.r2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
COMMIT;
RESET ROLE;
-- the usage_access should not be able to upgrade the table
SET ROLE usage_access;
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
ERROR: must be owner of table r2
RESET ROLE;
-- the super user should be able
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
upgrade_to_reference_table
---------------------------------------------------------------------
(1 row)
-- verify the owner of the shards for the reference table
SELECT result FROM run_command_on_workers($cmd$
SELECT tableowner FROM pg_tables WHERE
true
AND schemaname = 'full_access_user_schema'
AND tablename LIKE 'r2_%'
LIMIT 1;
$cmd$);
result
---------------------------------------------------------------------
full_access
full_access
(2 rows)
-- super user should be the only one being able to call worker_cleanup_job_schema_cache
SELECT worker_cleanup_job_schema_cache();
worker_cleanup_job_schema_cache
@@ -756,11 +693,9 @@ SELECT citus_rm_job_directory(42::bigint);
\c - - - :master_port
DROP SCHEMA full_access_user_schema CASCADE;
NOTICE: drop cascades to 4 other objects
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table full_access_user_schema.t1
drop cascades to table full_access_user_schema.t2
drop cascades to table full_access_user_schema.r1
drop cascades to table full_access_user_schema.r2
DROP TABLE
my_table,
my_table_with_data,


@@ -359,15 +359,6 @@ SELECT create_reference_table('replicate_reference_table_reference_one');
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
CREATE TABLE replicate_reference_table_hash(column1 int);
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass;
CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node
SELECT
@@ -396,13 +387,12 @@ SELECT
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_reference_two')
ORDER BY logicalrelid;
logicalrelid | partmethod | ?column? | repmodel
---------------------------------------------------------------------
replicate_reference_table_reference_one | n | t | t
replicate_reference_table_hash | h | f | c
(2 rows)
(1 row)
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
@@ -411,12 +401,6 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
1
(1 row)
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
upgrade_to_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('replicate_reference_table_reference_two');
create_reference_table
---------------------------------------------------------------------
@@ -436,8 +420,7 @@ ORDER BY shardid, nodeport;
---------------------------------------------------------------------
1370004 | 1 | 0 | localhost | 57638
1370005 | 1 | 0 | localhost | 57638
1370006 | 1 | 0 | localhost | 57638
(3 rows)
(2 rows)
SELECT shardcount, replicationfactor, distributioncolumntype, distributioncolumncollation FROM pg_dist_colocation
WHERE colocationid IN
@@ -454,18 +437,16 @@ SELECT
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_reference_two')
ORDER BY
logicalrelid;
logicalrelid | partmethod | ?column? | repmodel
---------------------------------------------------------------------
replicate_reference_table_reference_one | n | t | t
replicate_reference_table_hash | n | t | t
replicate_reference_table_reference_two | n | t | t
(3 rows)
(2 rows)
DROP TABLE replicate_reference_table_reference_one;
DROP TABLE replicate_reference_table_hash;
DROP TABLE replicate_reference_table_reference_two;
-- test inserting a value then adding a new node in a transaction
SELECT master_remove_node('localhost', :worker_2_port);
@@ -733,7 +714,7 @@ WHERE
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370015 | 1 | 0 | localhost | 57637
1370014 | 1 | 0 | localhost | 57637
(1 row)
-- we should see the two shard placements after activation
@@ -758,7 +739,7 @@ WHERE
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370015 | 1 | 0 | localhost | 57637
1370014 | 1 | 0 | localhost | 57637
(1 row)
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);

File diff suppressed because it is too large


@@ -155,7 +155,6 @@ ORDER BY 1;
function truncate_local_data_after_distributing_table(regclass)
function undistribute_table(regclass)
function update_distributed_table_colocation(regclass,text)
function upgrade_to_reference_table(regclass)
function worker_append_table_to_shard(text,text,text,integer)
function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)
function worker_apply_sequence_command(text)
@@ -218,5 +217,5 @@ ORDER BY 1;
view citus_tables
view citus_worker_stat_activity
view pg_dist_shard_placement
(202 rows)
(201 rows)


@@ -151,7 +151,6 @@ ORDER BY 1;
function truncate_local_data_after_distributing_table(regclass)
function undistribute_table(regclass)
function update_distributed_table_colocation(regclass,text)
function upgrade_to_reference_table(regclass)
function worker_append_table_to_shard(text,text,text,integer)
function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)
function worker_apply_sequence_command(text)
@@ -214,5 +213,5 @@ ORDER BY 1;
view citus_tables
view citus_worker_stat_activity
view pg_dist_shard_placement
(198 rows)
(197 rows)


@@ -304,11 +304,9 @@ test: node_conninfo_reload
test: multi_foreign_key multi_foreign_key_relation_graph
# ----------
# multi_upgrade_reference_table tests for upgrade_reference_table UDF
# multi_replicate_reference_table tests replicating reference tables to new nodes after we add new nodes
# multi_remove_node_reference_table tests metadata changes after master_remove_node
# ----------
test: multi_upgrade_reference_table
test: multi_replicate_reference_table
test: multi_remove_node_reference_table
@@ -317,8 +315,6 @@ test: multi_remove_node_reference_table
# and rerun some of the tests.
# --------
test: add_coordinator
test: multi_upgrade_reference_table
test: multi_replicate_reference_table
test: multi_reference_table citus_local_tables_queries
test: foreign_key_to_reference_table citus_local_table_triggers
test: replicate_reference_tables_to_coordinator


@@ -230,8 +230,6 @@ SELECT mark_tables_colocated('reference_table', ARRAY['citus_local_table_1']);
SELECT mark_tables_colocated('citus_local_table_1', ARRAY['distributed_table']);
SELECT mark_tables_colocated('distributed_table', ARRAY['citus_local_table_1']);
-- upgrade_to_reference_table is not supported
SELECT upgrade_to_reference_table('citus_local_table_1');
-- master_create_empty_shard is not supported
SELECT master_create_empty_shard('citus_local_table_1');
-- get_shard_id_for_distribution_column is supported
@@ -421,7 +419,6 @@ ROLLBACK;
-- should fail --
SELECT upgrade_to_reference_table('citus_local_table_4');
SELECT update_distributed_table_colocation('citus_local_table_4', colocate_with => 'none');
SELECT master_create_worker_shards('citus_local_table_4', 10, 1);


@@ -192,9 +192,6 @@ ABORT;
SELECT * FROM citus_stat_statements_reset();
-- should not be allowed to upgrade to reference table
SELECT upgrade_to_reference_table('singleshard');
-- should not be allowed to co-locate tables
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
@@ -339,44 +336,6 @@ CREATE TABLE full_access_user_schema.t2(id int);
SELECT create_distributed_table('full_access_user_schema.t2', 'id');
RESET ROLE;
-- a user with all privileges on a schema should be able to upgrade a distributed table to
-- a reference table
SET ROLE full_access;
BEGIN;
CREATE TABLE full_access_user_schema.r1(id int);
SET LOCAL citus.shard_count TO 1;
SELECT create_distributed_table('full_access_user_schema.r1', 'id');
SELECT upgrade_to_reference_table('full_access_user_schema.r1');
COMMIT;
RESET ROLE;
-- the super user should be able to upgrade a distributed table to a reference table, even
-- if it is owned by another user
SET ROLE full_access;
BEGIN;
CREATE TABLE full_access_user_schema.r2(id int);
SET LOCAL citus.shard_count TO 1;
SELECT create_distributed_table('full_access_user_schema.r2', 'id');
COMMIT;
RESET ROLE;
-- the usage_access should not be able to upgrade the table
SET ROLE usage_access;
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
RESET ROLE;
-- the super user should be able
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
-- verify the owner of the shards for the reference table
SELECT result FROM run_command_on_workers($cmd$
SELECT tableowner FROM pg_tables WHERE
true
AND schemaname = 'full_access_user_schema'
AND tablename LIKE 'r2_%'
LIMIT 1;
$cmd$);
-- super user should be the only one being able to call worker_cleanup_job_schema_cache
SELECT worker_cleanup_job_schema_cache();
SET ROLE full_access;


@@ -235,11 +235,6 @@ SELECT create_reference_table('replicate_reference_table_reference_one');
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
CREATE TABLE replicate_reference_table_hash(column1 int);
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass;
CREATE TABLE replicate_reference_table_reference_two(column1 int);
@@ -264,12 +259,11 @@ SELECT
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_reference_two')
ORDER BY logicalrelid;
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
SELECT create_reference_table('replicate_reference_table_reference_two');
RESET client_min_messages;
@@ -293,12 +287,11 @@ SELECT
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_reference_two')
ORDER BY
logicalrelid;
DROP TABLE replicate_reference_table_reference_one;
DROP TABLE replicate_reference_table_hash;
DROP TABLE replicate_reference_table_reference_two;


@@ -1,730 +0,0 @@
--
-- MULTI_UPGRADE_REFERENCE_TABLE
--
-- Tests around upgrade_reference_table UDF
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000;
-- We run this twice, once with coordinator node in pg_dist_node and once without.
-- Set client_min_messages to WARNING to discard NOTICE messages by
-- upgrade_to_reference_table() to make the output consistent in both cases.
-- We check that reference table placements were actually replicated by checking
-- pg_dist_placement.
SET client_min_messages TO WARNING;
-- test with not distributed table
CREATE TABLE upgrade_reference_table_local(column1 int);
SELECT upgrade_to_reference_table('upgrade_reference_table_local');
DROP TABLE upgrade_reference_table_local;
-- test with table which has more than one shard
SET citus.shard_count TO 4;
CREATE TABLE upgrade_reference_table_multiple_shard(column1 int);
SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'column1');
SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard');
DROP TABLE upgrade_reference_table_multiple_shard;
-- test with table which has no shard
CREATE TABLE upgrade_reference_table_no_shard(column1 int);
SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append');
SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard');
DROP TABLE upgrade_reference_table_no_shard;
-- test with table with foreign keys
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_referenced(column1 int PRIMARY KEY);
SELECT create_distributed_table('upgrade_reference_table_referenced', 'column1');
CREATE TABLE upgrade_reference_table_referencing(column1 int REFERENCES upgrade_reference_table_referenced(column1));
SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1');
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referenced'::regclass;
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referencing'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_referenced');
SELECT upgrade_to_reference_table('upgrade_reference_table_referencing');
DROP TABLE upgrade_reference_table_referencing;
DROP TABLE upgrade_reference_table_referenced;
-- test with no healthy placements
CREATE TABLE upgrade_reference_table_unhealthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass;
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_unhealthy'::regclass::oid);
SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy');
DROP TABLE upgrade_reference_table_unhealthy;
-- test with table containing composite type
CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text);
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_composite(column1 int, column2 upgrade_test_composite_type);
SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
DROP TABLE upgrade_reference_table_composite;
DROP TYPE upgrade_test_composite_type;
-- test with reference table
CREATE TABLE upgrade_reference_table_reference(column1 int);
SELECT create_reference_table('upgrade_reference_table_reference');
SELECT upgrade_to_reference_table('upgrade_reference_table_reference');
DROP TABLE upgrade_reference_table_reference;
-- test valid cases, append distributed table
CREATE TABLE upgrade_reference_table_append(column1 int);
SELECT create_distributed_table('upgrade_reference_table_append', 'column1', 'append');
COPY upgrade_reference_table_append FROM STDIN;
1
2
3
4
5
\.
SELECT colocationid AS reference_table_colocationid FROM pg_dist_colocation WHERE distributioncolumntype=0 \gset
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_append'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_append'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass)
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_append');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_append'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_append'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass)
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_append;
-- test valid cases, shard exists at one worker
CREATE TABLE upgrade_reference_table_one_worker(column1 int);
SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_one_worker'::regclass;
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass)
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass)
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_one_worker;
-- test valid cases, shard exists at both workers but one is unhealthy
SET citus.shard_replication_factor TO 2;
CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_one_unhealthy', 'column1');
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass::oid) AND nodeport = :worker_1_port;
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass)
AND shardstate = 1
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass)
AND shardstate = 1
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_one_unhealthy;
-- test valid cases, shard exists at both workers and both are healthy
CREATE TABLE upgrade_reference_table_both_healthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1');
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
SELECT
shardid
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass)
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass)
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_both_healthy;
-- test valid cases, do it in transaction and ROLLBACK
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int);
SELECT create_distributed_table('upgrade_reference_table_transaction_rollback', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_rollback'::regclass;
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass)
GROUP BY shardid
ORDER BY shardid;
BEGIN;
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback');
ROLLBACK;
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass)
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_transaction_rollback;
-- test valid cases, do it in transaction and COMMIT
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_commit(column1 int);
SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_commit'::regclass;
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass)
GROUP BY shardid
ORDER BY shardid;
BEGIN;
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit');
COMMIT;
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass)
GROUP BY shardid
ORDER BY shardid;
-- verify that shard is replicated to other worker
\c - - - :worker_2_port
\dt upgrade_reference_table_transaction_commit_*
\c - - - :master_port
DROP TABLE upgrade_reference_table_transaction_commit;
-- create an mx table
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
-- verify that streaming replicated tables cannot be upgraded to reference tables
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_mx;
-- test valid cases, do it with MX
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
RESET citus.replication_model;
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE nodeport = :worker_2_port AND
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='upgrade_reference_table_mx'::regclass);
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
GROUP BY shardid
ORDER BY shardid;
SET client_min_messages TO WARNING;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT shardcount, replicationfactor, distributioncolumntype
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
GROUP BY shardid
ORDER BY shardid;
-- situation on metadata worker
\c - - - :worker_1_port
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
GROUP BY shardid
ORDER BY shardid;
\c - - - :master_port
DROP TABLE upgrade_reference_table_mx;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
RESET client_min_messages;