Make `upgrade_to_reference_table` function MX-compatible

pull/1103/head
Eren Basak 2017-01-17 18:22:05 +03:00
parent 56ca590daa
commit e7c15ecc1f
8 changed files with 613 additions and 90 deletions

View File

@ -79,7 +79,6 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN
char *colocateWithTableName,
int shardCount, int replicationFactor);
static Oid ColumnType(Oid relationId, char *columnName);
static void CreateTableMetadataOnWorkers(Oid relationId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
@ -980,29 +979,3 @@ ColumnType(Oid relationId, char *columnName)
return columnType;
}
/*
* CreateTableMetadataOnWorkers creates the list of commands needed to create the
* given distributed table and sends these commands to all metadata workers i.e. workers
* with hasmetadata=true. Before sending the commands, in order to prevent recursive
* propagation, DDL propagation on workers are disabled with a
* `SET citus.enable_ddl_propagation TO off;` command.
*/
static void
CreateTableMetadataOnWorkers(Oid relationId)
{
List *commandList = GetDistributedTableDDLEvents(relationId);
ListCell *commandCell = NULL;
/* prevenet recursive propagation */
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
}

View File

@ -952,3 +952,29 @@ HasMetadataWorkers(void)
return false;
}
/*
* CreateTableMetadataOnWorkers creates the list of commands needed to create the
* given distributed table and sends these commands to all metadata workers i.e. workers
* with hasmetadata=true. Before sending the commands, in order to prevent recursive
* propagation, DDL propagation on workers are disabled with a
* `SET citus.enable_ddl_propagation TO off;` command.
*/
void
CreateTableMetadataOnWorkers(Oid relationId)
{
List *commandList = GetDistributedTableDDLEvents(relationId);
ListCell *commandCell = NULL;
/* prevent recursive propagation */
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
}

View File

@ -55,6 +55,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
List *shardIntervalList = NIL;
ShardInterval *shardInterval = NULL;
uint64 shardId = INVALID_SHARD_ID;
DistTableCacheEntry *tableEntry = NULL;
EnsureSchemaNode();
@ -68,7 +69,9 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
"create_reference_table('%s');", relationName)));
}
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
tableEntry = DistributedTableCacheEntry(relationId);
if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -77,6 +80,16 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
relationName)));
}
if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot upgrade to reference table"),
errdetail("Upgrade is only supported for statement-based "
"replicated tables but \"%s\" is streaming replicated",
relationName)));
}
shardIntervalList = LoadShardIntervalList(relationId);
if (list_length(shardIntervalList) != 1)
{
@ -199,11 +212,18 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId)
ReplicateShardToAllWorkers(shardInterval);
/*
* After copying the shards, we need to update metadata tables to mark this table as
* reference table. We modify pg_dist_partition, pg_dist_colocation and pg_dist_shard
* tables in ConvertToReferenceTableMetadata function.
* We need to update metadata tables to mark this table as reference table. We modify
* pg_dist_partition, pg_dist_colocation and pg_dist_shard tables in
* ConvertToReferenceTableMetadata function.
*/
ConvertToReferenceTableMetadata(relationId, shardId);
/*
* After the table has been officially marked as a reference table, we need to create
* the reference table itself and insert its pg_dist_partition, pg_dist_shard and
* existing pg_dist_shard_placement rows.
*/
CreateTableMetadataOnWorkers(relationId);
}

View File

@ -33,6 +33,7 @@ extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
extern char * CreateSchemaDDLCommand(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, char *nodeName, uint32 nodePort);
extern void CreateTableMetadataOnWorkers(Oid relationId);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node"

View File

@ -360,6 +360,8 @@ SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
(1 row)
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass;
CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node
SELECT
@ -388,11 +390,12 @@ SELECT
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY logicalrelid;
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370004 | t
replicate_reference_table_hash | h | 1370005 | s
replicate_reference_table_hash | h | 1370005 | c
(2 rows)
BEGIN;
@ -448,7 +451,9 @@ SELECT
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY
logicalrelid;
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370004 | t

View File

@ -12,6 +12,7 @@ SELECT upgrade_to_reference_table('upgrade_reference_table_local');
ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_local" is not distributed.
HINT: Instead, you can use; create_reference_table('upgrade_reference_table_local');
DROP TABLE upgrade_reference_table_local;
-- test with table which has more than one shard
SET citus.shard_count TO 4;
CREATE TABLE upgrade_reference_table_multiple_shard(column1 int);
@ -24,6 +25,7 @@ SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'colum
SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard');
ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_multiple_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables.
DROP TABLE upgrade_reference_table_multiple_shard;
-- test with table which has no shard
CREATE TABLE upgrade_reference_table_no_shard(column1 int);
SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append');
@ -35,6 +37,7 @@ SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', '
SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard');
ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_no_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables.
DROP TABLE upgrade_reference_table_no_shard;
-- test with table with foreign keys
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
@ -52,12 +55,17 @@ SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1'
(1 row)
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referenced'::regclass;
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referencing'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_referenced');
ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_referenced" is part of a foreign constraint. Foreign key constraints are not allowed from or to reference tables.
SELECT upgrade_to_reference_table('upgrade_reference_table_referencing');
ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_referencing" is part of a foreign constraint. Foreign key constraints are not allowed from or to reference tables.
DROP TABLE upgrade_reference_table_referencing;
DROP TABLE upgrade_reference_table_referenced;
-- test with no healthy placements
CREATE TABLE upgrade_reference_table_unhealthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
@ -66,9 +74,11 @@ SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
(1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006;
SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy');
ERROR: could not find any healthy placement for shard 1360006
DROP TABLE upgrade_reference_table_unhealthy;
-- test with table containing composite type
CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text);
\c - - - :worker_1_port
@ -83,9 +93,11 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
(1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
ERROR: type "public.upgrade_test_composite_type" does not exist
CONTEXT: while executing command on localhost:57638
DROP TABLE upgrade_reference_table_composite;
-- test with reference table
CREATE TABLE upgrade_reference_table_reference(column1 int);
SELECT create_reference_table('upgrade_reference_table_reference');
@ -97,6 +109,7 @@ SELECT create_reference_table('upgrade_reference_table_reference');
SELECT upgrade_to_reference_table('upgrade_reference_table_reference');
ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_reference" is already a reference table
DROP TABLE upgrade_reference_table_reference;
-- test valid cases, append distributed table
CREATE TABLE upgrade_reference_table_append(column1 int);
SELECT create_distributed_table('upgrade_reference_table_append', 'column1', 'append');
@ -166,7 +179,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_append'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 1360002 | t
n | t | 1360005 | t
(1 row)
SELECT
@ -188,7 +201,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0
1360005 | 1 | 2 | 0
(1 row)
SELECT
@ -206,6 +219,8 @@ ORDER BY
1360009 | 1 | 0 | localhost | 57638
(2 rows)
DROP TABLE upgrade_reference_table_append;
-- test valid cases, shard exists at one worker
CREATE TABLE upgrade_reference_table_one_worker(column1 int);
SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1');
@ -214,6 +229,7 @@ SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1')
(1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_one_worker'::regclass;
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
@ -223,7 +239,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360001 | s
h | f | 1360006 | c
(1 row)
SELECT
@ -245,7 +261,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360001 | 1 | 1 | 23
1360006 | 1 | 1 | 23
(1 row)
SELECT
@ -275,7 +291,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 1360002 | t
n | t | 1360007 | t
(1 row)
SELECT
@ -297,7 +313,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0
1360007 | 1 | 2 | 0
(1 row)
SELECT
@ -315,6 +331,8 @@ ORDER BY
1360010 | 1 | 0 | localhost | 57638
(2 rows)
DROP TABLE upgrade_reference_table_one_worker;
-- test valid cases, shard exists at both workers but one is unhealthy
SET citus.shard_replication_factor TO 2;
CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int);
@ -334,7 +352,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360003 | c
h | f | 1360008 | c
(1 row)
SELECT
@ -356,7 +374,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360003 | 1 | 2 | 23
1360008 | 1 | 2 | 23
(1 row)
SELECT
@ -389,7 +407,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 1360002 | t
n | t | 1360009 | t
(1 row)
SELECT
@ -411,7 +429,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0
1360009 | 1 | 2 | 0
(1 row)
SELECT
@ -429,6 +447,8 @@ ORDER BY
1360011 | 1 | 0 | localhost | 57638
(2 rows)
DROP TABLE upgrade_reference_table_one_unhealthy;
-- test valid cases, shard exists at both workers and both are healthy
CREATE TABLE upgrade_reference_table_both_healthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1');
@ -446,7 +466,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360004 | c
h | f | 1360010 | c
(1 row)
SELECT
@ -468,7 +488,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360004 | 1 | 2 | 23
1360010 | 1 | 2 | 23
(1 row)
SELECT
@ -501,7 +521,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 1360002 | t
n | t | 1360011 | t
(1 row)
SELECT
@ -523,7 +543,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0
1360011 | 1 | 2 | 0
(1 row)
SELECT
@ -541,6 +561,8 @@ ORDER BY
1360012 | 1 | 0 | localhost | 57638
(2 rows)
DROP TABLE upgrade_reference_table_both_healthy;
-- test valid cases, do it in transaction and ROLLBACK
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int);
@ -550,6 +572,7 @@ SELECT create_distributed_table('upgrade_reference_table_transaction_rollback',
(1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_rollback'::regclass;
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
@ -559,7 +582,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360001 | s
h | f | 1360012 | c
(1 row)
SELECT
@ -581,7 +604,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360001 | 1 | 1 | 23
1360012 | 1 | 1 | 23
(1 row)
SELECT
@ -613,7 +636,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360001 | s
h | f | 1360012 | c
(1 row)
SELECT
@ -627,6 +650,9 @@ WHERE
1360013 | f | f
(1 row)
-- eliminate the duplicate intermediate duplicate rows in pg_dist_colocation
VACUUM ANALYZE pg_dist_colocation;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
@ -635,7 +661,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360001 | 1 | 1 | 23
1360012 | 1 | 1 | 23
(1 row)
SELECT
@ -650,6 +676,8 @@ WHERE shardid IN
1360013 | 1 | 0 | localhost | 57637
(1 row)
DROP TABLE upgrade_reference_table_transaction_rollback;
-- test valid cases, do it in transaction and COMMIT
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_commit(column1 int);
@ -659,6 +687,7 @@ SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'c
(1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_commit'::regclass;
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
@ -668,7 +697,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360001 | s
h | f | 1360014 | c
(1 row)
SELECT
@ -690,7 +719,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360001 | 1 | 1 | 23
1360014 | 1 | 1 | 23
(1 row)
SELECT
@ -722,7 +751,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 1360002 | t
n | t | 1360015 | t
(1 row)
SELECT
@ -744,7 +773,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0
1360015 | 1 | 2 | 0
(1 row)
SELECT
@ -771,19 +800,286 @@ Table "public.upgrade_reference_table_transaction_commit_1360014"
column1 | integer |
\c - - - :master_port
-- drop used tables to clean the workspace
DROP TABLE upgrade_reference_table_local;
DROP TABLE upgrade_reference_table_multiple_shard;
DROP TABLE upgrade_reference_table_no_shard;
DROP TABLE upgrade_reference_table_referencing;
DROP TABLE upgrade_reference_table_referenced;
DROP TABLE upgrade_reference_table_unhealthy;
DROP TABLE upgrade_reference_table_composite;
DROP TYPE upgrade_test_composite_type;
DROP TABLE upgrade_reference_table_reference;
DROP TABLE upgrade_reference_table_append;
DROP TABLE upgrade_reference_table_one_worker;
DROP TABLE upgrade_reference_table_one_unhealthy;
DROP TABLE upgrade_reference_table_both_healthy;
DROP TABLE upgrade_reference_table_transaction_rollback;
DROP TABLE upgrade_reference_table_transaction_commit;
-- create an mx table
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
create_distributed_table
--------------------------
(1 row)
-- verify that streaming replicated tables cannot be upgraded to reference tables
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360016 | s
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360015 | f | f
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360016 | 1 | 1 | 23
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360015 | 1 | 0 | localhost | 57637
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
ERROR: cannot upgrade to reference table
DETAIL: Upgrade is only supported for statement-based replicated tables but "upgrade_reference_table_mx" is streaming replicated
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360016 | s
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360015 | f | f
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360016 | 1 | 1 | 23
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360015 | 1 | 0 | localhost | 57637
(1 row)
DROP TABLE upgrade_reference_table_mx;
-- test valid cases, do it with MX
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
create_distributed_table
--------------------------
(1 row)
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE nodeport = :worker_2_port AND
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='upgrade_reference_table_mx'::regclass);
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360017 | c
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360016 | f | f
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360017 | 1 | 2 | 23
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360016 | 1 | 0 | localhost | 57637
1360016 | 3 | 0 | localhost | 57638
(2 rows)
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
upgrade_to_reference_table
----------------------------
(1 row)
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 1360018 | t
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360016 | t | t
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360018 | 1 | 2 | 0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360016 | 1 | 0 | localhost | 57637
1360016 | 1 | 0 | localhost | 57638
(2 rows)
-- situation on metadata worker
\c - - - :worker_1_port
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 1360018 | t
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360016 | t | t
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360016 | 1 | 0 | localhost | 57637
1360016 | 1 | 0 | localhost | 57638
(2 rows)
\c - - - :master_port
DROP TABLE upgrade_reference_table_mx;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
(1 row)

View File

@ -231,6 +231,9 @@ SET citus.shard_replication_factor TO 1;
CREATE TABLE replicate_reference_table_hash(column1 int);
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass;
CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node
@ -253,7 +256,8 @@ SELECT
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY logicalrelid;
BEGIN;
SELECT master_add_node('localhost', :worker_2_port);
@ -283,7 +287,9 @@ SELECT
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two');
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY
logicalrelid;
DROP TABLE replicate_reference_table_reference_one;
DROP TABLE replicate_reference_table_hash;

View File

@ -11,17 +11,20 @@ ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000;
-- test with not distributed table
CREATE TABLE upgrade_reference_table_local(column1 int);
SELECT upgrade_to_reference_table('upgrade_reference_table_local');
DROP TABLE upgrade_reference_table_local;
-- test with table which has more than one shard
SET citus.shard_count TO 4;
CREATE TABLE upgrade_reference_table_multiple_shard(column1 int);
SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'column1');
SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard');
DROP TABLE upgrade_reference_table_multiple_shard;
-- test with table which has no shard
CREATE TABLE upgrade_reference_table_no_shard(column1 int);
SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append');
SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard');
DROP TABLE upgrade_reference_table_no_shard;
-- test with table with foreign keys
SET citus.shard_count TO 1;
@ -32,14 +35,23 @@ SELECT create_distributed_table('upgrade_reference_table_referenced', 'column1')
CREATE TABLE upgrade_reference_table_referencing(column1 int REFERENCES upgrade_reference_table_referenced(column1));
SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1');
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referenced'::regclass;
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referencing'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_referenced');
SELECT upgrade_to_reference_table('upgrade_reference_table_referencing');
DROP TABLE upgrade_reference_table_referencing;
DROP TABLE upgrade_reference_table_referenced;
-- test with no healthy placements
CREATE TABLE upgrade_reference_table_unhealthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006;
SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy');
DROP TABLE upgrade_reference_table_unhealthy;
-- test with table containing composite type
CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text);
@ -52,12 +64,15 @@ SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_composite(column1 int, column2 upgrade_test_composite_type);
SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
DROP TABLE upgrade_reference_table_composite;
-- test with reference table
CREATE TABLE upgrade_reference_table_reference(column1 int);
SELECT create_reference_table('upgrade_reference_table_reference');
SELECT upgrade_to_reference_table('upgrade_reference_table_reference');
DROP TABLE upgrade_reference_table_reference;
-- test valid cases, append distributed table
CREATE TABLE upgrade_reference_table_append(column1 int);
@ -133,10 +148,13 @@ WHERE shardid IN
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass)
ORDER BY
nodeport;
DROP TABLE upgrade_reference_table_append;
-- test valid cases, shard exists at one worker
CREATE TABLE upgrade_reference_table_one_worker(column1 int);
SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_one_worker'::regclass;
-- situation before upgrade_reference_table
SELECT
@ -201,6 +219,8 @@ WHERE shardid IN
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass)
ORDER BY
nodeport;
DROP TABLE upgrade_reference_table_one_worker;
-- test valid cases, shard exists at both workers but one is unhealthy
SET citus.shard_replication_factor TO 2;
@ -273,6 +293,8 @@ WHERE shardid IN
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass)
ORDER BY
nodeport;
DROP TABLE upgrade_reference_table_one_unhealthy;
-- test valid cases, shard exists at both workers and both are healthy
CREATE TABLE upgrade_reference_table_both_healthy(column1 int);
@ -343,11 +365,14 @@ WHERE shardid IN
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass)
ORDER BY
nodeport;
DROP TABLE upgrade_reference_table_both_healthy;
-- test valid cases, do it in transaction and ROLLBACK
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int);
SELECT create_distributed_table('upgrade_reference_table_transaction_rollback', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_rollback'::regclass;
-- situation before upgrade_reference_table
SELECT
@ -396,8 +421,11 @@ SELECT
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
-- eliminate the duplicate intermediate duplicate rows in pg_dist_colocation
VACUUM ANALYZE pg_dist_colocation;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
@ -412,11 +440,14 @@ WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
DROP TABLE upgrade_reference_table_transaction_rollback;
-- test valid cases, do it in transaction and COMMIT
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_commit(column1 int);
SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_commit'::regclass;
-- situation before upgrade_reference_table
SELECT
@ -489,20 +520,185 @@ ORDER BY
\d upgrade_reference_table_transaction_commit_*
\c - - - :master_port
-- drop used tables to clean the workspace
DROP TABLE upgrade_reference_table_local;
DROP TABLE upgrade_reference_table_multiple_shard;
DROP TABLE upgrade_reference_table_no_shard;
DROP TABLE upgrade_reference_table_referencing;
DROP TABLE upgrade_reference_table_referenced;
DROP TABLE upgrade_reference_table_unhealthy;
DROP TABLE upgrade_reference_table_composite;
DROP TYPE upgrade_test_composite_type;
DROP TABLE upgrade_reference_table_reference;
DROP TABLE upgrade_reference_table_append;
DROP TABLE upgrade_reference_table_one_worker;
DROP TABLE upgrade_reference_table_one_unhealthy;
DROP TABLE upgrade_reference_table_both_healthy;
DROP TABLE upgrade_reference_table_transaction_rollback;
DROP TABLE upgrade_reference_table_transaction_commit;
-- create an mx table
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
-- verify that streaming replicated tables cannot be upgraded to reference tables
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
DROP TABLE upgrade_reference_table_mx;
-- test valid cases, do it with MX
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE nodeport = :worker_2_port AND
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='upgrade_reference_table_mx'::regclass);
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
-- situation on metadata worker
\c - - - :worker_1_port
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
\c - - - :master_port
DROP TABLE upgrade_reference_table_mx;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);